From d503e51c2ddc9945d20de5d487552b21719c8797 Mon Sep 17 00:00:00 2001 From: Czar Echavez Date: Wed, 6 Sep 2023 11:12:03 +0100 Subject: [PATCH] #1814: Remove hardcoded values + added test + remove unnecessary private parent attribute --- src/primaite/simulator/core.py | 25 ++------------- .../simulator/system/core/software_manager.py | 2 +- .../red_services/data_manipulator_service.py | 18 +++++++---- .../simulator/system/services/service.py | 16 +++++----- .../_services/_red_services/__init__.py | 0 .../test_data_manipulator_service.py | 32 +++++++++++++++++++ 6 files changed, 55 insertions(+), 38 deletions(-) create mode 100644 tests/unit_tests/_primaite/_simulator/_system/_services/_red_services/__init__.py create mode 100644 tests/unit_tests/_primaite/_simulator/_system/_services/_red_services/test_data_manipulator_service.py diff --git a/src/primaite/simulator/core.py b/src/primaite/simulator/core.py index 5ae7b492..32db95c6 100644 --- a/src/primaite/simulator/core.py +++ b/src/primaite/simulator/core.py @@ -1,6 +1,6 @@ """Core of the PrimAITE Simulator.""" from abc import ABC, abstractmethod -from typing import Callable, Dict, List, Optional, Union +from typing import Callable, Dict, List, Optional from uuid import uuid4 from pydantic import BaseModel, ConfigDict @@ -140,7 +140,7 @@ class SimComponent(BaseModel): kwargs["uuid"] = str(uuid4()) super().__init__(**kwargs) self._action_manager: ActionManager = self._init_action_manager() - self._parent: Optional["SimComponent"] = None + self.parent: Optional["SimComponent"] = None def _init_action_manager(self) -> ActionManager: """ @@ -213,24 +213,3 @@ class SimComponent(BaseModel): Override this method with anything that needs to happen within the component for it to be reset. """ pass - - @property - def parent(self) -> "SimComponent": - """Reference to the parent object which manages this object. - - :return: Parent object. - :rtype: SimComponent - """ - return self._parent - - @parent.setter - def parent(self, new_parent: Union["SimComponent", None]) -> None: - if self._parent and new_parent: - msg = f"Overwriting parent of {self.uuid}. Old parent: {self._parent.uuid}, New parent: {new_parent.uuid}" - _LOGGER.warn(msg) - raise RuntimeWarning(msg) - self._parent = new_parent - - @parent.deleter - def parent(self) -> None: - self._parent = None diff --git a/src/primaite/simulator/system/core/software_manager.py b/src/primaite/simulator/system/core/software_manager.py index 312f6d84..28e37963 100644 --- a/src/primaite/simulator/system/core/software_manager.py +++ b/src/primaite/simulator/system/core/software_manager.py @@ -39,7 +39,7 @@ class SoftwareManager: """ Add a Service to the manager. - :param service_class: The class of the service to add + :param: service_class: The class of the service to add """ service = service_class(software_manager=self, sys_log=self.sys_log) diff --git a/src/primaite/simulator/system/services/red_services/data_manipulator_service.py b/src/primaite/simulator/system/services/red_services/data_manipulator_service.py index 29f0d3f8..82b9aa1c 100644 --- a/src/primaite/simulator/system/services/red_services/data_manipulator_service.py +++ b/src/primaite/simulator/system/services/red_services/data_manipulator_service.py @@ -1,8 +1,8 @@ from ipaddress import IPv4Address +from typing import Any, Optional from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.network.transmission.transport_layer import Port -from primaite.simulator.system.core.software_manager import SoftwareManager from primaite.simulator.system.services.service import Service @@ -20,9 +20,15 @@ class DataManipulatorService(Service): kwargs["protocol"] = IPProtocol.TCP super().__init__(**kwargs) - def run(self): - """Run the DataManipulatorService actions.""" - software_manager: SoftwareManager = self.software_manager - software_manager.send_payload_to_session_manager( - payload="SELECT * FROM users", dest_ip_address=IPv4Address("192.168.1.14"), dest_port=self.port + def start(self, target_ip_address: IPv4Address, payload: Optional[Any] = "DELETE TABLE users", **kwargs): + """ + Run the DataManipulatorService actions. + + :param: target_ip_address: The IP address of the target machine to attack + :param: payload: The payload to send to the target machine + """ + super().start() + + self.software_manager.send_payload_to_session_manager( + payload=payload, dest_ip_address=target_ip_address, dest_port=self.port ) diff --git a/src/primaite/simulator/system/services/service.py b/src/primaite/simulator/system/services/service.py index 6a8c9abf..b9340103 100644 --- a/src/primaite/simulator/system/services/service.py +++ b/src/primaite/simulator/system/services/service.py @@ -100,49 +100,49 @@ class Service(IOSoftware): """Stop the service.""" _LOGGER.debug(f"Stopping service {self.name}") if self.operating_state in [ServiceOperatingState.RUNNING, ServiceOperatingState.PAUSED]: - self.parent.sys_log.info(f"Stopping service {self.name}") + self.sys_log.info(f"Stopping service {self.name}") self.operating_state = ServiceOperatingState.STOPPED - def start(self) -> None: + def start(self, **kwargs) -> None: """Start the service.""" _LOGGER.debug(f"Starting service {self.name}") if self.operating_state == ServiceOperatingState.STOPPED: - self.parent.sys_log.info(f"Starting service {self.name}") + self.sys_log.info(f"Starting service {self.name}") self.operating_state = ServiceOperatingState.RUNNING def pause(self) -> None: """Pause the service.""" _LOGGER.debug(f"Pausing service {self.name}") if self.operating_state == ServiceOperatingState.RUNNING: - self.parent.sys_log.info(f"Pausing service {self.name}") + self.sys_log.info(f"Pausing service {self.name}") self.operating_state = ServiceOperatingState.PAUSED def resume(self) -> None: """Resume paused service.""" _LOGGER.debug(f"Resuming service {self.name}") if self.operating_state == ServiceOperatingState.PAUSED: - self.parent.sys_log.info(f"Resuming service {self.name}") + self.sys_log.info(f"Resuming service {self.name}") self.operating_state = ServiceOperatingState.RUNNING def restart(self) -> None: """Restart running service.""" _LOGGER.debug(f"Restarting service {self.name}") if self.operating_state in [ServiceOperatingState.RUNNING, ServiceOperatingState.PAUSED]: - self.parent.sys_log.info(f"Pausing service {self.name}") + self.sys_log.info(f"Pausing service {self.name}") self.operating_state = ServiceOperatingState.RESTARTING self.restart_countdown = self.restarting_duration def disable(self) -> None: """Disable the service.""" _LOGGER.debug(f"Disabling service {self.name}") - self.parent.sys_log.info(f"Disabling Application {self.name}") + self.sys_log.info(f"Disabling Application {self.name}") self.operating_state = ServiceOperatingState.DISABLED def enable(self) -> None: """Enable the disabled service.""" _LOGGER.debug(f"Enabling service {self.name}") if self.operating_state == ServiceOperatingState.DISABLED: - self.parent.sys_log.info(f"Enabling Application {self.name}") + self.sys_log.info(f"Enabling Application {self.name}") self.operating_state = ServiceOperatingState.STOPPED def apply_timestep(self, timestep: int) -> None: diff --git a/tests/unit_tests/_primaite/_simulator/_system/_services/_red_services/__init__.py b/tests/unit_tests/_primaite/_simulator/_system/_services/_red_services/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit_tests/_primaite/_simulator/_system/_services/_red_services/test_data_manipulator_service.py b/tests/unit_tests/_primaite/_simulator/_system/_services/_red_services/test_data_manipulator_service.py new file mode 100644 index 00000000..f5b37175 --- /dev/null +++ b/tests/unit_tests/_primaite/_simulator/_system/_services/_red_services/test_data_manipulator_service.py @@ -0,0 +1,32 @@ +from ipaddress import IPv4Address + +from primaite.simulator.network.hardware.base import Node +from primaite.simulator.network.networks import arcd_uc2_network +from primaite.simulator.network.transmission.network_layer import IPProtocol +from primaite.simulator.network.transmission.transport_layer import Port +from primaite.simulator.system.services.red_services.data_manipulator_service import DataManipulatorService + + +def test_creation(): + network = arcd_uc2_network() + + client_1: Node = network.get_node_by_hostname("client_1") + + client_1.software_manager.add_service(service_class=DataManipulatorService) + + data_manipulator_service: DataManipulatorService = client_1.software_manager.services["DataManipulatorBot"] + + assert data_manipulator_service.name == "DataManipulatorBot" + assert data_manipulator_service.port == Port.POSTGRES_SERVER + assert data_manipulator_service.protocol == IPProtocol.TCP + + # should have no session yet + assert len(client_1.session_manager.sessions_by_uuid) == 0 + + try: + data_manipulator_service.start(target_ip_address=IPv4Address("192.168.1.14")) + except Exception as e: + assert False, f"Test was not supposed to throw exception: {e}" + + # there should be a session after the service is started + assert len(client_1.session_manager.sessions_by_uuid) == 1