#2151: remove changing of health_state_actual in actions and tests

This commit is contained in:
Czar Echavez
2024-01-09 15:18:31 +00:00
parent 7c0ab1a19e
commit 6fc4e15660
5 changed files with 110 additions and 19 deletions

View File

@@ -43,9 +43,6 @@ class Service(IOSoftware):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.health_state_visible = SoftwareHealthState.UNUSED
self.health_state_actual = SoftwareHealthState.UNUSED
def _can_perform_action(self) -> bool:
"""
Checks if the service can perform actions.
@@ -118,7 +115,6 @@ class Service(IOSoftware):
if self.operating_state in [ServiceOperatingState.RUNNING, ServiceOperatingState.PAUSED]:
self.sys_log.info(f"Stopping service {self.name}")
self.operating_state = ServiceOperatingState.STOPPED
self.health_state_actual = SoftwareHealthState.UNUSED
def start(self, **kwargs) -> None:
"""Start the service."""
@@ -129,42 +125,39 @@ class Service(IOSoftware):
if self.operating_state == ServiceOperatingState.STOPPED:
self.sys_log.info(f"Starting service {self.name}")
self.operating_state = ServiceOperatingState.RUNNING
self.health_state_actual = SoftwareHealthState.GOOD
# set software health state to GOOD if initially set to UNUSED
if self.health_state_actual == SoftwareHealthState.UNUSED:
self.health_state_actual = SoftwareHealthState.GOOD
def pause(self) -> None:
"""Pause the service."""
if self.operating_state == ServiceOperatingState.RUNNING:
self.sys_log.info(f"Pausing service {self.name}")
self.operating_state = ServiceOperatingState.PAUSED
self.health_state_actual = SoftwareHealthState.OVERWHELMED
def resume(self) -> None:
"""Resume paused service."""
if self.operating_state == ServiceOperatingState.PAUSED:
self.sys_log.info(f"Resuming service {self.name}")
self.operating_state = ServiceOperatingState.RUNNING
self.health_state_actual = SoftwareHealthState.GOOD
def restart(self) -> None:
"""Restart running service."""
if self.operating_state in [ServiceOperatingState.RUNNING, ServiceOperatingState.PAUSED]:
self.sys_log.info(f"Pausing service {self.name}")
self.operating_state = ServiceOperatingState.RESTARTING
self.health_state_actual = SoftwareHealthState.OVERWHELMED
self.restart_countdown = self.restart_duration
def disable(self) -> None:
"""Disable the service."""
self.sys_log.info(f"Disabling Application {self.name}")
self.operating_state = ServiceOperatingState.DISABLED
self.health_state_actual = SoftwareHealthState.OVERWHELMED
def enable(self) -> None:
"""Enable the disabled service."""
if self.operating_state == ServiceOperatingState.DISABLED:
self.sys_log.info(f"Enabling Application {self.name}")
self.operating_state = ServiceOperatingState.STOPPED
self.health_state_actual = SoftwareHealthState.OVERWHELMED
def apply_timestep(self, timestep: int) -> None:
"""
@@ -181,5 +174,4 @@ class Service(IOSoftware):
if self.restart_countdown <= 0:
_LOGGER.debug(f"Restarting finished for service {self.name}")
self.operating_state = ServiceOperatingState.RUNNING
self.health_state_actual = SoftwareHealthState.GOOD
self.restart_countdown -= 1

View File

