#2151: utilise set_health_state method instead of directly changing software states

This commit is contained in:
Czar Echavez
2024-01-09 16:29:40 +00:00
parent 9eb0102069
commit a4d372d3eb
8 changed files with 45 additions and 22 deletions

View File

@@ -3,7 +3,7 @@ from enum import Enum
from typing import Any, Dict, Set
from primaite import getLogger
from primaite.simulator.system.software import IOSoftware, SoftwareHealthState
from primaite.simulator.system.software import IOSoftware
_LOGGER = getLogger(__name__)
@@ -38,9 +38,6 @@ class Application(IOSoftware):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.health_state_visible = SoftwareHealthState.UNUSED
self.health_state_actual = SoftwareHealthState.UNUSED
def set_original_state(self):
"""Sets the original state."""
super().set_original_state()

View File

@@ -196,7 +196,7 @@ class DatabaseService(Service):
return {"status_code": 404, "data": False}
elif query == "DELETE":
if self.health_state_actual == SoftwareHealthState.GOOD:
self.health_state_actual = SoftwareHealthState.COMPROMISED
self.set_health_state(SoftwareHealthState.COMPROMISED)
return {
"status_code": 200,
"type": "sql",

View File

@@ -127,7 +127,7 @@ class Service(IOSoftware):
self.operating_state = ServiceOperatingState.RUNNING
# set software health state to GOOD if initially set to UNUSED
if self.health_state_actual == SoftwareHealthState.UNUSED:
self.health_state_actual = SoftwareHealthState.GOOD
self.set_health_state(SoftwareHealthState.GOOD)
def pause(self) -> None:
"""Pause the service."""

View File

@@ -303,13 +303,13 @@ class IOSoftware(Software):
"""
# if over or at capacity, set to overwhelmed
if len(self._connections) >= self.max_sessions:
self.health_state_actual = SoftwareHealthState.OVERWHELMED
self.set_health_state(SoftwareHealthState.OVERWHELMED)
self.sys_log.error(f"{self.name}: Connect request for {connection_id=} declined. Service is at capacity.")
return False
else:
# if service was previously overwhelmed, set to good because there is enough space for connections
if self.health_state_actual == SoftwareHealthState.OVERWHELMED:
self.health_state_actual = SoftwareHealthState.GOOD
self.set_health_state(SoftwareHealthState.GOOD)
# check that connection already doesn't exist
if not self._connections.get(connection_id):

View File

@@ -90,7 +90,7 @@ def test_repeating_dos_attack(dos_bot_and_db_server):
assert db_server_service.health_state_actual is SoftwareHealthState.OVERWHELMED
db_server_service.clear_connections()
db_server_service.health_state_actual = SoftwareHealthState.GOOD
db_server_service.set_health_state(SoftwareHealthState.GOOD)
assert len(db_server_service.connections) == 0
computer.apply_timestep(timestep=1)
@@ -121,7 +121,7 @@ def test_non_repeating_dos_attack(dos_bot_and_db_server):
assert db_server_service.health_state_actual is SoftwareHealthState.OVERWHELMED
db_server_service.clear_connections()
db_server_service.health_state_actual = SoftwareHealthState.GOOD
db_server_service.set_health_state(SoftwareHealthState.GOOD)
assert len(db_server_service.connections) == 0
computer.apply_timestep(timestep=1)

View File

@@ -53,12 +53,12 @@ def test_node_os_scan(node, service, application):
# TODO implement processes
# add services to node
service.health_state_actual = SoftwareHealthState.COMPROMISED
service.set_health_state(SoftwareHealthState.COMPROMISED)
node.install_service(service=service)
assert service.health_state_visible == SoftwareHealthState.UNUSED
# add application to node
application.health_state_actual = SoftwareHealthState.COMPROMISED
application.set_health_state(SoftwareHealthState.COMPROMISED)
node.install_application(application=application)
assert application.health_state_visible == SoftwareHealthState.UNUSED
@@ -101,7 +101,7 @@ def test_node_red_scan(node, service, application):
assert service.revealed_to_red is False
# add application to node
application.health_state_actual = SoftwareHealthState.COMPROMISED
application.set_health_state(SoftwareHealthState.COMPROMISED)
node.install_application(application=application)
assert application.revealed_to_red is False

View File

@@ -57,12 +57,15 @@ def test_restart(service):
assert service.operating_state == ServiceOperatingState.STOPPED
assert service.health_state_actual == SoftwareHealthState.UNUSED
service.restart()
# Service is STOPPED. Restart will only work if the service was PAUSED or RUNNING
assert service.operating_state == ServiceOperatingState.STOPPED
assert service.health_state_actual == SoftwareHealthState.UNUSED
service.start()
assert service.operating_state == ServiceOperatingState.RUNNING
assert service.health_state_actual == SoftwareHealthState.GOOD
service.restart()
# Service is RUNNING. Restart should work
assert service.operating_state == ServiceOperatingState.RESTARTING
assert service.health_state_actual == SoftwareHealthState.GOOD
@@ -77,17 +80,11 @@ def test_restart(service):
def test_restart_compromised(service):
assert service.operating_state == ServiceOperatingState.STOPPED
assert service.health_state_actual == SoftwareHealthState.UNUSED
service.restart()
assert service.operating_state == ServiceOperatingState.STOPPED
assert service.health_state_actual == SoftwareHealthState.UNUSED
service.start()
assert service.health_state_actual == SoftwareHealthState.GOOD
# compromise the service
service.health_state_actual = SoftwareHealthState.COMPROMISED
service.set_health_state(SoftwareHealthState.COMPROMISED)
service.restart()
assert service.operating_state == ServiceOperatingState.RESTARTING
@@ -118,7 +115,7 @@ def test_compromised_service_remains_compromised(service):
service.start()
assert service.health_state_actual == SoftwareHealthState.GOOD
service.health_state_actual = SoftwareHealthState.COMPROMISED
service.set_health_state(SoftwareHealthState.COMPROMISED)
service.stop()
assert service.health_state_actual == SoftwareHealthState.COMPROMISED
@@ -143,7 +140,7 @@ def test_service_patching(service):
service.start()
assert service.health_state_actual == SoftwareHealthState.GOOD
service.health_state_actual = SoftwareHealthState.COMPROMISED
service.set_health_state(SoftwareHealthState.COMPROMISED)
service.patch()
assert service.health_state_actual == SoftwareHealthState.PATCHING

View File

@@ -0,0 +1,29 @@
from typing import Dict
import pytest
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.core.sys_log import SysLog
from primaite.simulator.system.software import Software, SoftwareHealthState
class TestSoftware(Software):
def describe_state(self) -> Dict:
pass
@pytest.fixture(scope="function")
def software(file_system):
return TestSoftware(
name="TestSoftware", port=Port.ARP, file_system=file_system, sys_log=SysLog(hostname="test_service")
)
def test_software_creation(software):
assert software is not None
def test_software_set_health_state(software):
assert software.health_state_actual == SoftwareHealthState.UNUSED
software.set_health_state(SoftwareHealthState.GOOD)
assert software.health_state_actual == SoftwareHealthState.GOOD