Merged PR 254: #2151: remove changing of health_state_actual in actions and tests
## Summary Removed the changing of health_state_actual from the actions - only few places where the health_state_actual is changed: - compromised in specific scripted events - set to good when started (if the state is still at UNUSED - set to good after patching ## Test process unit tests https://dev.azure.com/ma-dev-uk/PrimAITE/_git/PrimAITE/pullrequest/254?_a=files&path=/tests/unit_tests/_primaite/_simulator/_system/_services/test_services.py ## Checklist - [X] PR is linked to a **work item** - [X] **acceptance criteria** of linked ticket are met - [X] performed **self-review** of the code - [X] written **tests** for any new functionality added with this PR - [ ] updated the **documentation** if this PR changes or adds functionality - [ ] written/updated **design docs** if this PR implements new functionality - [ ] updated the **change log** - [X] ran **pre-commit** checks for code style - [ ] attended to any **TO-DOs** left in the code #2151: remove changing of health_state_actual in actions and tests Related work items: #2151, #2166
This commit is contained in:
@@ -38,9 +38,6 @@ class Application(IOSoftware):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self.health_state_visible = SoftwareHealthState.UNUSED
|
||||
self.health_state_actual = SoftwareHealthState.UNUSED
|
||||
|
||||
def set_original_state(self):
|
||||
"""Sets the original state."""
|
||||
super().set_original_state()
|
||||
@@ -95,6 +92,9 @@ class Application(IOSoftware):
|
||||
if self.operating_state == ApplicationOperatingState.CLOSED:
|
||||
self.sys_log.info(f"Running Application {self.name}")
|
||||
self.operating_state = ApplicationOperatingState.RUNNING
|
||||
# set software health state to GOOD if initially set to UNUSED
|
||||
if self.health_state_actual == SoftwareHealthState.UNUSED:
|
||||
self.set_health_state(SoftwareHealthState.GOOD)
|
||||
|
||||
def _application_loop(self):
|
||||
"""The main application loop."""
|
||||
|
||||
@@ -196,7 +196,7 @@ class DatabaseService(Service):
|
||||
return {"status_code": 404, "data": False}
|
||||
elif query == "DELETE":
|
||||
if self.health_state_actual == SoftwareHealthState.GOOD:
|
||||
self.health_state_actual = SoftwareHealthState.COMPROMISED
|
||||
self.set_health_state(SoftwareHealthState.COMPROMISED)
|
||||
return {
|
||||
"status_code": 200,
|
||||
"type": "sql",
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import shutil
|
||||
from abc import ABC
|
||||
from ipaddress import IPv4Address
|
||||
from typing import Optional
|
||||
from typing import Dict, Optional
|
||||
|
||||
from primaite.simulator.file_system.file_system import File
|
||||
from primaite.simulator.network.protocols.ftp import FTPCommand, FTPPacket, FTPStatusCode
|
||||
@@ -16,6 +16,10 @@ class FTPServiceABC(Service, ABC):
|
||||
Contains shared methods between both classes.
|
||||
"""
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""Returns a Dict of the FTPService state."""
|
||||
return super().describe_state()
|
||||
|
||||
def _process_ftp_command(self, payload: FTPPacket, session_id: Optional[str] = None, **kwargs) -> FTPPacket:
|
||||
"""
|
||||
Process the command in the FTP Packet.
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from abc import abstractmethod
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
@@ -43,9 +44,6 @@ class Service(IOSoftware):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self.health_state_visible = SoftwareHealthState.UNUSED
|
||||
self.health_state_actual = SoftwareHealthState.UNUSED
|
||||
|
||||
def _can_perform_action(self) -> bool:
|
||||
"""
|
||||
Checks if the service can perform actions.
|
||||
@@ -98,6 +96,7 @@ class Service(IOSoftware):
|
||||
rm.add_request("enable", RequestType(func=lambda request, context: self.enable()))
|
||||
return rm
|
||||
|
||||
@abstractmethod
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Produce a dictionary describing the current state of this object.
|
||||
@@ -118,7 +117,6 @@ class Service(IOSoftware):
|
||||
if self.operating_state in [ServiceOperatingState.RUNNING, ServiceOperatingState.PAUSED]:
|
||||
self.sys_log.info(f"Stopping service {self.name}")
|
||||
self.operating_state = ServiceOperatingState.STOPPED
|
||||
self.health_state_actual = SoftwareHealthState.UNUSED
|
||||
|
||||
def start(self, **kwargs) -> None:
|
||||
"""Start the service."""
|
||||
@@ -129,42 +127,39 @@ class Service(IOSoftware):
|
||||
if self.operating_state == ServiceOperatingState.STOPPED:
|
||||
self.sys_log.info(f"Starting service {self.name}")
|
||||
self.operating_state = ServiceOperatingState.RUNNING
|
||||
self.health_state_actual = SoftwareHealthState.GOOD
|
||||
# set software health state to GOOD if initially set to UNUSED
|
||||
if self.health_state_actual == SoftwareHealthState.UNUSED:
|
||||
self.set_health_state(SoftwareHealthState.GOOD)
|
||||
|
||||
def pause(self) -> None:
|
||||
"""Pause the service."""
|
||||
if self.operating_state == ServiceOperatingState.RUNNING:
|
||||
self.sys_log.info(f"Pausing service {self.name}")
|
||||
self.operating_state = ServiceOperatingState.PAUSED
|
||||
self.health_state_actual = SoftwareHealthState.OVERWHELMED
|
||||
|
||||
def resume(self) -> None:
|
||||
"""Resume paused service."""
|
||||
if self.operating_state == ServiceOperatingState.PAUSED:
|
||||
self.sys_log.info(f"Resuming service {self.name}")
|
||||
self.operating_state = ServiceOperatingState.RUNNING
|
||||
self.health_state_actual = SoftwareHealthState.GOOD
|
||||
|
||||
def restart(self) -> None:
|
||||
"""Restart running service."""
|
||||
if self.operating_state in [ServiceOperatingState.RUNNING, ServiceOperatingState.PAUSED]:
|
||||
self.sys_log.info(f"Pausing service {self.name}")
|
||||
self.operating_state = ServiceOperatingState.RESTARTING
|
||||
self.health_state_actual = SoftwareHealthState.OVERWHELMED
|
||||
self.restart_countdown = self.restart_duration
|
||||
|
||||
def disable(self) -> None:
|
||||
"""Disable the service."""
|
||||
self.sys_log.info(f"Disabling Application {self.name}")
|
||||
self.operating_state = ServiceOperatingState.DISABLED
|
||||
self.health_state_actual = SoftwareHealthState.OVERWHELMED
|
||||
|
||||
def enable(self) -> None:
|
||||
"""Enable the disabled service."""
|
||||
if self.operating_state == ServiceOperatingState.DISABLED:
|
||||
self.sys_log.info(f"Enabling Application {self.name}")
|
||||
self.operating_state = ServiceOperatingState.STOPPED
|
||||
self.health_state_actual = SoftwareHealthState.OVERWHELMED
|
||||
|
||||
def apply_timestep(self, timestep: int) -> None:
|
||||
"""
|
||||
@@ -181,5 +176,4 @@ class Service(IOSoftware):
|
||||
if self.restart_countdown <= 0:
|
||||
_LOGGER.debug(f"Restarting finished for service {self.name}")
|
||||
self.operating_state = ServiceOperatingState.RUNNING
|
||||
self.health_state_actual = SoftwareHealthState.GOOD
|
||||
self.restart_countdown -= 1
|
||||
|
||||
@@ -71,9 +71,9 @@ class Software(SimComponent):
|
||||
|
||||
name: str
|
||||
"The name of the software."
|
||||
health_state_actual: SoftwareHealthState = SoftwareHealthState.GOOD
|
||||
health_state_actual: SoftwareHealthState = SoftwareHealthState.UNUSED
|
||||
"The actual health state of the software."
|
||||
health_state_visible: SoftwareHealthState = SoftwareHealthState.GOOD
|
||||
health_state_visible: SoftwareHealthState = SoftwareHealthState.UNUSED
|
||||
"The health state of the software visible to the red agent."
|
||||
criticality: SoftwareCriticality = SoftwareCriticality.LOWEST
|
||||
"The criticality level of the software."
|
||||
@@ -282,7 +282,7 @@ class IOSoftware(Software):
|
||||
|
||||
Returns true if the software can perform actions.
|
||||
"""
|
||||
if self.software_manager and self.software_manager.node.operating_state is NodeOperatingState.OFF:
|
||||
if self.software_manager and self.software_manager.node.operating_state != NodeOperatingState.ON:
|
||||
_LOGGER.debug(f"{self.name} Error: {self.software_manager.node.hostname} is not online.")
|
||||
return False
|
||||
return True
|
||||
@@ -303,13 +303,13 @@ class IOSoftware(Software):
|
||||
"""
|
||||
# if over or at capacity, set to overwhelmed
|
||||
if len(self._connections) >= self.max_sessions:
|
||||
self.health_state_actual = SoftwareHealthState.OVERWHELMED
|
||||
self.set_health_state(SoftwareHealthState.OVERWHELMED)
|
||||
self.sys_log.error(f"{self.name}: Connect request for {connection_id=} declined. Service is at capacity.")
|
||||
return False
|
||||
else:
|
||||
# if service was previously overwhelmed, set to good because there is enough space for connections
|
||||
if self.health_state_actual == SoftwareHealthState.OVERWHELMED:
|
||||
self.health_state_actual = SoftwareHealthState.GOOD
|
||||
self.set_health_state(SoftwareHealthState.GOOD)
|
||||
|
||||
# check that connection already doesn't exist
|
||||
if not self._connections.get(connection_id):
|
||||
|
||||
@@ -40,6 +40,9 @@ from primaite.simulator.network.hardware.base import Link, Node
|
||||
class TestService(Service):
|
||||
"""Test Service class"""
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
return super().describe_state()
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
kwargs["name"] = "TestService"
|
||||
kwargs["port"] = Port.HTTP
|
||||
@@ -60,7 +63,7 @@ class TestApplication(Application):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
pass
|
||||
return super().describe_state()
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
@@ -167,7 +170,7 @@ def example_network() -> Network:
|
||||
-------------- --------------
|
||||
| client_1 |----- ----| server_1 |
|
||||
-------------- | -------------- -------------- -------------- | --------------
|
||||
------| switch_1 |------| router_1 |------| switch_2 |------
|
||||
------| switch_2 |------| router_1 |------| switch_1 |------
|
||||
-------------- | -------------- -------------- -------------- | --------------
|
||||
| client_2 |---- ----| server_2 |
|
||||
-------------- --------------
|
||||
|
||||
@@ -90,7 +90,7 @@ def test_repeating_dos_attack(dos_bot_and_db_server):
|
||||
assert db_server_service.health_state_actual is SoftwareHealthState.OVERWHELMED
|
||||
|
||||
db_server_service.clear_connections()
|
||||
db_server_service.health_state_actual = SoftwareHealthState.GOOD
|
||||
db_server_service.set_health_state(SoftwareHealthState.GOOD)
|
||||
assert len(db_server_service.connections) == 0
|
||||
|
||||
computer.apply_timestep(timestep=1)
|
||||
@@ -121,7 +121,7 @@ def test_non_repeating_dos_attack(dos_bot_and_db_server):
|
||||
assert db_server_service.health_state_actual is SoftwareHealthState.OVERWHELMED
|
||||
|
||||
db_server_service.clear_connections()
|
||||
db_server_service.health_state_actual = SoftwareHealthState.GOOD
|
||||
db_server_service.set_health_state(SoftwareHealthState.GOOD)
|
||||
assert len(db_server_service.connections) == 0
|
||||
|
||||
computer.apply_timestep(timestep=1)
|
||||
|
||||
@@ -24,8 +24,8 @@ def populated_node(application_class) -> Tuple[Application, Computer]:
|
||||
return app, computer
|
||||
|
||||
|
||||
def test_service_on_offline_node(application_class):
|
||||
"""Test to check that the service cannot be interacted with when node it is on is off."""
|
||||
def test_application_on_offline_node(application_class):
|
||||
"""Test to check that the application cannot be interacted with when node it is on is off."""
|
||||
computer: Computer = Computer(
|
||||
hostname="test_computer",
|
||||
ip_address="192.168.1.2",
|
||||
@@ -49,8 +49,8 @@ def test_service_on_offline_node(application_class):
|
||||
assert app.operating_state is ApplicationOperatingState.CLOSED
|
||||
|
||||
|
||||
def test_server_turns_off_service(populated_node):
|
||||
"""Check that the service is turned off when the server is turned off"""
|
||||
def test_server_turns_off_application(populated_node):
|
||||
"""Check that the application is turned off when the server is turned off"""
|
||||
app, computer = populated_node
|
||||
|
||||
assert computer.operating_state is NodeOperatingState.ON
|
||||
@@ -65,8 +65,8 @@ def test_server_turns_off_service(populated_node):
|
||||
assert app.operating_state is ApplicationOperatingState.CLOSED
|
||||
|
||||
|
||||
def test_service_cannot_be_turned_on_when_server_is_off(populated_node):
|
||||
"""Check that the service cannot be started when the server is off."""
|
||||
def test_application_cannot_be_turned_on_when_computer_is_off(populated_node):
|
||||
"""Check that the application cannot be started when the computer is off."""
|
||||
app, computer = populated_node
|
||||
|
||||
assert computer.operating_state is NodeOperatingState.ON
|
||||
@@ -86,8 +86,8 @@ def test_service_cannot_be_turned_on_when_server_is_off(populated_node):
|
||||
assert app.operating_state is ApplicationOperatingState.CLOSED
|
||||
|
||||
|
||||
def test_server_turns_on_service(populated_node):
|
||||
"""Check that turning on the server turns on service."""
|
||||
def test_computer_runs_applications(populated_node):
|
||||
"""Check that turning on the computer will turn on applications."""
|
||||
app, computer = populated_node
|
||||
|
||||
assert computer.operating_state is NodeOperatingState.ON
|
||||
@@ -109,13 +109,14 @@ def test_server_turns_on_service(populated_node):
|
||||
assert computer.operating_state is NodeOperatingState.ON
|
||||
assert app.operating_state is ApplicationOperatingState.RUNNING
|
||||
|
||||
computer.start_up_duration = 0
|
||||
computer.shut_down_duration = 0
|
||||
|
||||
computer.power_off()
|
||||
for i in range(computer.start_up_duration + 1):
|
||||
computer.apply_timestep(timestep=i)
|
||||
assert computer.operating_state is NodeOperatingState.OFF
|
||||
assert app.operating_state is ApplicationOperatingState.CLOSED
|
||||
|
||||
computer.power_on()
|
||||
for i in range(computer.start_up_duration + 1):
|
||||
computer.apply_timestep(timestep=i)
|
||||
assert computer.operating_state is NodeOperatingState.ON
|
||||
assert app.operating_state is ApplicationOperatingState.RUNNING
|
||||
|
||||
@@ -117,13 +117,14 @@ def test_server_turns_on_service(populated_node):
|
||||
assert server.operating_state is NodeOperatingState.ON
|
||||
assert service.operating_state is ServiceOperatingState.RUNNING
|
||||
|
||||
server.start_up_duration = 0
|
||||
server.shut_down_duration = 0
|
||||
|
||||
server.power_off()
|
||||
for i in range(server.start_up_duration + 1):
|
||||
server.apply_timestep(timestep=i)
|
||||
assert server.operating_state is NodeOperatingState.OFF
|
||||
assert service.operating_state is ServiceOperatingState.STOPPED
|
||||
|
||||
server.power_on()
|
||||
for i in range(server.start_up_duration + 1):
|
||||
server.apply_timestep(timestep=i)
|
||||
assert server.operating_state is NodeOperatingState.ON
|
||||
assert service.operating_state is ServiceOperatingState.RUNNING
|
||||
|
||||
@@ -53,12 +53,12 @@ def test_node_os_scan(node, service, application):
|
||||
# TODO implement processes
|
||||
|
||||
# add services to node
|
||||
service.health_state_actual = SoftwareHealthState.COMPROMISED
|
||||
service.set_health_state(SoftwareHealthState.COMPROMISED)
|
||||
node.install_service(service=service)
|
||||
assert service.health_state_visible == SoftwareHealthState.UNUSED
|
||||
|
||||
# add application to node
|
||||
application.health_state_actual = SoftwareHealthState.COMPROMISED
|
||||
application.set_health_state(SoftwareHealthState.COMPROMISED)
|
||||
node.install_application(application=application)
|
||||
assert application.health_state_visible == SoftwareHealthState.UNUSED
|
||||
|
||||
@@ -101,7 +101,7 @@ def test_node_red_scan(node, service, application):
|
||||
assert service.revealed_to_red is False
|
||||
|
||||
# add application to node
|
||||
application.health_state_actual = SoftwareHealthState.COMPROMISED
|
||||
application.set_health_state(SoftwareHealthState.COMPROMISED)
|
||||
node.install_application(application=application)
|
||||
assert application.revealed_to_red is False
|
||||
|
||||
|
||||
@@ -0,0 +1,50 @@
|
||||
from primaite.simulator.system.applications.application import ApplicationOperatingState
|
||||
from primaite.simulator.system.software import SoftwareHealthState
|
||||
|
||||
|
||||
def test_scan(application):
|
||||
assert application.operating_state == ApplicationOperatingState.CLOSED
|
||||
assert application.health_state_visible == SoftwareHealthState.UNUSED
|
||||
|
||||
application.run()
|
||||
assert application.operating_state == ApplicationOperatingState.RUNNING
|
||||
assert application.health_state_visible == SoftwareHealthState.UNUSED
|
||||
|
||||
application.scan()
|
||||
assert application.operating_state == ApplicationOperatingState.RUNNING
|
||||
assert application.health_state_visible == SoftwareHealthState.GOOD
|
||||
|
||||
|
||||
def test_run_application(application):
|
||||
assert application.operating_state == ApplicationOperatingState.CLOSED
|
||||
assert application.health_state_actual == SoftwareHealthState.UNUSED
|
||||
|
||||
application.run()
|
||||
assert application.operating_state == ApplicationOperatingState.RUNNING
|
||||
assert application.health_state_actual == SoftwareHealthState.GOOD
|
||||
|
||||
|
||||
def test_close_application(application):
|
||||
application.run()
|
||||
assert application.operating_state == ApplicationOperatingState.RUNNING
|
||||
assert application.health_state_actual == SoftwareHealthState.GOOD
|
||||
|
||||
application.close()
|
||||
assert application.operating_state == ApplicationOperatingState.CLOSED
|
||||
assert application.health_state_actual == SoftwareHealthState.GOOD
|
||||
|
||||
|
||||
def test_application_describe_states(application):
|
||||
assert application.operating_state == ApplicationOperatingState.CLOSED
|
||||
assert application.health_state_actual == SoftwareHealthState.UNUSED
|
||||
|
||||
assert SoftwareHealthState.UNUSED.value == application.describe_state().get("health_state_actual")
|
||||
|
||||
application.run()
|
||||
assert SoftwareHealthState.GOOD.value == application.describe_state().get("health_state_actual")
|
||||
|
||||
application.set_health_state(SoftwareHealthState.COMPROMISED)
|
||||
assert SoftwareHealthState.COMPROMISED.value == application.describe_state().get("health_state_actual")
|
||||
|
||||
application.patch()
|
||||
assert SoftwareHealthState.PATCHING.value == application.describe_state().get("health_state_actual")
|
||||
@@ -19,55 +19,146 @@ def test_scan(service):
|
||||
|
||||
def test_start_service(service):
|
||||
assert service.operating_state == ServiceOperatingState.STOPPED
|
||||
assert service.health_state_actual == SoftwareHealthState.UNUSED
|
||||
service.start()
|
||||
|
||||
assert service.operating_state == ServiceOperatingState.RUNNING
|
||||
assert service.health_state_actual == SoftwareHealthState.GOOD
|
||||
|
||||
|
||||
def test_stop_service(service):
|
||||
service.start()
|
||||
assert service.operating_state == ServiceOperatingState.RUNNING
|
||||
assert service.health_state_actual == SoftwareHealthState.GOOD
|
||||
|
||||
service.stop()
|
||||
assert service.operating_state == ServiceOperatingState.STOPPED
|
||||
assert service.health_state_actual == SoftwareHealthState.GOOD
|
||||
|
||||
|
||||
def test_pause_and_resume_service(service):
|
||||
assert service.operating_state == ServiceOperatingState.STOPPED
|
||||
service.resume()
|
||||
assert service.operating_state == ServiceOperatingState.STOPPED
|
||||
assert service.health_state_actual == SoftwareHealthState.UNUSED
|
||||
|
||||
service.start()
|
||||
assert service.health_state_actual == SoftwareHealthState.GOOD
|
||||
service.pause()
|
||||
assert service.operating_state == ServiceOperatingState.PAUSED
|
||||
assert service.health_state_actual == SoftwareHealthState.GOOD
|
||||
|
||||
service.resume()
|
||||
assert service.operating_state == ServiceOperatingState.RUNNING
|
||||
assert service.health_state_actual == SoftwareHealthState.GOOD
|
||||
|
||||
|
||||
def test_restart(service):
|
||||
assert service.operating_state == ServiceOperatingState.STOPPED
|
||||
assert service.health_state_actual == SoftwareHealthState.UNUSED
|
||||
service.restart()
|
||||
# Service is STOPPED. Restart will only work if the service was PAUSED or RUNNING
|
||||
assert service.operating_state == ServiceOperatingState.STOPPED
|
||||
assert service.health_state_actual == SoftwareHealthState.UNUSED
|
||||
|
||||
service.start()
|
||||
assert service.operating_state == ServiceOperatingState.RUNNING
|
||||
assert service.health_state_actual == SoftwareHealthState.GOOD
|
||||
service.restart()
|
||||
# Service is RUNNING. Restart should work
|
||||
assert service.operating_state == ServiceOperatingState.RESTARTING
|
||||
assert service.health_state_actual == SoftwareHealthState.GOOD
|
||||
|
||||
timestep = 0
|
||||
while service.operating_state == ServiceOperatingState.RESTARTING:
|
||||
service.apply_timestep(timestep)
|
||||
assert service.health_state_actual == SoftwareHealthState.GOOD
|
||||
timestep += 1
|
||||
|
||||
assert service.operating_state == ServiceOperatingState.RUNNING
|
||||
assert service.health_state_actual == SoftwareHealthState.GOOD
|
||||
|
||||
|
||||
def test_restart_compromised(service):
|
||||
service.start()
|
||||
assert service.health_state_actual == SoftwareHealthState.GOOD
|
||||
|
||||
# compromise the service
|
||||
service.set_health_state(SoftwareHealthState.COMPROMISED)
|
||||
|
||||
service.restart()
|
||||
assert service.operating_state == ServiceOperatingState.RESTARTING
|
||||
assert service.health_state_actual == SoftwareHealthState.COMPROMISED
|
||||
|
||||
"""
|
||||
Service should be compromised even after reset.
|
||||
|
||||
Only way to remove compromised status is via patching.
|
||||
"""
|
||||
|
||||
timestep = 0
|
||||
while service.operating_state == ServiceOperatingState.RESTARTING:
|
||||
service.apply_timestep(timestep)
|
||||
assert service.health_state_actual == SoftwareHealthState.COMPROMISED
|
||||
timestep += 1
|
||||
|
||||
assert service.operating_state == ServiceOperatingState.RUNNING
|
||||
assert service.health_state_actual == SoftwareHealthState.COMPROMISED
|
||||
|
||||
|
||||
def test_compromised_service_remains_compromised(service):
|
||||
"""
|
||||
Tests that a compromised service stays compromised.
|
||||
|
||||
The only way that the service can be uncompromised is by running patch.
|
||||
"""
|
||||
service.start()
|
||||
assert service.health_state_actual == SoftwareHealthState.GOOD
|
||||
|
||||
service.set_health_state(SoftwareHealthState.COMPROMISED)
|
||||
|
||||
service.stop()
|
||||
assert service.health_state_actual == SoftwareHealthState.COMPROMISED
|
||||
|
||||
service.start()
|
||||
assert service.health_state_actual == SoftwareHealthState.COMPROMISED
|
||||
|
||||
service.disable()
|
||||
assert service.health_state_actual == SoftwareHealthState.COMPROMISED
|
||||
|
||||
service.enable()
|
||||
assert service.health_state_actual == SoftwareHealthState.COMPROMISED
|
||||
|
||||
service.pause()
|
||||
assert service.health_state_actual == SoftwareHealthState.COMPROMISED
|
||||
|
||||
service.resume()
|
||||
assert service.health_state_actual == SoftwareHealthState.COMPROMISED
|
||||
|
||||
|
||||
def test_service_patching(service):
|
||||
service.start()
|
||||
assert service.health_state_actual == SoftwareHealthState.GOOD
|
||||
|
||||
service.set_health_state(SoftwareHealthState.COMPROMISED)
|
||||
|
||||
service.patch()
|
||||
assert service.health_state_actual == SoftwareHealthState.PATCHING
|
||||
|
||||
for i in range(service.patching_duration + 1):
|
||||
service.apply_timestep(i)
|
||||
|
||||
assert service.health_state_actual == SoftwareHealthState.GOOD
|
||||
|
||||
|
||||
def test_enable_disable(service):
|
||||
service.disable()
|
||||
assert service.operating_state == ServiceOperatingState.DISABLED
|
||||
assert service.health_state_actual == SoftwareHealthState.UNUSED
|
||||
|
||||
service.enable()
|
||||
assert service.operating_state == ServiceOperatingState.STOPPED
|
||||
assert service.health_state_actual == SoftwareHealthState.UNUSED
|
||||
|
||||
|
||||
def test_overwhelm_service(service):
|
||||
@@ -76,13 +167,13 @@ def test_overwhelm_service(service):
|
||||
|
||||
uuid = str(uuid4())
|
||||
assert service.add_connection(connection_id=uuid) # should be true
|
||||
assert service.health_state_actual is SoftwareHealthState.GOOD
|
||||
assert service.health_state_actual == SoftwareHealthState.GOOD
|
||||
|
||||
assert not service.add_connection(connection_id=uuid) # fails because connection already exists
|
||||
assert service.health_state_actual is SoftwareHealthState.GOOD
|
||||
assert service.health_state_actual == SoftwareHealthState.GOOD
|
||||
|
||||
assert service.add_connection(connection_id=str(uuid4())) # succeed
|
||||
assert service.health_state_actual is SoftwareHealthState.GOOD
|
||||
assert service.health_state_actual == SoftwareHealthState.GOOD
|
||||
|
||||
assert not service.add_connection(connection_id=str(uuid4())) # fail because at capacity
|
||||
assert service.health_state_actual is SoftwareHealthState.OVERWHELMED
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import pytest
|
||||
|
||||
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
|
||||
from primaite.simulator.network.hardware.nodes.server import Server
|
||||
from primaite.simulator.network.protocols.http import (
|
||||
HttpRequestMethod,
|
||||
@@ -15,7 +16,11 @@ from primaite.simulator.system.services.web_server.web_server import WebServer
|
||||
@pytest.fixture(scope="function")
|
||||
def web_server() -> Server:
|
||||
node = Server(
|
||||
hostname="web_server", ip_address="192.168.1.10", subnet_mask="255.255.255.0", default_gateway="192.168.1.1"
|
||||
hostname="web_server",
|
||||
ip_address="192.168.1.10",
|
||||
subnet_mask="255.255.255.0",
|
||||
default_gateway="192.168.1.1",
|
||||
operating_state=NodeOperatingState.ON,
|
||||
)
|
||||
node.software_manager.install(software_class=WebServer)
|
||||
node.software_manager.software.get("WebServer").start()
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
from typing import Dict
|
||||
|
||||
import pytest
|
||||
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
from primaite.simulator.system.core.sys_log import SysLog
|
||||
from primaite.simulator.system.software import Software, SoftwareHealthState
|
||||
|
||||
|
||||
class TestSoftware(Software):
|
||||
def describe_state(self) -> Dict:
|
||||
pass
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def software(file_system):
|
||||
return TestSoftware(
|
||||
name="TestSoftware", port=Port.ARP, file_system=file_system, sys_log=SysLog(hostname="test_service")
|
||||
)
|
||||
|
||||
|
||||
def test_software_creation(software):
|
||||
assert software is not None
|
||||
|
||||
|
||||
def test_software_set_health_state(software):
|
||||
assert software.health_state_actual == SoftwareHealthState.UNUSED
|
||||
software.set_health_state(SoftwareHealthState.GOOD)
|
||||
assert software.health_state_actual == SoftwareHealthState.GOOD
|
||||
Reference in New Issue
Block a user