@@ -71,9 +71,9 @@ class Software(SimComponent):
name: str
"The name of the software."
health_state_actual: SoftwareHealthState = SoftwareHealthState.GOOD
health_state_actual: SoftwareHealthState = SoftwareHealthState.UNUSED
"The actual health state of the software."
health_state_visible: SoftwareHealthState = SoftwareHealthState.GOOD
health_state_visible: SoftwareHealthState = SoftwareHealthState.UNUSED
"The health state of the software visible to the red agent."
criticality: SoftwareCriticality = SoftwareCriticality.LOWEST
"The criticality level of the software."
@@ -282,7 +282,7 @@ class IOSoftware(Software):
Returns true if the software can perform actions.
"""
if self.software_manager and self.software_manager.node.operating_state is NodeOperatingState.OFF:
if self.software_manager and self.software_manager.node.operating_state == NodeOperatingState.OFF:
_LOGGER.debug(f"{self.name} Error: {self.software_manager.node.hostname} is not online.")
return False
return True

View File

@@ -167,7 +167,7 @@ def example_network() -> Network:
-------------- --------------
| client_1 |----- ----| server_1 |
-------------- | -------------- -------------- -------------- | --------------
------| switch_1 |------| router_1 |------| switch_2 |------
------| switch_2 |------| router_1 |------| switch_1 |------
-------------- | -------------- -------------- -------------- | --------------
| client_2 |---- ----| server_2 |
-------------- --------------

View File

@@ -19,55 +19,149 @@ def test_scan(service):
def test_start_service(service):
assert service.operating_state == ServiceOperatingState.STOPPED
assert service.health_state_actual == SoftwareHealthState.UNUSED
service.start()
assert service.operating_state == ServiceOperatingState.RUNNING
assert service.health_state_actual == SoftwareHealthState.GOOD
def test_stop_service(service):
service.start()
assert service.operating_state == ServiceOperatingState.RUNNING
assert service.health_state_actual == SoftwareHealthState.GOOD
service.stop()
assert service.operating_state == ServiceOperatingState.STOPPED
assert service.health_state_actual == SoftwareHealthState.GOOD
def test_pause_and_resume_service(service):
assert service.operating_state == ServiceOperatingState.STOPPED
service.resume()
assert service.operating_state == ServiceOperatingState.STOPPED
assert service.health_state_actual == SoftwareHealthState.UNUSED
service.start()
assert service.health_state_actual == SoftwareHealthState.GOOD
service.pause()
assert service.operating_state == ServiceOperatingState.PAUSED
assert service.health_state_actual == SoftwareHealthState.GOOD
service.resume()
assert service.operating_state == ServiceOperatingState.RUNNING
assert service.health_state_actual == SoftwareHealthState.GOOD
def test_restart(service):
assert service.operating_state == ServiceOperatingState.STOPPED
assert service.health_state_actual == SoftwareHealthState.UNUSED
service.restart()
assert service.operating_state == ServiceOperatingState.STOPPED
assert service.health_state_actual == SoftwareHealthState.UNUSED
service.start()
assert service.health_state_actual == SoftwareHealthState.GOOD
service.restart()
assert service.operating_state == ServiceOperatingState.RESTARTING
assert service.health_state_actual == SoftwareHealthState.GOOD
timestep = 0
while service.operating_state == ServiceOperatingState.RESTARTING:
service.apply_timestep(timestep)
assert service.health_state_actual == SoftwareHealthState.GOOD
timestep += 1
assert service.operating_state == ServiceOperatingState.RUNNING
assert service.health_state_actual == SoftwareHealthState.GOOD
def test_restart_compromised(service):
assert service.operating_state == ServiceOperatingState.STOPPED
assert service.health_state_actual == SoftwareHealthState.UNUSED
service.restart()
assert service.operating_state == ServiceOperatingState.STOPPED
assert service.health_state_actual == SoftwareHealthState.UNUSED
service.start()
assert service.health_state_actual == SoftwareHealthState.GOOD
# compromise the service
service.health_state_actual = SoftwareHealthState.COMPROMISED
service.restart()
assert service.operating_state == ServiceOperatingState.RESTARTING
assert service.health_state_actual == SoftwareHealthState.COMPROMISED
"""
Service should be compromised even after reset.
Only way to remove compromised status is via patching.
"""
timestep = 0
while service.operating_state == ServiceOperatingState.RESTARTING:
service.apply_timestep(timestep)
assert service.health_state_actual == SoftwareHealthState.COMPROMISED
timestep += 1
assert service.operating_state == ServiceOperatingState.RUNNING
assert service.health_state_actual == SoftwareHealthState.COMPROMISED
def test_compromised_service_remains_compromised(service):
"""
Tests that a compromised service stays compromised.
The only way that the service can be uncompromised is by running patch.
"""
service.start()
assert service.health_state_actual == SoftwareHealthState.GOOD
service.health_state_actual = SoftwareHealthState.COMPROMISED
service.stop()
assert service.health_state_actual == SoftwareHealthState.COMPROMISED
service.start()
assert service.health_state_actual == SoftwareHealthState.COMPROMISED
service.disable()
assert service.health_state_actual == SoftwareHealthState.COMPROMISED
service.enable()
assert service.health_state_actual == SoftwareHealthState.COMPROMISED
service.pause()
assert service.health_state_actual == SoftwareHealthState.COMPROMISED
service.resume()
assert service.health_state_actual == SoftwareHealthState.COMPROMISED
def test_service_patching(service):
service.start()
assert service.health_state_actual == SoftwareHealthState.GOOD
service.health_state_actual = SoftwareHealthState.COMPROMISED
service.patch()
assert service.health_state_actual == SoftwareHealthState.PATCHING
for i in range(service.patching_duration + 1):
service.apply_timestep(i)
assert service.health_state_actual == SoftwareHealthState.GOOD
def test_enable_disable(service):
service.disable()
assert service.operating_state == ServiceOperatingState.DISABLED
assert service.health_state_actual == SoftwareHealthState.UNUSED
service.enable()
assert service.operating_state == ServiceOperatingState.STOPPED
assert service.health_state_actual == SoftwareHealthState.UNUSED
def test_overwhelm_service(service):
@@ -76,13 +170,13 @@ def test_overwhelm_service(service):
uuid = str(uuid4())
assert service.add_connection(connection_id=uuid) # should be true
assert service.health_state_actual is SoftwareHealthState.GOOD
assert service.health_state_actual == SoftwareHealthState.GOOD
assert not service.add_connection(connection_id=uuid) # fails because connection already exists
assert service.health_state_actual is SoftwareHealthState.GOOD
assert service.health_state_actual == SoftwareHealthState.GOOD
assert service.add_connection(connection_id=str(uuid4())) # succeed
assert service.health_state_actual is SoftwareHealthState.GOOD
assert service.health_state_actual == SoftwareHealthState.GOOD
assert not service.add_connection(connection_id=str(uuid4())) # fail because at capacity
assert service.health_state_actual is SoftwareHealthState.OVERWHELMED

View File

@@ -1,5 +1,6 @@
import pytest
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.server import Server
from primaite.simulator.network.protocols.http import (
HttpRequestMethod,
@@ -15,7 +16,11 @@ from primaite.simulator.system.services.web_server.web_server import WebServer
@pytest.fixture(scope="function")
def web_server() -> Server:
node = Server(
hostname="web_server", ip_address="192.168.1.10", subnet_mask="255.255.255.0", default_gateway="192.168.1.1"
hostname="web_server",
ip_address="192.168.1.10",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
operating_state=NodeOperatingState.ON,
)
node.software_manager.install(software_class=WebServer)
node.software_manager.software.get("WebServer").start()