From bf8ec6083331adfcd089a55a4b0b6fc60b0423b3 Mon Sep 17 00:00:00 2001 From: Marek Wolan Date: Mon, 1 Jul 2024 15:25:20 +0100 Subject: [PATCH] #2700 Add configure dosbot action --- src/primaite/game/agent/actions.py | 28 ++++++++++++ .../applications/red_applications/dos_bot.py | 17 +++++-- .../actions/test_configure_actions.py | 45 ++++++++++++++++++- 3 files changed, 85 insertions(+), 5 deletions(-) diff --git a/src/primaite/game/agent/actions.py b/src/primaite/game/agent/actions.py index 9f2693e5..1de5276c 100644 --- a/src/primaite/game/agent/actions.py +++ b/src/primaite/game/agent/actions.py @@ -291,6 +291,33 @@ class ConfigureRansomwareScriptAction(AbstractAction): return ["network", "node", node_name, "application", "RansomwareScript", "configure", options] +class ConfigureDoSBotAction(AbstractAction): + """Action which sets config parameters for a DoS bot on a node.""" + + class _Opts(BaseModel): + """Schema for options that can be passed to this option.""" + + model_config = ConfigDict(extra="forbid") + target_ip_address: Optional[str] = None + target_port: Optional[str] = None + payload: Optional[str] = None + repeat: Optional[bool] = None + port_scan_p_of_success: Optional[float] = None + dos_intensity: Optional[float] = None + max_sessions: Optional[int] = None + + def __init__(self, manager: "ActionManager", **kwargs) -> None: + super().__init__(manager=manager) + + def form_request(self, node_id: int, options: Dict) -> RequestFormat: + """Return the action formatted as a request that can be ingested by the simulation.""" + node_name = self.manager.get_node_name_by_idx(node_id) + if node_name is None: + return ["do_nothing"] + self._Opts.model_validate(options) # check that options adhere to schema + return ["network", "node", node_name, "application", "DoSBot", "configure", options] + + class NodeApplicationRemoveAction(AbstractAction): """Action which removes/uninstalls an application.""" @@ -1093,6 +1120,7 @@ class ActionManager: "NODE_NMAP_NETWORK_SERVICE_RECON": NodeNetworkServiceReconAction, "CONFIGURE_DATABASE_CLIENT": ConfigureDatabaseClientAction, "CONFIGURE_RANSOMWARE_SCRIPT": ConfigureRansomwareScriptAction, + "CONFIGURE_DOSBOT": ConfigureDoSBotAction, } """Dictionary which maps action type strings to the corresponding action class.""" diff --git a/src/primaite/simulator/system/applications/red_applications/dos_bot.py b/src/primaite/simulator/system/applications/red_applications/dos_bot.py index 65e34227..dccf45f5 100644 --- a/src/primaite/simulator/system/applications/red_applications/dos_bot.py +++ b/src/primaite/simulator/system/applications/red_applications/dos_bot.py @@ -1,11 +1,11 @@ # © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK from enum import IntEnum from ipaddress import IPv4Address -from typing import Optional +from typing import Dict, Optional from primaite import getLogger from primaite.game.science import simulate_trial -from primaite.interface.request import RequestResponse +from primaite.interface.request import RequestFormat, RequestResponse from primaite.simulator.core import RequestManager, RequestType from primaite.simulator.network.transmission.transport_layer import Port from primaite.simulator.system.applications.database_client import DatabaseClient @@ -71,6 +71,14 @@ class DoSBot(DatabaseClient): request_type=RequestType(func=lambda request, context: RequestResponse.from_bool(self.run())), ) + def _configure(request: RequestFormat, context: Dict) -> RequestResponse: + if "target_ip_address" in request[-1]: + request[-1]["target_ip_address"] = IPv4Address(request[-1]["target_ip_address"]) + if "target_port" in request[-1]: + request[-1]["target_port"] = Port[request[-1]["target_port"]] + return RequestResponse.from_bool(self.configure(**request[-1])) + + rm.add_request("configure", request_type=RequestType(func=_configure)) return rm def configure( @@ -82,7 +90,7 @@ class DoSBot(DatabaseClient): port_scan_p_of_success: float = 0.1, dos_intensity: float = 1.0, max_sessions: int = 1000, - ): + ) -> bool: """ Configure the Denial of Service bot. @@ -90,7 +98,7 @@ class DoSBot(DatabaseClient): :param: target_port: The port of the target service. Optional - Default is `Port.HTTP` :param: payload: The payload the DoS Bot will throw at the target service. Optional - Default is `None` :param: repeat: If True, the bot will maintain the attack. Optional - Default is `True` - :param: port_scan_p_of_success: The chance of the port scan being sucessful. Optional - Default is 0.1 (10%) + :param: port_scan_p_of_success: The chance of the port scan being successful. Optional - Default is 0.1 (10%) :param: dos_intensity: The intensity of the DoS attack. Multiplied with the application's max session - Default is 1.0 :param: max_sessions: The maximum number of sessions the DoS bot will attack with. Optional - Default is 1000 @@ -106,6 +114,7 @@ class DoSBot(DatabaseClient): f"{self.name}: Configured the {self.name} with {target_ip_address=}, {target_port=}, {payload=}, " f"{repeat=}, {port_scan_p_of_success=}, {dos_intensity=}, {max_sessions=}." ) + return True def run(self) -> bool: """Run the Denial of Service Bot.""" diff --git a/tests/integration_tests/game_layer/actions/test_configure_actions.py b/tests/integration_tests/game_layer/actions/test_configure_actions.py index 5439f3c9..6bcd3b52 100644 --- a/tests/integration_tests/game_layer/actions/test_configure_actions.py +++ b/tests/integration_tests/game_layer/actions/test_configure_actions.py @@ -4,8 +4,14 @@ from ipaddress import IPv4Address import pytest from pydantic import ValidationError -from primaite.game.agent.actions import ConfigureDatabaseClientAction, ConfigureRansomwareScriptAction +from primaite.game.agent.actions import ( + ConfigureDatabaseClientAction, + ConfigureDoSBotAction, + ConfigureRansomwareScriptAction, +) +from primaite.simulator.network.transmission.transport_layer import Port from primaite.simulator.system.applications.database_client import DatabaseClient +from primaite.simulator.system.applications.red_applications.dos_bot import DoSBot from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript from tests.conftest import ControlledAgent @@ -156,3 +162,40 @@ class TestConfigureRansomwareScriptAction: agent.store_action(action) with pytest.raises(ValidationError): game.step() + + +class TestConfigureDoSBot: + def test_configure_DoSBot(self, game_and_agent): + game, agent = game_and_agent + agent: ControlledAgent + agent.action_manager.actions["CONFIGURE_DOSBOT"] = ConfigureDoSBotAction(agent.action_manager) + + client_1 = game.simulation.network.get_node_by_hostname("client_1") + client_1.software_manager.install(DoSBot) + dos_bot: DoSBot = client_1.software_manager.software["DoSBot"] + + action = ( + "CONFIGURE_DOSBOT", + { + "node_id": 0, + "options": { + "target_ip_address": "192.168.1.99", + "target_port": "POSTGRES_SERVER", + "payload": "HACC", + "repeat": False, + "port_scan_p_of_success": 0.875, + "dos_intensity": 0.75, + "max_sessions": 50, + }, + }, + ) + agent.store_action(action) + game.step() + + assert dos_bot.target_ip_address == IPv4Address("192.168.1.99") + assert dos_bot.target_port == Port.POSTGRES_SERVER + assert dos_bot.payload == "HACC" + assert not dos_bot.repeat + assert dos_bot.port_scan_p_of_success == 0.875 + assert dos_bot.dos_intensity == 0.75 + assert dos_bot.max_sessions == 50