#2700 add ransomware configure action

This commit is contained in:
Marek Wolan
2024-07-01 14:41:41 +01:00
parent c34cb6d7ce
commit ab73ac20e8
5 changed files with 113 additions and 5 deletions

View File

@@ -14,7 +14,7 @@ from abc import ABC, abstractmethod
from typing import Dict, List, Literal, Optional, Tuple, TYPE_CHECKING, Union
from gymnasium import spaces
from pydantic import BaseModel, Field, field_validator, ValidationInfo
from pydantic import BaseModel, ConfigDict, Field, field_validator, ValidationInfo
from primaite import getLogger
from primaite.interface.request import RequestFormat
@@ -252,6 +252,7 @@ class ConfigureDatabaseClientAction(AbstractAction):
class _Opts(BaseModel):
"""Schema for options that can be passed to this action."""
model_config = ConfigDict(extra="forbid")
server_ip_address: Optional[str] = None
server_password: Optional[str] = None
@@ -267,6 +268,29 @@ class ConfigureDatabaseClientAction(AbstractAction):
return ["network", "node", node_name, "application", "DatabaseClient", "configure", options]
class ConfigureRansomwareScriptAction(AbstractAction):
"""Action which sets config parameters for a ransomware script on a node."""
class _Opts(BaseModel):
"""Schema for options that can be passed to this option."""
model_config = ConfigDict(extra="forbid")
server_ip_address: Optional[str] = None
server_password: Optional[str] = None
payload: Optional[str] = None
def __init__(self, manager: "ActionManager", **kwargs) -> None:
super().__init__(manager=manager)
def form_request(self, node_id: int, options: Dict) -> RequestFormat:
"""Return the action formatted as a request that can be ingested by the simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
if node_name is None:
return ["do_nothing"]
ConfigureRansomwareScriptAction._Opts.model_validate(options) # check that options adhere to schema
return ["network", "node", node_name, "application", "RansomwareScript", "configure", options]
class NodeApplicationRemoveAction(AbstractAction):
"""Action which removes/uninstalls an application."""
@@ -1068,6 +1092,7 @@ class ActionManager:
"NODE_NMAP_PORT_SCAN": NodeNMAPPortScanAction,
"NODE_NMAP_NETWORK_SERVICE_RECON": NodeNetworkServiceReconAction,
"CONFIGURE_DATABASE_CLIENT": ConfigureDatabaseClientAction,
"CONFIGURE_RANSOMWARE_SCRIPT": ConfigureRansomwareScriptAction,
}
"""Dictionary which maps action type strings to the corresponding action class."""

View File

@@ -104,7 +104,7 @@ class DatabaseClient(Application):
success = self.configure(server_ip_address=ip, server_password=pw)
return RequestResponse.from_bool(success)
rm.add_request("configure", RequestType(func=lambda request, context: _configure(request, context)))
rm.add_request("configure", RequestType(func=_configure))
return rm
def execute(self) -> bool:

View File

@@ -2,7 +2,7 @@
from ipaddress import IPv4Address
from typing import Dict, Optional
from primaite.interface.request import RequestResponse
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
@@ -62,6 +62,15 @@ class RansomwareScript(Application):
name="execute",
request_type=RequestType(func=lambda request, context: RequestResponse.from_bool(self.attack())),
)
def _configure(request: RequestFormat, context: Dict) -> RequestResponse:
ip = request[-1].get("server_ip_address")
ip = None if ip is None else IPv4Address(ip)
pw = request[-1].get("server_password")
payload = request[-1].get("payload")
return RequestResponse.from_bool(self.configure(ip, pw, payload))
rm.add_request("configure", request_type=RequestType(func=_configure))
return rm
def run(self) -> bool:
@@ -91,7 +100,7 @@ class RansomwareScript(Application):
server_ip_address: IPv4Address,
server_password: Optional[str] = None,
payload: Optional[str] = None,
):
) -> bool:
"""
Configure the Ransomware Script to communicate with a DatabaseService.
@@ -108,6 +117,7 @@ class RansomwareScript(Application):
self.sys_log.info(
f"{self.name}: Configured the {self.name} with {server_ip_address=}, {payload=}, {server_password=}."
)
return True
def attack(self) -> bool:
"""Perform the attack steps after opening the application."""

View File

@@ -1,8 +1,12 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from ipaddress import IPv4Address
from primaite.game.agent.actions import ConfigureDatabaseClientAction
import pytest
from pydantic import ValidationError
from primaite.game.agent.actions import ConfigureDatabaseClientAction, ConfigureRansomwareScriptAction
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript
from tests.conftest import ControlledAgent
@@ -83,3 +87,72 @@ class TestConfigureDatabaseAction:
assert db_client.server_ip_address == old_ip
assert db_client.server_password is "admin123"
class TestConfigureRansomwareScriptAction:
@pytest.mark.parametrize(
"options",
[
{},
{"server_ip_address": "181.181.181.181"},
{"server_password": "admin123"},
{"payload": "ENCRYPT"},
{
"server_ip_address": "181.181.181.181",
"server_password": "admin123",
"payload": "ENCRYPT",
},
],
)
def test_configure_ip_password(self, game_and_agent, options):
game, agent = game_and_agent
agent: ControlledAgent
agent.action_manager.actions["CONFIGURE_RANSOMWARE_SCRIPT"] = ConfigureRansomwareScriptAction(
agent.action_manager
)
# make sure there is a database client on this node
client_1 = game.simulation.network.get_node_by_hostname("client_1")
client_1.software_manager.install(RansomwareScript)
ransomware_script: RansomwareScript = client_1.software_manager.software["RansomwareScript"]
old_ip = ransomware_script.server_ip_address
old_pw = ransomware_script.server_password
old_payload = ransomware_script.payload
action = (
"CONFIGURE_RANSOMWARE_SCRIPT",
{"node_id": 0, "options": options},
)
agent.store_action(action)
game.step()
expected_ip = old_ip if "server_ip_address" not in options else IPv4Address(options["server_ip_address"])
expected_pw = old_pw if "server_password" not in options else options["server_password"]
expected_payload = old_payload if "payload" not in options else options["payload"]
assert ransomware_script.server_ip_address == expected_ip
assert ransomware_script.server_password == expected_pw
assert ransomware_script.payload == expected_payload
def test_invalid_options(self, game_and_agent):
game, agent = game_and_agent
agent: ControlledAgent
agent.action_manager.actions["CONFIGURE_RANSOMWARE_SCRIPT"] = ConfigureRansomwareScriptAction(
agent.action_manager
)
# make sure there is a database client on this node
client_1 = game.simulation.network.get_node_by_hostname("client_1")
client_1.software_manager.install(RansomwareScript)
ransomware_script: RansomwareScript = client_1.software_manager.software["RansomwareScript"]
action = (
"CONFIGURE_RANSOMWARE_SCRIPT",
{
"node_id": 0,
"options": {"server_password": "admin123", "bad_option": 70},
},
)
agent.store_action(action)
with pytest.raises(ValidationError):
game.step()