From ab73ac20e84766b5c77688b9adf0e02a98b22d6b Mon Sep 17 00:00:00 2001 From: Marek Wolan Date: Mon, 1 Jul 2024 14:41:41 +0100 Subject: [PATCH] #2700 add ransomware configure action --- src/primaite/game/agent/actions.py | 27 ++++++- .../system/applications/database_client.py | 2 +- .../red_applications/ransomware_script.py | 14 +++- .../{observations => }/actions/__init__.py | 0 .../actions/test_configure_actions.py | 75 ++++++++++++++++++- 5 files changed, 113 insertions(+), 5 deletions(-) rename tests/integration_tests/game_layer/{observations => }/actions/__init__.py (100%) rename tests/integration_tests/game_layer/{observations => }/actions/test_configure_actions.py (52%) diff --git a/src/primaite/game/agent/actions.py b/src/primaite/game/agent/actions.py index 4cb31d25..9f2693e5 100644 --- a/src/primaite/game/agent/actions.py +++ b/src/primaite/game/agent/actions.py @@ -14,7 +14,7 @@ from abc import ABC, abstractmethod from typing import Dict, List, Literal, Optional, Tuple, TYPE_CHECKING, Union from gymnasium import spaces -from pydantic import BaseModel, Field, field_validator, ValidationInfo +from pydantic import BaseModel, ConfigDict, Field, field_validator, ValidationInfo from primaite import getLogger from primaite.interface.request import RequestFormat @@ -252,6 +252,7 @@ class ConfigureDatabaseClientAction(AbstractAction): class _Opts(BaseModel): """Schema for options that can be passed to this action.""" + model_config = ConfigDict(extra="forbid") server_ip_address: Optional[str] = None server_password: Optional[str] = None @@ -267,6 +268,29 @@ class ConfigureDatabaseClientAction(AbstractAction): return ["network", "node", node_name, "application", "DatabaseClient", "configure", options] +class ConfigureRansomwareScriptAction(AbstractAction): + """Action which sets config parameters for a ransomware script on a node.""" + + class _Opts(BaseModel): + """Schema for options that can be passed to this option.""" + + model_config = ConfigDict(extra="forbid") + server_ip_address: Optional[str] = None + server_password: Optional[str] = None + payload: Optional[str] = None + + def __init__(self, manager: "ActionManager", **kwargs) -> None: + super().__init__(manager=manager) + + def form_request(self, node_id: int, options: Dict) -> RequestFormat: + """Return the action formatted as a request that can be ingested by the simulation.""" + node_name = self.manager.get_node_name_by_idx(node_id) + if node_name is None: + return ["do_nothing"] + ConfigureRansomwareScriptAction._Opts.model_validate(options) # check that options adhere to schema + return ["network", "node", node_name, "application", "RansomwareScript", "configure", options] + + class NodeApplicationRemoveAction(AbstractAction): """Action which removes/uninstalls an application.""" @@ -1068,6 +1092,7 @@ class ActionManager: "NODE_NMAP_PORT_SCAN": NodeNMAPPortScanAction, "NODE_NMAP_NETWORK_SERVICE_RECON": NodeNetworkServiceReconAction, "CONFIGURE_DATABASE_CLIENT": ConfigureDatabaseClientAction, + "CONFIGURE_RANSOMWARE_SCRIPT": ConfigureRansomwareScriptAction, } """Dictionary which maps action type strings to the corresponding action class.""" diff --git a/src/primaite/simulator/system/applications/database_client.py b/src/primaite/simulator/system/applications/database_client.py index 6396c678..fcfd603b 100644 --- a/src/primaite/simulator/system/applications/database_client.py +++ b/src/primaite/simulator/system/applications/database_client.py @@ -104,7 +104,7 @@ class DatabaseClient(Application): success = self.configure(server_ip_address=ip, server_password=pw) return RequestResponse.from_bool(success) - rm.add_request("configure", RequestType(func=lambda request, context: _configure(request, context))) + rm.add_request("configure", RequestType(func=_configure)) return rm def execute(self) -> bool: diff --git a/src/primaite/simulator/system/applications/red_applications/ransomware_script.py b/src/primaite/simulator/system/applications/red_applications/ransomware_script.py index af4a59d4..46e42fc2 100644 --- a/src/primaite/simulator/system/applications/red_applications/ransomware_script.py +++ b/src/primaite/simulator/system/applications/red_applications/ransomware_script.py @@ -2,7 +2,7 @@ from ipaddress import IPv4Address from typing import Dict, Optional -from primaite.interface.request import RequestResponse +from primaite.interface.request import RequestFormat, RequestResponse from primaite.simulator.core import RequestManager, RequestType from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.network.transmission.transport_layer import Port @@ -62,6 +62,15 @@ class RansomwareScript(Application): name="execute", request_type=RequestType(func=lambda request, context: RequestResponse.from_bool(self.attack())), ) + + def _configure(request: RequestFormat, context: Dict) -> RequestResponse: + ip = request[-1].get("server_ip_address") + ip = None if ip is None else IPv4Address(ip) + pw = request[-1].get("server_password") + payload = request[-1].get("payload") + return RequestResponse.from_bool(self.configure(ip, pw, payload)) + + rm.add_request("configure", request_type=RequestType(func=_configure)) return rm def run(self) -> bool: @@ -91,7 +100,7 @@ class RansomwareScript(Application): server_ip_address: IPv4Address, server_password: Optional[str] = None, payload: Optional[str] = None, - ): + ) -> bool: """ Configure the Ransomware Script to communicate with a DatabaseService. @@ -108,6 +117,7 @@ class RansomwareScript(Application): self.sys_log.info( f"{self.name}: Configured the {self.name} with {server_ip_address=}, {payload=}, {server_password=}." ) + return True def attack(self) -> bool: """Perform the attack steps after opening the application.""" diff --git a/tests/integration_tests/game_layer/observations/actions/__init__.py b/tests/integration_tests/game_layer/actions/__init__.py similarity index 100% rename from tests/integration_tests/game_layer/observations/actions/__init__.py rename to tests/integration_tests/game_layer/actions/__init__.py diff --git a/tests/integration_tests/game_layer/observations/actions/test_configure_actions.py b/tests/integration_tests/game_layer/actions/test_configure_actions.py similarity index 52% rename from tests/integration_tests/game_layer/observations/actions/test_configure_actions.py rename to tests/integration_tests/game_layer/actions/test_configure_actions.py index 17e262d1..5439f3c9 100644 --- a/tests/integration_tests/game_layer/observations/actions/test_configure_actions.py +++ b/tests/integration_tests/game_layer/actions/test_configure_actions.py @@ -1,8 +1,12 @@ # © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK from ipaddress import IPv4Address -from primaite.game.agent.actions import ConfigureDatabaseClientAction +import pytest +from pydantic import ValidationError + +from primaite.game.agent.actions import ConfigureDatabaseClientAction, ConfigureRansomwareScriptAction from primaite.simulator.system.applications.database_client import DatabaseClient +from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript from tests.conftest import ControlledAgent @@ -83,3 +87,72 @@ class TestConfigureDatabaseAction: assert db_client.server_ip_address == old_ip assert db_client.server_password is "admin123" + + +class TestConfigureRansomwareScriptAction: + @pytest.mark.parametrize( + "options", + [ + {}, + {"server_ip_address": "181.181.181.181"}, + {"server_password": "admin123"}, + {"payload": "ENCRYPT"}, + { + "server_ip_address": "181.181.181.181", + "server_password": "admin123", + "payload": "ENCRYPT", + }, + ], + ) + def test_configure_ip_password(self, game_and_agent, options): + game, agent = game_and_agent + agent: ControlledAgent + agent.action_manager.actions["CONFIGURE_RANSOMWARE_SCRIPT"] = ConfigureRansomwareScriptAction( + agent.action_manager + ) + + # make sure there is a database client on this node + client_1 = game.simulation.network.get_node_by_hostname("client_1") + client_1.software_manager.install(RansomwareScript) + ransomware_script: RansomwareScript = client_1.software_manager.software["RansomwareScript"] + + old_ip = ransomware_script.server_ip_address + old_pw = ransomware_script.server_password + old_payload = ransomware_script.payload + + action = ( + "CONFIGURE_RANSOMWARE_SCRIPT", + {"node_id": 0, "options": options}, + ) + agent.store_action(action) + game.step() + + expected_ip = old_ip if "server_ip_address" not in options else IPv4Address(options["server_ip_address"]) + expected_pw = old_pw if "server_password" not in options else options["server_password"] + expected_payload = old_payload if "payload" not in options else options["payload"] + + assert ransomware_script.server_ip_address == expected_ip + assert ransomware_script.server_password == expected_pw + assert ransomware_script.payload == expected_payload + + def test_invalid_options(self, game_and_agent): + game, agent = game_and_agent + agent: ControlledAgent + agent.action_manager.actions["CONFIGURE_RANSOMWARE_SCRIPT"] = ConfigureRansomwareScriptAction( + agent.action_manager + ) + + # make sure there is a database client on this node + client_1 = game.simulation.network.get_node_by_hostname("client_1") + client_1.software_manager.install(RansomwareScript) + ransomware_script: RansomwareScript = client_1.software_manager.software["RansomwareScript"] + action = ( + "CONFIGURE_RANSOMWARE_SCRIPT", + { + "node_id": 0, + "options": {"server_password": "admin123", "bad_option": 70}, + }, + ) + agent.store_action(action) + with pytest.raises(ValidationError): + game.step()