#2700 Add configure dosbot action
This commit is contained in:
@@ -291,6 +291,33 @@ class ConfigureRansomwareScriptAction(AbstractAction):
|
||||
return ["network", "node", node_name, "application", "RansomwareScript", "configure", options]
|
||||
|
||||
|
||||
class ConfigureDoSBotAction(AbstractAction):
|
||||
"""Action which sets config parameters for a DoS bot on a node."""
|
||||
|
||||
class _Opts(BaseModel):
|
||||
"""Schema for options that can be passed to this option."""
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
target_ip_address: Optional[str] = None
|
||||
target_port: Optional[str] = None
|
||||
payload: Optional[str] = None
|
||||
repeat: Optional[bool] = None
|
||||
port_scan_p_of_success: Optional[float] = None
|
||||
dos_intensity: Optional[float] = None
|
||||
max_sessions: Optional[int] = None
|
||||
|
||||
def __init__(self, manager: "ActionManager", **kwargs) -> None:
|
||||
super().__init__(manager=manager)
|
||||
|
||||
def form_request(self, node_id: int, options: Dict) -> RequestFormat:
|
||||
"""Return the action formatted as a request that can be ingested by the simulation."""
|
||||
node_name = self.manager.get_node_name_by_idx(node_id)
|
||||
if node_name is None:
|
||||
return ["do_nothing"]
|
||||
self._Opts.model_validate(options) # check that options adhere to schema
|
||||
return ["network", "node", node_name, "application", "DoSBot", "configure", options]
|
||||
|
||||
|
||||
class NodeApplicationRemoveAction(AbstractAction):
|
||||
"""Action which removes/uninstalls an application."""
|
||||
|
||||
@@ -1093,6 +1120,7 @@ class ActionManager:
|
||||
"NODE_NMAP_NETWORK_SERVICE_RECON": NodeNetworkServiceReconAction,
|
||||
"CONFIGURE_DATABASE_CLIENT": ConfigureDatabaseClientAction,
|
||||
"CONFIGURE_RANSOMWARE_SCRIPT": ConfigureRansomwareScriptAction,
|
||||
"CONFIGURE_DOSBOT": ConfigureDoSBotAction,
|
||||
}
|
||||
"""Dictionary which maps action type strings to the corresponding action class."""
|
||||
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
|
||||
from enum import IntEnum
|
||||
from ipaddress import IPv4Address
|
||||
from typing import Optional
|
||||
from typing import Dict, Optional
|
||||
|
||||
from primaite import getLogger
|
||||
from primaite.game.science import simulate_trial
|
||||
from primaite.interface.request import RequestResponse
|
||||
from primaite.interface.request import RequestFormat, RequestResponse
|
||||
from primaite.simulator.core import RequestManager, RequestType
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
from primaite.simulator.system.applications.database_client import DatabaseClient
|
||||
@@ -71,6 +71,14 @@ class DoSBot(DatabaseClient):
|
||||
request_type=RequestType(func=lambda request, context: RequestResponse.from_bool(self.run())),
|
||||
)
|
||||
|
||||
def _configure(request: RequestFormat, context: Dict) -> RequestResponse:
|
||||
if "target_ip_address" in request[-1]:
|
||||
request[-1]["target_ip_address"] = IPv4Address(request[-1]["target_ip_address"])
|
||||
if "target_port" in request[-1]:
|
||||
request[-1]["target_port"] = Port[request[-1]["target_port"]]
|
||||
return RequestResponse.from_bool(self.configure(**request[-1]))
|
||||
|
||||
rm.add_request("configure", request_type=RequestType(func=_configure))
|
||||
return rm
|
||||
|
||||
def configure(
|
||||
@@ -82,7 +90,7 @@ class DoSBot(DatabaseClient):
|
||||
port_scan_p_of_success: float = 0.1,
|
||||
dos_intensity: float = 1.0,
|
||||
max_sessions: int = 1000,
|
||||
):
|
||||
) -> bool:
|
||||
"""
|
||||
Configure the Denial of Service bot.
|
||||
|
||||
@@ -90,7 +98,7 @@ class DoSBot(DatabaseClient):
|
||||
:param: target_port: The port of the target service. Optional - Default is `Port.HTTP`
|
||||
:param: payload: The payload the DoS Bot will throw at the target service. Optional - Default is `None`
|
||||
:param: repeat: If True, the bot will maintain the attack. Optional - Default is `True`
|
||||
:param: port_scan_p_of_success: The chance of the port scan being sucessful. Optional - Default is 0.1 (10%)
|
||||
:param: port_scan_p_of_success: The chance of the port scan being successful. Optional - Default is 0.1 (10%)
|
||||
:param: dos_intensity: The intensity of the DoS attack.
|
||||
Multiplied with the application's max session - Default is 1.0
|
||||
:param: max_sessions: The maximum number of sessions the DoS bot will attack with. Optional - Default is 1000
|
||||
@@ -106,6 +114,7 @@ class DoSBot(DatabaseClient):
|
||||
f"{self.name}: Configured the {self.name} with {target_ip_address=}, {target_port=}, {payload=}, "
|
||||
f"{repeat=}, {port_scan_p_of_success=}, {dos_intensity=}, {max_sessions=}."
|
||||
)
|
||||
return True
|
||||
|
||||
def run(self) -> bool:
|
||||
"""Run the Denial of Service Bot."""
|
||||
|
||||
@@ -4,8 +4,14 @@ from ipaddress import IPv4Address
|
||||
import pytest
|
||||
from pydantic import ValidationError
|
||||
|
||||
from primaite.game.agent.actions import ConfigureDatabaseClientAction, ConfigureRansomwareScriptAction
|
||||
from primaite.game.agent.actions import (
|
||||
ConfigureDatabaseClientAction,
|
||||
ConfigureDoSBotAction,
|
||||
ConfigureRansomwareScriptAction,
|
||||
)
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
from primaite.simulator.system.applications.database_client import DatabaseClient
|
||||
from primaite.simulator.system.applications.red_applications.dos_bot import DoSBot
|
||||
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript
|
||||
from tests.conftest import ControlledAgent
|
||||
|
||||
@@ -156,3 +162,40 @@ class TestConfigureRansomwareScriptAction:
|
||||
agent.store_action(action)
|
||||
with pytest.raises(ValidationError):
|
||||
game.step()
|
||||
|
||||
|
||||
class TestConfigureDoSBot:
|
||||
def test_configure_DoSBot(self, game_and_agent):
|
||||
game, agent = game_and_agent
|
||||
agent: ControlledAgent
|
||||
agent.action_manager.actions["CONFIGURE_DOSBOT"] = ConfigureDoSBotAction(agent.action_manager)
|
||||
|
||||
client_1 = game.simulation.network.get_node_by_hostname("client_1")
|
||||
client_1.software_manager.install(DoSBot)
|
||||
dos_bot: DoSBot = client_1.software_manager.software["DoSBot"]
|
||||
|
||||
action = (
|
||||
"CONFIGURE_DOSBOT",
|
||||
{
|
||||
"node_id": 0,
|
||||
"options": {
|
||||
"target_ip_address": "192.168.1.99",
|
||||
"target_port": "POSTGRES_SERVER",
|
||||
"payload": "HACC",
|
||||
"repeat": False,
|
||||
"port_scan_p_of_success": 0.875,
|
||||
"dos_intensity": 0.75,
|
||||
"max_sessions": 50,
|
||||
},
|
||||
},
|
||||
)
|
||||
agent.store_action(action)
|
||||
game.step()
|
||||
|
||||
assert dos_bot.target_ip_address == IPv4Address("192.168.1.99")
|
||||
assert dos_bot.target_port == Port.POSTGRES_SERVER
|
||||
assert dos_bot.payload == "HACC"
|
||||
assert not dos_bot.repeat
|
||||
assert dos_bot.port_scan_p_of_success == 0.875
|
||||
assert dos_bot.dos_intensity == 0.75
|
||||
assert dos_bot.max_sessions == 50
|
||||
|
||||
Reference in New Issue
Block a user