This commit is contained in:
@@ -3,7 +3,7 @@ from enum import Enum
|
||||
from typing import Any, Dict, Set
|
||||
|
||||
from primaite import getLogger
|
||||
from primaite.simulator.system.software import IOSoftware
|
||||
from primaite.simulator.system.software import IOSoftware, SoftwareHealthState
|
||||
|
||||
_LOGGER = getLogger(__name__)
|
||||
|
||||
@@ -92,6 +92,9 @@ class Application(IOSoftware):
|
||||
if self.operating_state == ApplicationOperatingState.CLOSED:
|
||||
self.sys_log.info(f"Running Application {self.name}")
|
||||
self.operating_state = ApplicationOperatingState.RUNNING
|
||||
# set software health state to GOOD if initially set to UNUSED
|
||||
if self.health_state_actual == SoftwareHealthState.UNUSED:
|
||||
self.set_health_state(SoftwareHealthState.GOOD)
|
||||
|
||||
def _application_loop(self):
|
||||
"""The main application loop."""
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import shutil
|
||||
from abc import ABC
|
||||
from ipaddress import IPv4Address
|
||||
from typing import Optional
|
||||
from typing import Dict, Optional
|
||||
|
||||
from primaite.simulator.file_system.file_system import File
|
||||
from primaite.simulator.network.protocols.ftp import FTPCommand, FTPPacket, FTPStatusCode
|
||||
@@ -16,6 +16,10 @@ class FTPServiceABC(Service, ABC):
|
||||
Contains shared methods between both classes.
|
||||
"""
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""Returns a Dict of the FTPService state."""
|
||||
return super().describe_state()
|
||||
|
||||
def _process_ftp_command(self, payload: FTPPacket, session_id: Optional[str] = None, **kwargs) -> FTPPacket:
|
||||
"""
|
||||
Process the command in the FTP Packet.
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from abc import abstractmethod
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
@@ -95,6 +96,7 @@ class Service(IOSoftware):
|
||||
rm.add_request("enable", RequestType(func=lambda request, context: self.enable()))
|
||||
return rm
|
||||
|
||||
@abstractmethod
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Produce a dictionary describing the current state of this object.
|
||||
|
||||
@@ -282,7 +282,7 @@ class IOSoftware(Software):
|
||||
|
||||
Returns true if the software can perform actions.
|
||||
"""
|
||||
if self.software_manager and self.software_manager.node.operating_state == NodeOperatingState.OFF:
|
||||
if self.software_manager and self.software_manager.node.operating_state != NodeOperatingState.ON:
|
||||
_LOGGER.debug(f"{self.name} Error: {self.software_manager.node.hostname} is not online.")
|
||||
return False
|
||||
return True
|
||||
|
||||
@@ -40,6 +40,9 @@ from primaite.simulator.network.hardware.base import Link, Node
|
||||
class TestService(Service):
|
||||
"""Test Service class"""
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
return super().describe_state()
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
kwargs["name"] = "TestService"
|
||||
kwargs["port"] = Port.HTTP
|
||||
@@ -60,7 +63,7 @@ class TestApplication(Application):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
pass
|
||||
return super().describe_state()
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
|
||||
@@ -24,8 +24,8 @@ def populated_node(application_class) -> Tuple[Application, Computer]:
|
||||
return app, computer
|
||||
|
||||
|
||||
def test_service_on_offline_node(application_class):
|
||||
"""Test to check that the service cannot be interacted with when node it is on is off."""
|
||||
def test_application_on_offline_node(application_class):
|
||||
"""Test to check that the application cannot be interacted with when node it is on is off."""
|
||||
computer: Computer = Computer(
|
||||
hostname="test_computer",
|
||||
ip_address="192.168.1.2",
|
||||
@@ -49,8 +49,8 @@ def test_service_on_offline_node(application_class):
|
||||
assert app.operating_state is ApplicationOperatingState.CLOSED
|
||||
|
||||
|
||||
def test_server_turns_off_service(populated_node):
|
||||
"""Check that the service is turned off when the server is turned off"""
|
||||
def test_server_turns_off_application(populated_node):
|
||||
"""Check that the application is turned off when the server is turned off"""
|
||||
app, computer = populated_node
|
||||
|
||||
assert computer.operating_state is NodeOperatingState.ON
|
||||
@@ -65,8 +65,8 @@ def test_server_turns_off_service(populated_node):
|
||||
assert app.operating_state is ApplicationOperatingState.CLOSED
|
||||
|
||||
|
||||
def test_service_cannot_be_turned_on_when_server_is_off(populated_node):
|
||||
"""Check that the service cannot be started when the server is off."""
|
||||
def test_application_cannot_be_turned_on_when_computer_is_off(populated_node):
|
||||
"""Check that the application cannot be started when the computer is off."""
|
||||
app, computer = populated_node
|
||||
|
||||
assert computer.operating_state is NodeOperatingState.ON
|
||||
@@ -86,8 +86,8 @@ def test_service_cannot_be_turned_on_when_server_is_off(populated_node):
|
||||
assert app.operating_state is ApplicationOperatingState.CLOSED
|
||||
|
||||
|
||||
def test_server_turns_on_service(populated_node):
|
||||
"""Check that turning on the server turns on service."""
|
||||
def test_computer_runs_applications(populated_node):
|
||||
"""Check that turning on the computer will turn on applications."""
|
||||
app, computer = populated_node
|
||||
|
||||
assert computer.operating_state is NodeOperatingState.ON
|
||||
@@ -109,13 +109,14 @@ def test_server_turns_on_service(populated_node):
|
||||
assert computer.operating_state is NodeOperatingState.ON
|
||||
assert app.operating_state is ApplicationOperatingState.RUNNING
|
||||
|
||||
computer.start_up_duration = 0
|
||||
computer.shut_down_duration = 0
|
||||
|
||||
computer.power_off()
|
||||
for i in range(computer.start_up_duration + 1):
|
||||
computer.apply_timestep(timestep=i)
|
||||
assert computer.operating_state is NodeOperatingState.OFF
|
||||
assert app.operating_state is ApplicationOperatingState.CLOSED
|
||||
|
||||
computer.power_on()
|
||||
for i in range(computer.start_up_duration + 1):
|
||||
computer.apply_timestep(timestep=i)
|
||||
assert computer.operating_state is NodeOperatingState.ON
|
||||
assert app.operating_state is ApplicationOperatingState.RUNNING
|
||||
|
||||
@@ -117,13 +117,14 @@ def test_server_turns_on_service(populated_node):
|
||||
assert server.operating_state is NodeOperatingState.ON
|
||||
assert service.operating_state is ServiceOperatingState.RUNNING
|
||||
|
||||
server.start_up_duration = 0
|
||||
server.shut_down_duration = 0
|
||||
|
||||
server.power_off()
|
||||
for i in range(server.start_up_duration + 1):
|
||||
server.apply_timestep(timestep=i)
|
||||
assert server.operating_state is NodeOperatingState.OFF
|
||||
assert service.operating_state is ServiceOperatingState.STOPPED
|
||||
|
||||
server.power_on()
|
||||
for i in range(server.start_up_duration + 1):
|
||||
server.apply_timestep(timestep=i)
|
||||
assert server.operating_state is NodeOperatingState.ON
|
||||
assert service.operating_state is ServiceOperatingState.RUNNING
|
||||
|
||||
@@ -0,0 +1,50 @@
|
||||
from primaite.simulator.system.applications.application import ApplicationOperatingState
|
||||
from primaite.simulator.system.software import SoftwareHealthState
|
||||
|
||||
|
||||
def test_scan(application):
|
||||
assert application.operating_state == ApplicationOperatingState.CLOSED
|
||||
assert application.health_state_visible == SoftwareHealthState.UNUSED
|
||||
|
||||
application.run()
|
||||
assert application.operating_state == ApplicationOperatingState.RUNNING
|
||||
assert application.health_state_visible == SoftwareHealthState.UNUSED
|
||||
|
||||
application.scan()
|
||||
assert application.operating_state == ApplicationOperatingState.RUNNING
|
||||
assert application.health_state_visible == SoftwareHealthState.GOOD
|
||||
|
||||
|
||||
def test_run_application(application):
|
||||
assert application.operating_state == ApplicationOperatingState.CLOSED
|
||||
assert application.health_state_actual == SoftwareHealthState.UNUSED
|
||||
|
||||
application.run()
|
||||
assert application.operating_state == ApplicationOperatingState.RUNNING
|
||||
assert application.health_state_actual == SoftwareHealthState.GOOD
|
||||
|
||||
|
||||
def test_close_application(application):
|
||||
application.run()
|
||||
assert application.operating_state == ApplicationOperatingState.RUNNING
|
||||
assert application.health_state_actual == SoftwareHealthState.GOOD
|
||||
|
||||
application.close()
|
||||
assert application.operating_state == ApplicationOperatingState.CLOSED
|
||||
assert application.health_state_actual == SoftwareHealthState.GOOD
|
||||
|
||||
|
||||
def test_application_describe_states(application):
|
||||
assert application.operating_state == ApplicationOperatingState.CLOSED
|
||||
assert application.health_state_actual == SoftwareHealthState.UNUSED
|
||||
|
||||
assert SoftwareHealthState.UNUSED.value == application.describe_state().get("health_state_actual")
|
||||
|
||||
application.run()
|
||||
assert SoftwareHealthState.GOOD.value == application.describe_state().get("health_state_actual")
|
||||
|
||||
application.set_health_state(SoftwareHealthState.COMPROMISED)
|
||||
assert SoftwareHealthState.COMPROMISED.value == application.describe_state().get("health_state_actual")
|
||||
|
||||
application.patch()
|
||||
assert SoftwareHealthState.PATCHING.value == application.describe_state().get("health_state_actual")
|
||||
Reference in New Issue
Block a user