Merged PR 444: Refactor application install

## Summary
* Remove the ip address parameter from application install
* Make it possible to install any application that exists
* Add new configuration actions for applications
* Add an application registry to match names to application classes

## Test process
* several new tests
* notebooks still running

## Checklist
- [X] PR is linked to a **work item**
- [X] **acceptance criteria** of linked ticket are met
- [X] performed **self-review** of the code
- [X] written **tests** for any new functionality added with this PR
- [ ] updated the **documentation** if this PR changes or adds functionality
- [ ] written/updated **design docs** if this PR implements new functionality
- [ ] updated the **change log**
- [X] ran **pre-commit** checks for code style
- [X] attended to any **TO-DOs** left in the code

Related work items: #2438, #2705
This commit is contained in:
Marek Wolan
2024-07-02 15:57:59 +00:00
28 changed files with 732 additions and 181 deletions

View File

@@ -8,7 +8,7 @@
applications/*
More info :py:mod:`primaite.game.game.APPLICATION_TYPES_MAPPING`
More info :py:mod:`primaite.simulator.system.applications.application.Application`
.. include:: list_of_system_applications.rst

View File

@@ -14,9 +14,10 @@ from abc import ABC, abstractmethod
from typing import Dict, List, Literal, Optional, Tuple, TYPE_CHECKING, Union
from gymnasium import spaces
from pydantic import BaseModel, Field, field_validator, ValidationInfo
from pydantic import BaseModel, ConfigDict, Field, field_validator, ValidationInfo
from primaite import getLogger
from primaite.interface.request import RequestFormat
_LOGGER = getLogger(__name__)
@@ -228,7 +229,7 @@ class NodeApplicationInstallAction(AbstractAction):
super().__init__(manager=manager)
self.shape: Dict[str, int] = {"node_id": num_nodes}
def form_request(self, node_id: int, application_name: str, ip_address: str) -> List[str]:
def form_request(self, node_id: int, application_name: str) -> List[str]:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
if node_name is None:
@@ -241,10 +242,81 @@ class NodeApplicationInstallAction(AbstractAction):
"application",
"install",
application_name,
ip_address,
]
class ConfigureDatabaseClientAction(AbstractAction):
"""Action which sets config parameters for a database client on a node."""
class _Opts(BaseModel):
"""Schema for options that can be passed to this action."""
model_config = ConfigDict(extra="forbid")
server_ip_address: Optional[str] = None
server_password: Optional[str] = None
def __init__(self, manager: "ActionManager", **kwargs) -> None:
super().__init__(manager=manager)
def form_request(self, node_id: int, config: Dict) -> RequestFormat:
"""Return the action formatted as a request that can be ingested by the simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
if node_name is None:
return ["do_nothing"]
ConfigureDatabaseClientAction._Opts.model_validate(config) # check that options adhere to schema
return ["network", "node", node_name, "application", "DatabaseClient", "configure", config]
class ConfigureRansomwareScriptAction(AbstractAction):
"""Action which sets config parameters for a ransomware script on a node."""
class _Opts(BaseModel):
"""Schema for options that can be passed to this option."""
model_config = ConfigDict(extra="forbid")
server_ip_address: Optional[str] = None
server_password: Optional[str] = None
payload: Optional[str] = None
def __init__(self, manager: "ActionManager", **kwargs) -> None:
super().__init__(manager=manager)
def form_request(self, node_id: int, config: Dict) -> RequestFormat:
"""Return the action formatted as a request that can be ingested by the simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
if node_name is None:
return ["do_nothing"]
ConfigureRansomwareScriptAction._Opts.model_validate(config) # check that options adhere to schema
return ["network", "node", node_name, "application", "RansomwareScript", "configure", config]
class ConfigureDoSBotAction(AbstractAction):
"""Action which sets config parameters for a DoS bot on a node."""
class _Opts(BaseModel):
"""Schema for options that can be passed to this option."""
model_config = ConfigDict(extra="forbid")
target_ip_address: Optional[str] = None
target_port: Optional[str] = None
payload: Optional[str] = None
repeat: Optional[bool] = None
port_scan_p_of_success: Optional[float] = None
dos_intensity: Optional[float] = None
max_sessions: Optional[int] = None
def __init__(self, manager: "ActionManager", **kwargs) -> None:
super().__init__(manager=manager)
def form_request(self, node_id: int, config: Dict) -> RequestFormat:
"""Return the action formatted as a request that can be ingested by the simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
if node_name is None:
return ["do_nothing"]
self._Opts.model_validate(config) # check that options adhere to schema
return ["network", "node", node_name, "application", "DoSBot", "configure", config]
class NodeApplicationRemoveAction(AbstractAction):
"""Action which removes/uninstalls an application."""
@@ -1045,6 +1117,9 @@ class ActionManager:
"NODE_NMAP_PING_SCAN": NodeNMAPPingScanAction,
"NODE_NMAP_PORT_SCAN": NodeNMAPPortScanAction,
"NODE_NMAP_NETWORK_SERVICE_RECON": NodeNetworkServiceReconAction,
"CONFIGURE_DATABASE_CLIENT": ConfigureDatabaseClientAction,
"CONFIGURE_RANSOMWARE_SCRIPT": ConfigureRansomwareScriptAction,
"CONFIGURE_DOSBOT": ConfigureDoSBotAction,
}
"""Dictionary which maps action type strings to the corresponding action class."""

View File

@@ -55,7 +55,6 @@ class TAP001(AbstractScriptedAgent):
return "NODE_APPLICATION_INSTALL", {
"node_id": self.starting_node_idx,
"application_name": "RansomwareScript",
"ip_address": self.ip_address,
}
return "NODE_APPLICATION_EXECUTE", {"node_id": self.starting_node_idx, "application_id": 0}

View File

@@ -26,11 +26,14 @@ from primaite.simulator.network.hardware.nodes.network.wireless_router import Wi
from primaite.simulator.network.nmne import set_nmne_config
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.sim_container import Simulation
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.applications.red_applications.data_manipulation_bot import DataManipulationBot
from primaite.simulator.system.applications.red_applications.dos_bot import DoSBot
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript
from primaite.simulator.system.applications.web_browser import WebBrowser
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.applications.database_client import DatabaseClient # noqa: F401
from primaite.simulator.system.applications.red_applications.data_manipulation_bot import ( # noqa: F401
DataManipulationBot,
)
from primaite.simulator.system.applications.red_applications.dos_bot import DoSBot # noqa: F401
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript # noqa: F401
from primaite.simulator.system.applications.web_browser import WebBrowser # noqa: F401
from primaite.simulator.system.services.database.database_service import DatabaseService
from primaite.simulator.system.services.dns.dns_client import DNSClient
from primaite.simulator.system.services.dns.dns_server import DNSServer
@@ -42,15 +45,6 @@ from primaite.simulator.system.services.web_server.web_server import WebServer
_LOGGER = getLogger(__name__)
APPLICATION_TYPES_MAPPING = {
"WebBrowser": WebBrowser,
"DatabaseClient": DatabaseClient,
"DataManipulationBot": DataManipulationBot,
"DoSBot": DoSBot,
"RansomwareScript": RansomwareScript,
}
"""List of available applications that can be installed on nodes in the PrimAITE Simulation."""
SERVICE_TYPES_MAPPING = {
"DNSClient": DNSClient,
"DNSServer": DNSServer,
@@ -319,7 +313,8 @@ class PrimaiteGame:
if "options" in service_cfg:
opt = service_cfg["options"]
new_service.password = opt.get("db_password", None)
new_service.configure_backup(backup_server=IPv4Address(opt.get("backup_server_ip")))
if "backup_server_ip" in opt:
new_service.configure_backup(backup_server=IPv4Address(opt.get("backup_server_ip")))
if service_type == "FTPServer":
if "options" in service_cfg:
opt = service_cfg["options"]
@@ -333,9 +328,9 @@ class PrimaiteGame:
new_application = None
application_type = application_cfg["type"]
if application_type in APPLICATION_TYPES_MAPPING:
new_node.software_manager.install(APPLICATION_TYPES_MAPPING[application_type])
new_application = new_node.software_manager.software[application_type]
if application_type in Application._application_registry:
new_node.software_manager.install(Application._application_registry[application_type])
new_application = new_node.software_manager.software[application_type] # grab the instance
else:
msg = f"Configuration contains an invalid application type: {application_type}"
_LOGGER.error(msg)

View File

@@ -507,8 +507,8 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Now service 1 on node 2 has `health_status = 3`, indicating that the webapp is compromised.\n",
"File 1 in folder 1 on node 3 has `health_status = 2`, indicating that the database file is compromised."
"Now service 1 on HOST1 has `health_status = 3`, indicating that the webapp is compromised.\n",
"File 1 in folder 1 on HOST2 has `health_status = 2`, indicating that the database file is compromised."
]
},
{
@@ -545,9 +545,9 @@
"source": [
"The fixing takes two steps, so the reward hasn't changed yet. Let's do nothing for another timestep, the reward should improve.\n",
"\n",
"The reward will increase slightly as soon as the file finishes restoring. Then, the reward will increase to 1 when both green agents make successful requests.\n",
"The reward will increase slightly as soon as the file finishes restoring. Then, the reward will increase to 0.9 when both green agents make successful requests.\n",
"\n",
"Run the following cell until the green action is `NODE_APPLICATION_EXECUTE` for application 0, then the reward should become 1. If you run it enough times, another red attack will happen and the reward will drop again."
"Run the following cell until the green action is `NODE_APPLICATION_EXECUTE` for application 0, then the reward should increase. If you run it enough times, another red attack will happen and the reward will drop again."
]
},
{
@@ -708,7 +708,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.8"
"version": "3.10.12"
}
},
"nbformat": 4,

View File

@@ -171,7 +171,7 @@
"from primaite.simulator.file_system.file_system import FileSystem\n",
"\n",
"# no applications exist yet so we will create our own.\n",
"class MSPaint(Application):\n",
"class MSPaint(Application, identifier=\"MSPaint\"):\n",
" def describe_state(self):\n",
" return super().describe_state()"
]

View File

@@ -196,7 +196,7 @@ class SimComponent(BaseModel):
..code::python
class WebBrowser(Application):
class WebBrowser(Application, identifier="WebBrowser"):
def _init_request_manager(self) -> RequestManager:
rm = super()._init_request_manager() # all requests generic to any Application get initialised
rm.add_request(...) # initialise any requests specific to the web browser

View File

@@ -6,7 +6,7 @@ import secrets
from abc import ABC, abstractmethod
from ipaddress import IPv4Address, IPv4Network
from pathlib import Path
from typing import Any, Dict, Optional, Type, TypeVar, Union
from typing import Any, Dict, Optional, TypeVar, Union
from prettytable import MARKDOWN, PrettyTable
from pydantic import BaseModel, Field
@@ -884,6 +884,61 @@ class Node(SimComponent):
More information in user guide and docstring for SimComponent._init_request_manager.
"""
def _install_application(request: RequestFormat, context: Dict) -> RequestResponse:
"""
Allows agents to install applications to the node.
:param request: list containing the application name as the only element
:type request: RequestFormat
:param context: additional context for resolving this action, currently unused
:type context: dict
:return: Request response with a success code if the application was installed.
:rtype: RequestResponse
"""
application_name = request[0]
if self.software_manager.software.get(application_name):
self.sys_log.warning(f"Can't install {application_name}. It's already installed.")
return RequestResponse.from_bool(False)
application_class = Application._application_registry[application_name]
self.software_manager.install(application_class)
application_instance = self.software_manager.software.get(application_name)
self.applications[application_instance.uuid] = application_instance
_LOGGER.debug(f"Added application {application_instance.name} to node {self.hostname}")
self._application_request_manager.add_request(
application_name, RequestType(func=application_instance._request_manager)
)
application_instance.install()
if application_name in self.software_manager.software:
return RequestResponse.from_bool(True)
else:
return RequestResponse.from_bool(False)
def _uninstall_application(request: RequestFormat, context: Dict) -> RequestResponse:
"""
Uninstall and completely remove application from this node.
This method is useful for allowing agents to take this action.
:param request: list containing the application name as the only element
:type request: RequestFormat
:param context: additional context for resolving this action, currently unused
:type context: dict
:return: Request response with a success code if the application was uninstalled.
:rtype: RequestResponse
"""
application_name = request[0]
if application_name not in self.software_manager.software:
self.sys_log.warning(f"Can't uninstall {application_name}. It's not installed.")
return RequestResponse.from_bool(False)
application_instance = self.software_manager.software.get(application_name)
self.software_manager.uninstall(application_instance.name)
if application_instance.name not in self.software_manager.software:
return RequestResponse.from_bool(True)
else:
return RequestResponse.from_bool(False)
_node_is_on = Node._NodeIsOnValidator(node=self)
rm = super()._init_request_manager()
@@ -940,25 +995,8 @@ class Node(SimComponent):
name="application", request_type=RequestType(func=self._application_manager)
)
self._application_manager.add_request(
name="install",
request_type=RequestType(
func=lambda request, context: RequestResponse.from_bool(
self.application_install_action(
application=self._read_application_type(request[0]), ip_address=request[1]
)
)
),
)
self._application_manager.add_request(
name="uninstall",
request_type=RequestType(
func=lambda request, context: RequestResponse.from_bool(
self.application_uninstall_action(application=self._read_application_type(request[0]))
)
),
)
self._application_manager.add_request(name="install", request_type=RequestType(func=_install_application))
self._application_manager.add_request(name="uninstall", request_type=RequestType(func=_uninstall_application))
return rm
@@ -966,29 +1004,6 @@ class Node(SimComponent):
"""Install System Software - software that is usually provided with the OS."""
pass
def _read_application_type(self, application_class_str: str) -> Type[IOSoftwareClass]:
"""Wrapper that converts the string from the request manager into the appropriate class for the application."""
if application_class_str == "DoSBot":
from primaite.simulator.system.applications.red_applications.dos_bot import DoSBot
return DoSBot
elif application_class_str == "DataManipulationBot":
from primaite.simulator.system.applications.red_applications.data_manipulation_bot import (
DataManipulationBot,
)
return DataManipulationBot
elif application_class_str == "WebBrowser":
from primaite.simulator.system.applications.web_browser import WebBrowser
return WebBrowser
elif application_class_str == "RansomwareScript":
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript
return RansomwareScript
else:
return 0
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
@@ -1417,76 +1432,6 @@ class Node(SimComponent):
self.sys_log.info(f"Uninstalled application {application.name}")
self._application_request_manager.remove_request(application.name)
def application_install_action(self, application: Application, ip_address: Optional[str] = None) -> bool:
"""
Install an application on this node and configure it.
This method is useful for allowing agents to take this action.
:param application: Application object that has not been installed on any node yet.
:type application: Application
:param ip_address: IP address used to configure the application
(target IP for the DoSBot or server IP for the DataManipulationBot)
:type ip_address: str
:return: True if the application is installed successfully, otherwise False.
"""
if application in self:
_LOGGER.warning(
f"Can't add application {application.__name__}" + f"to node {self.hostname}. It's already installed."
)
return True
self.software_manager.install(application)
application_instance = self.software_manager.software.get(str(application.__name__))
self.applications[application_instance.uuid] = application_instance
_LOGGER.debug(f"Added application {application_instance.name} to node {self.hostname}")
self._application_request_manager.add_request(
application_instance.name, RequestType(func=application_instance._request_manager)
)
# Configure application if additional parameters are given
if ip_address:
if application_instance.name == "DoSBot":
application_instance.configure(target_ip_address=IPv4Address(ip_address))
elif application_instance.name == "DataManipulationBot":
application_instance.configure(server_ip_address=IPv4Address(ip_address))
elif application_instance.name == "RansomwareScript":
application_instance.configure(server_ip_address=IPv4Address(ip_address))
else:
pass
application_instance.install()
if application_instance.name in self.software_manager.software:
return True
else:
return False
def application_uninstall_action(self, application: Application) -> bool:
"""
Uninstall and completely remove application from this node.
This method is useful for allowing agents to take this action.
:param application: Application object that is currently associated with this node.
:type application: Application
:return: True if the application is uninstalled successfully, otherwise False.
"""
if application.__name__ not in self.software_manager.software:
_LOGGER.warning(
f"Can't remove application {application.__name__}" + f"from node {self.hostname}. It's not installed."
)
return True
application_instance = self.software_manager.software.get(
str(application.__name__)
) # This works because we can't have two applications with the same name on the same node
# self.uninstall_application(application_instance)
self.software_manager.uninstall(application_instance.name)
if application_instance.name not in self.software_manager.software:
return True
else:
return False
def _shut_down_actions(self):
"""Actions to perform when the node is shut down."""
# Turn off all the services in the node

View File

@@ -1,7 +1,7 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from abc import abstractmethod
from enum import Enum
from typing import Any, Dict, Optional, Set
from typing import Any, ClassVar, Dict, Optional, Set, Type
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager, RequestType
@@ -39,6 +39,22 @@ class Application(IOSoftware):
install_countdown: Optional[int] = None
"The countdown to the end of the installation process. None if not currently installing"
_application_registry: ClassVar[Dict[str, Type["Application"]]] = {}
"""Registry of application types. Automatically populated when subclasses are defined."""
def __init_subclass__(cls, identifier: str, **kwargs: Any) -> None:
"""
Register an application type.
:param identifier: Uniquely specifies an application class by name. Used for finding items by config.
:type identifier: str
:raises ValueError: When attempting to register an application with a name that is already allocated.
"""
super().__init_subclass__(**kwargs)
if identifier in cls._application_registry:
raise ValueError(f"Tried to define new application {identifier}, but this name is already reserved.")
cls._application_registry[identifier] = cls
def __init__(self, **kwargs):
super().__init__(**kwargs)

View File

@@ -8,13 +8,14 @@ from uuid import uuid4
from prettytable import MARKDOWN, PrettyTable
from pydantic import BaseModel
from primaite.interface.request import RequestResponse
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.network.hardware.nodes.host.host_node import HostNode
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.core.software_manager import SoftwareManager
from primaite.utils.validators import IPV4Address
class DatabaseClientConnection(BaseModel):
@@ -54,7 +55,7 @@ class DatabaseClientConnection(BaseModel):
self.client._disconnect(self.connection_id) # noqa
class DatabaseClient(Application):
class DatabaseClient(Application, identifier="DatabaseClient"):
"""
A DatabaseClient application.
@@ -96,6 +97,14 @@ class DatabaseClient(Application):
"""
rm = super()._init_request_manager()
rm.add_request("execute", RequestType(func=lambda request, context: RequestResponse.from_bool(self.execute())))
def _configure(request: RequestFormat, context: Dict) -> RequestResponse:
ip, pw = request[-1].get("server_ip_address"), request[-1].get("server_password")
ip = None if ip is None else IPV4Address(ip)
success = self.configure(server_ip_address=ip, server_password=pw)
return RequestResponse.from_bool(success)
rm.add_request("configure", RequestType(func=_configure))
return rm
def execute(self) -> bool:
@@ -141,16 +150,17 @@ class DatabaseClient(Application):
table.add_row([connection_id, connection.is_active])
print(table.get_string(sortby="Connection ID"))
def configure(self, server_ip_address: IPv4Address, server_password: Optional[str] = None):
def configure(self, server_ip_address: Optional[IPv4Address] = None, server_password: Optional[str] = None) -> bool:
"""
Configure the DatabaseClient to communicate with a DatabaseService.
:param server_ip_address: The IP address of the Node the DatabaseService is on.
:param server_password: The password on the DatabaseService.
"""
self.server_ip_address = server_ip_address
self.server_password = server_password
self.server_ip_address = server_ip_address or self.server_ip_address
self.server_password = server_password or self.server_password
self.sys_log.info(f"{self.name}: Configured the {self.name} with {server_ip_address=}, {server_password=}.")
return True
def connect(self) -> bool:
"""Connect the native client connection."""

View File

@@ -44,7 +44,7 @@ class PortScanPayload(SimComponent):
return state
class NMAP(Application):
class NMAP(Application, identifier="NMAP"):
"""
A class representing the NMAP application for network scanning.

View File

@@ -37,7 +37,7 @@ class DataManipulationAttackStage(IntEnum):
"Signifies that the attack has failed."
class DataManipulationBot(Application):
class DataManipulationBot(Application, identifier="DataManipulationBot"):
"""A bot that simulates a script which performs a SQL injection attack."""
payload: Optional[str] = None

View File

@@ -1,11 +1,11 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from enum import IntEnum
from ipaddress import IPv4Address
from typing import Optional
from typing import Dict, Optional
from primaite import getLogger
from primaite.game.science import simulate_trial
from primaite.interface.request import RequestResponse
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.database_client import DatabaseClient
@@ -29,7 +29,7 @@ class DoSAttackStage(IntEnum):
"Attack is completed."
class DoSBot(DatabaseClient):
class DoSBot(DatabaseClient, identifier="DoSBot"):
"""A bot that simulates a Denial of Service attack."""
target_ip_address: Optional[IPv4Address] = None
@@ -71,6 +71,24 @@ class DoSBot(DatabaseClient):
request_type=RequestType(func=lambda request, context: RequestResponse.from_bool(self.run())),
)
def _configure(request: RequestFormat, context: Dict) -> RequestResponse:
"""
Configure the DoSBot.
:param request: List with one element that is a dict of options to pass to the configure method.
:type request: RequestFormat
:param context: additional context for resolving this action, currently unused
:type context: dict
:return: Request Response object with a success code determining if the configuration was successful.
:rtype: RequestResponse
"""
if "target_ip_address" in request[-1]:
request[-1]["target_ip_address"] = IPv4Address(request[-1]["target_ip_address"])
if "target_port" in request[-1]:
request[-1]["target_port"] = Port[request[-1]["target_port"]]
return RequestResponse.from_bool(self.configure(**request[-1]))
rm.add_request("configure", request_type=RequestType(func=_configure))
return rm
def configure(
@@ -82,7 +100,7 @@ class DoSBot(DatabaseClient):
port_scan_p_of_success: float = 0.1,
dos_intensity: float = 1.0,
max_sessions: int = 1000,
):
) -> bool:
"""
Configure the Denial of Service bot.
@@ -90,10 +108,12 @@ class DoSBot(DatabaseClient):
:param: target_port: The port of the target service. Optional - Default is `Port.HTTP`
:param: payload: The payload the DoS Bot will throw at the target service. Optional - Default is `None`
:param: repeat: If True, the bot will maintain the attack. Optional - Default is `True`
:param: port_scan_p_of_success: The chance of the port scan being sucessful. Optional - Default is 0.1 (10%)
:param: port_scan_p_of_success: The chance of the port scan being successful. Optional - Default is 0.1 (10%)
:param: dos_intensity: The intensity of the DoS attack.
Multiplied with the application's max session - Default is 1.0
:param: max_sessions: The maximum number of sessions the DoS bot will attack with. Optional - Default is 1000
:return: Always returns True
:rtype: bool
"""
self.target_ip_address = target_ip_address
self.target_port = target_port
@@ -106,6 +126,7 @@ class DoSBot(DatabaseClient):
f"{self.name}: Configured the {self.name} with {target_ip_address=}, {target_port=}, {payload=}, "
f"{repeat=}, {port_scan_p_of_success=}, {dos_intensity=}, {max_sessions=}."
)
return True
def run(self) -> bool:
"""Run the Denial of Service Bot."""
@@ -117,6 +138,9 @@ class DoSBot(DatabaseClient):
The main application loop for the Denial of Service bot.
The loop goes through the stages of a DoS attack.
:return: True if the application loop could be executed, False otherwise.
:rtype: bool
"""
if not self._can_perform_action():
return False
@@ -126,7 +150,7 @@ class DoSBot(DatabaseClient):
self.sys_log.warning(
f"{self.name} is not properly configured. {self.target_ip_address=}, {self.target_port=}"
)
return True
return False
self.clear_connections()
self._perform_port_scan(p_of_success=self.port_scan_p_of_success)

View File

@@ -2,7 +2,7 @@
from ipaddress import IPv4Address
from typing import Dict, Optional
from primaite.interface.request import RequestResponse
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
@@ -10,7 +10,7 @@ from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.applications.database_client import DatabaseClient, DatabaseClientConnection
class RansomwareScript(Application):
class RansomwareScript(Application, identifier="RansomwareScript"):
"""Ransomware Kill Chain - Designed to be used by the TAP001 Agent on the example layout Network.
:ivar payload: The attack stage query payload. (Default ENCRYPT)
@@ -62,6 +62,25 @@ class RansomwareScript(Application):
name="execute",
request_type=RequestType(func=lambda request, context: RequestResponse.from_bool(self.attack())),
)
def _configure(request: RequestFormat, context: Dict) -> RequestResponse:
"""
Request for configuring the target database and payload.
:param request: Request with one element contianing a dict of parameters for the configure method.
:type request: RequestFormat
:param context: additional context for resolving this action, currently unused
:type context: dict
:return: RequestResponse object with a success code reflecting whether the configuration could be applied.
:rtype: RequestResponse
"""
ip = request[-1].get("server_ip_address")
ip = None if ip is None else IPv4Address(ip)
pw = request[-1].get("server_password")
payload = request[-1].get("payload")
return RequestResponse.from_bool(self.configure(ip, pw, payload))
rm.add_request("configure", request_type=RequestType(func=_configure))
return rm
def run(self) -> bool:
@@ -91,7 +110,7 @@ class RansomwareScript(Application):
server_ip_address: IPv4Address,
server_password: Optional[str] = None,
payload: Optional[str] = None,
):
) -> bool:
"""
Configure the Ransomware Script to communicate with a DatabaseService.
@@ -108,6 +127,7 @@ class RansomwareScript(Application):
self.sys_log.info(
f"{self.name}: Configured the {self.name} with {server_ip_address=}, {payload=}, {server_password=}."
)
return True
def attack(self) -> bool:
"""Perform the attack steps after opening the application."""

View File

@@ -23,7 +23,7 @@ from primaite.simulator.system.services.dns.dns_client import DNSClient
_LOGGER = getLogger(__name__)
class WebBrowser(Application):
class WebBrowser(Application, identifier="WebBrowser"):
"""
Represents a web browser in the simulation environment.

View File

@@ -0,0 +1,142 @@
io_settings:
save_step_metadata: false
save_pcap_logs: false
save_sys_logs: false
save_agent_actions: false
game:
max_episode_length: 256
ports:
- ARP
- DNS
protocols:
- ICMP
- TCP
agents:
- ref: agent_1
team: BLUE
type: ProxyAgent
observation_space: null
action_space:
action_list:
- type: DONOTHING
- type: NODE_APPLICATION_INSTALL
- type: CONFIGURE_DATABASE_CLIENT
- type: CONFIGURE_DOSBOT
- type: CONFIGURE_RANSOMWARE_SCRIPT
- type: NODE_APPLICATION_REMOVE
action_map:
0:
action: DONOTHING
options: {}
1:
action: NODE_APPLICATION_INSTALL
options:
node_id: 0
application_name: DatabaseClient
2:
action: NODE_APPLICATION_INSTALL
options:
node_id: 1
application_name: RansomwareScript
3:
action: NODE_APPLICATION_INSTALL
options:
node_id: 2
application_name: DoSBot
4:
action: CONFIGURE_DATABASE_CLIENT
options:
node_id: 0
config:
server_ip_address: 10.0.0.5
5:
action: CONFIGURE_DATABASE_CLIENT
options:
node_id: 0
config:
server_password: correct_password
6:
action: CONFIGURE_RANSOMWARE_SCRIPT
options:
node_id: 1
config:
server_ip_address: 10.0.0.5
server_password: correct_password
payload: ENCRYPT
7:
action: CONFIGURE_DOSBOT
options:
node_id: 2
config:
target_ip_address: 10.0.0.5
target_port: POSTGRES_SERVER
payload: DELETE
repeat: true
port_scan_p_of_success: 1.0
dos_intensity: 1.0
max_sessions: 1000
8:
action: NODE_APPLICATION_INSTALL
options:
node_id: 1
application_name: DatabaseClient
options:
nodes:
- node_name: client_1
- node_name: client_2
- node_name: client_3
ip_list: []
reward_function:
reward_components:
- type: DUMMY
simulation:
network:
nodes:
- type: computer
hostname: client_1
ip_address: 10.0.0.2
subnet_mask: 255.255.255.0
default_gateway: 10.0.0.1
- type: computer
hostname: client_2
ip_address: 10.0.0.3
subnet_mask: 255.255.255.0
default_gateway: 10.0.0.1
- type: computer
hostname: client_3
ip_address: 10.0.0.4
subnet_mask: 255.255.255.0
default_gateway: 10.0.0.1
- type: switch
hostname: switch_1
num_ports: 8
- type: server
hostname: server_1
ip_address: 10.0.0.5
subnet_mask: 255.255.255.0
default_gateway: 10.0.0.1
services:
- type: DatabaseService
options:
db_password: correct_password
links:
- endpoint_a_hostname: client_1
endpoint_a_port: 1
endpoint_b_hostname: switch_1
endpoint_b_port: 1
- endpoint_a_hostname: client_2
endpoint_a_port: 1
endpoint_b_hostname: switch_1
endpoint_b_port: 2
- endpoint_a_hostname: client_3
endpoint_a_port: 1
endpoint_b_hostname: switch_1
endpoint_b_port: 3
- endpoint_a_hostname: server_1
endpoint_a_port: 1
endpoint_b_hostname: switch_1
endpoint_b_port: 8

View File

@@ -260,6 +260,7 @@ agents:
- type: NODE_APPLICATION_INSTALL
- type: NODE_APPLICATION_REMOVE
- type: NODE_APPLICATION_EXECUTE
- type: CONFIGURE_DOSBOT
action_map:
0:
@@ -683,7 +684,6 @@ agents:
options:
node_id: 0
application_name: DoSBot
ip_address: 192.168.1.14
79:
action: NODE_APPLICATION_REMOVE
options:
@@ -699,6 +699,14 @@ agents:
options:
node_id: 0
application_id: 0
82:
action: CONFIGURE_DOSBOT
options:
node_id: 0
config:
target_ip_address: 192.168.1.14
target_port: POSTGRES_SERVER

View File

@@ -51,11 +51,11 @@ class TestService(Service):
pass
class TestApplication(Application):
class DummyApplication(Application, identifier="DummyApplication"):
"""Test Application class"""
def __init__(self, **kwargs):
kwargs["name"] = "TestApplication"
kwargs["name"] = "DummyApplication"
kwargs["port"] = Port.HTTP
kwargs["protocol"] = IPProtocol.TCP
super().__init__(**kwargs)
@@ -85,15 +85,15 @@ def service_class():
@pytest.fixture(scope="function")
def application(file_system) -> TestApplication:
return TestApplication(
name="TestApplication", port=Port.ARP, file_system=file_system, sys_log=SysLog(hostname="test_application")
def application(file_system) -> DummyApplication:
return DummyApplication(
name="DummyApplication", port=Port.ARP, file_system=file_system, sys_log=SysLog(hostname="dummy_application")
)
@pytest.fixture(scope="function")
def application_class():
return TestApplication
return DummyApplication
@pytest.fixture(scope="function")

View File

@@ -69,6 +69,7 @@ def test_application_install_uninstall_on_uc2():
env.step(0)
# Test we can now execute the DoSBot app
env.step(82) # configure dos bot with ip address and port
_, _, _, _, info = env.step(81)
assert info["agent_actions"]["defender"].response.status == "success"

View File

@@ -9,9 +9,10 @@ from primaite.config.load import data_manipulation_config_path
from primaite.game.agent.interface import ProxyAgent
from primaite.game.agent.scripted_agents.data_manipulation_bot import DataManipulationAgent
from primaite.game.agent.scripted_agents.probabilistic_agent import ProbabilisticAgent
from primaite.game.game import APPLICATION_TYPES_MAPPING, PrimaiteGame, SERVICE_TYPES_MAPPING
from primaite.game.game import PrimaiteGame, SERVICE_TYPES_MAPPING
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.applications.red_applications.data_manipulation_bot import DataManipulationBot
from primaite.simulator.system.applications.red_applications.dos_bot import DoSBot
@@ -85,7 +86,7 @@ def test_node_software_install():
assert client_2.software_manager.software.get(software.__name__) is not None
# check that applications have been installed on client 1
for applications in APPLICATION_TYPES_MAPPING:
for applications in Application._application_registry:
assert client_1.software_manager.software.get(applications) is not None
# check that services have been installed on client 1

View File

@@ -0,0 +1 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK

View File

@@ -0,0 +1,292 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from ipaddress import IPv4Address
import pytest
from pydantic import ValidationError
from primaite.game.agent.actions import (
ConfigureDatabaseClientAction,
ConfigureDoSBotAction,
ConfigureRansomwareScriptAction,
)
from primaite.session.environment import PrimaiteGymEnv
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.applications.red_applications.dos_bot import DoSBot
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript
from primaite.simulator.system.services.database.database_service import DatabaseService
from tests import TEST_ASSETS_ROOT
from tests.conftest import ControlledAgent
APP_CONFIG_YAML = TEST_ASSETS_ROOT / "configs/install_and_configure_apps.yaml"
class TestConfigureDatabaseAction:
def test_configure_ip_password(self, game_and_agent):
game, agent = game_and_agent
agent: ControlledAgent
agent.action_manager.actions["CONFIGURE_DATABASE_CLIENT"] = ConfigureDatabaseClientAction(agent.action_manager)
# make sure there is a database client on this node
client_1 = game.simulation.network.get_node_by_hostname("client_1")
client_1.software_manager.install(DatabaseClient)
db_client: DatabaseClient = client_1.software_manager.software["DatabaseClient"]
action = (
"CONFIGURE_DATABASE_CLIENT",
{
"node_id": 0,
"config": {
"server_ip_address": "192.168.1.99",
"server_password": "admin123",
},
},
)
agent.store_action(action)
game.step()
assert db_client.server_ip_address == IPv4Address("192.168.1.99")
assert db_client.server_password == "admin123"
def test_configure_ip(self, game_and_agent):
game, agent = game_and_agent
agent: ControlledAgent
agent.action_manager.actions["CONFIGURE_DATABASE_CLIENT"] = ConfigureDatabaseClientAction(agent.action_manager)
# make sure there is a database client on this node
client_1 = game.simulation.network.get_node_by_hostname("client_1")
client_1.software_manager.install(DatabaseClient)
db_client: DatabaseClient = client_1.software_manager.software["DatabaseClient"]
action = (
"CONFIGURE_DATABASE_CLIENT",
{
"node_id": 0,
"config": {
"server_ip_address": "192.168.1.99",
},
},
)
agent.store_action(action)
game.step()
assert db_client.server_ip_address == IPv4Address("192.168.1.99")
assert db_client.server_password is None
def test_configure_password(self, game_and_agent):
game, agent = game_and_agent
agent: ControlledAgent
agent.action_manager.actions["CONFIGURE_DATABASE_CLIENT"] = ConfigureDatabaseClientAction(agent.action_manager)
# make sure there is a database client on this node
client_1 = game.simulation.network.get_node_by_hostname("client_1")
client_1.software_manager.install(DatabaseClient)
db_client: DatabaseClient = client_1.software_manager.software["DatabaseClient"]
old_ip = db_client.server_ip_address
action = (
"CONFIGURE_DATABASE_CLIENT",
{
"node_id": 0,
"config": {
"server_password": "admin123",
},
},
)
agent.store_action(action)
game.step()
assert db_client.server_ip_address == old_ip
assert db_client.server_password is "admin123"
class TestConfigureRansomwareScriptAction:
@pytest.mark.parametrize(
"config",
[
{},
{"server_ip_address": "181.181.181.181"},
{"server_password": "admin123"},
{"payload": "ENCRYPT"},
{
"server_ip_address": "181.181.181.181",
"server_password": "admin123",
"payload": "ENCRYPT",
},
],
)
def test_configure_ip_password(self, game_and_agent, config):
game, agent = game_and_agent
agent: ControlledAgent
agent.action_manager.actions["CONFIGURE_RANSOMWARE_SCRIPT"] = ConfigureRansomwareScriptAction(
agent.action_manager
)
# make sure there is a database client on this node
client_1 = game.simulation.network.get_node_by_hostname("client_1")
client_1.software_manager.install(RansomwareScript)
ransomware_script: RansomwareScript = client_1.software_manager.software["RansomwareScript"]
old_ip = ransomware_script.server_ip_address
old_pw = ransomware_script.server_password
old_payload = ransomware_script.payload
action = (
"CONFIGURE_RANSOMWARE_SCRIPT",
{"node_id": 0, "config": config},
)
agent.store_action(action)
game.step()
expected_ip = old_ip if "server_ip_address" not in config else IPv4Address(config["server_ip_address"])
expected_pw = old_pw if "server_password" not in config else config["server_password"]
expected_payload = old_payload if "payload" not in config else config["payload"]
assert ransomware_script.server_ip_address == expected_ip
assert ransomware_script.server_password == expected_pw
assert ransomware_script.payload == expected_payload
def test_invalid_config(self, game_and_agent):
game, agent = game_and_agent
agent: ControlledAgent
agent.action_manager.actions["CONFIGURE_RANSOMWARE_SCRIPT"] = ConfigureRansomwareScriptAction(
agent.action_manager
)
# make sure there is a database client on this node
client_1 = game.simulation.network.get_node_by_hostname("client_1")
client_1.software_manager.install(RansomwareScript)
ransomware_script: RansomwareScript = client_1.software_manager.software["RansomwareScript"]
action = (
"CONFIGURE_RANSOMWARE_SCRIPT",
{
"node_id": 0,
"config": {"server_password": "admin123", "bad_option": 70},
},
)
agent.store_action(action)
with pytest.raises(ValidationError):
game.step()
class TestConfigureDoSBot:
def test_configure_DoSBot(self, game_and_agent):
game, agent = game_and_agent
agent: ControlledAgent
agent.action_manager.actions["CONFIGURE_DOSBOT"] = ConfigureDoSBotAction(agent.action_manager)
client_1 = game.simulation.network.get_node_by_hostname("client_1")
client_1.software_manager.install(DoSBot)
dos_bot: DoSBot = client_1.software_manager.software["DoSBot"]
action = (
"CONFIGURE_DOSBOT",
{
"node_id": 0,
"config": {
"target_ip_address": "192.168.1.99",
"target_port": "POSTGRES_SERVER",
"payload": "HACC",
"repeat": False,
"port_scan_p_of_success": 0.875,
"dos_intensity": 0.75,
"max_sessions": 50,
},
},
)
agent.store_action(action)
game.step()
assert dos_bot.target_ip_address == IPv4Address("192.168.1.99")
assert dos_bot.target_port == Port.POSTGRES_SERVER
assert dos_bot.payload == "HACC"
assert not dos_bot.repeat
assert dos_bot.port_scan_p_of_success == 0.875
assert dos_bot.dos_intensity == 0.75
assert dos_bot.max_sessions == 50
class TestConfigureYAML:
def test_configure_db_client(self):
env = PrimaiteGymEnv(env_config=APP_CONFIG_YAML)
# make sure there's no db client on the node yet
client_1 = env.game.simulation.network.get_node_by_hostname("client_1")
assert client_1.software_manager.software.get("DatabaseClient") is None
# take the install action, check that the db gets installed, step to get it to finish installing
env.step(1)
db_client: DatabaseClient = client_1.software_manager.software.get("DatabaseClient")
assert isinstance(db_client, DatabaseClient)
assert db_client.operating_state == ApplicationOperatingState.INSTALLING
env.step(0)
env.step(0)
env.step(0)
env.step(0)
# configure the ip address and check that it changes, but password doesn't change
assert db_client.server_ip_address is None
assert db_client.server_password is None
env.step(4)
assert db_client.server_ip_address == IPv4Address("10.0.0.5")
assert db_client.server_password is None
# configure the password and check that it changes, make sure this lets us connect to the db
assert not db_client.connect()
env.step(5)
assert db_client.server_password == "correct_password"
assert db_client.connect()
def test_configure_ransomware_script(self):
env = PrimaiteGymEnv(env_config=APP_CONFIG_YAML)
client_2 = env.game.simulation.network.get_node_by_hostname("client_2")
assert client_2.software_manager.software.get("RansomwareScript") is None
# install ransomware script
env.step(2)
ransom = client_2.software_manager.software.get("RansomwareScript")
assert isinstance(ransom, RansomwareScript)
assert ransom.operating_state == ApplicationOperatingState.INSTALLING
env.step(0)
env.step(0)
env.step(0)
env.step(0)
# make sure it's not working yet because it's not configured and there's no db client
assert not ransom.attack()
env.step(8) # install db client on the same node
env.step(0)
env.step(0)
env.step(0)
env.step(0) # let it finish installing
assert not ransom.attack()
# finally, configure the ransomware script with ip and password
env.step(6)
assert ransom.attack()
db_server = env.game.simulation.network.get_node_by_hostname("server_1")
db_service: DatabaseService = db_server.software_manager.software.get("DatabaseService")
assert db_service.db_file.health_status == FileSystemItemHealthStatus.CORRUPT
def test_configure_dos_bot(self):
env = PrimaiteGymEnv(env_config=APP_CONFIG_YAML)
client_3 = env.game.simulation.network.get_node_by_hostname("client_3")
assert client_3.software_manager.software.get("DoSBot") is None
# install DoSBot
env.step(3)
bot = client_3.software_manager.software.get("DoSBot")
assert isinstance(bot, DoSBot)
assert bot.operating_state == ApplicationOperatingState.INSTALLING
env.step(0)
env.step(0)
env.step(0)
env.step(0)
# make sure dos bot doesn't work before being configured
assert not bot.run()
env.step(7)
assert bot.run()

View File

@@ -557,7 +557,7 @@ def test_node_application_install_and_uninstall_integration(game_and_agent: Tupl
assert client_1.software_manager.software.get("DoSBot") is None
action = ("NODE_APPLICATION_INSTALL", {"node_id": 0, "application_name": "DoSBot", "ip_address": "192.168.1.14"})
action = ("NODE_APPLICATION_INSTALL", {"node_id": 0, "application_name": "DoSBot"})
agent.store_action(action)
game.step()

View File

@@ -41,7 +41,7 @@ class BroadcastService(Service):
super().send(payload="broadcast", dest_ip_address=ip_network, dest_port=Port.HTTP, ip_protocol=self.protocol)
class BroadcastClient(Application):
class BroadcastClient(Application, identifier="BroadcastClient"):
"""A client application to receive broadcast and unicast messages."""
payloads_received: List = []

View File

@@ -3,9 +3,9 @@ from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.networks import multi_lan_internet_network_example
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.applications.web_browser import WebBrowser
from primaite.simulator.system.services.dns.dns_client import DNSClient
from primaite.simulator.system.services.ftp.ftp_client import FTPClient
from src.primaite.simulator.system.applications.web_browser import WebBrowser
def test_all_with_configured_dns_server_ip_can_resolve_url():

View File

@@ -21,7 +21,7 @@ def populated_node(application_class) -> Tuple[Application, Computer]:
computer.power_on()
computer.software_manager.install(application_class)
app = computer.software_manager.software.get("TestApplication")
app = computer.software_manager.software.get("DummyApplication")
app.run()
return app, computer
@@ -39,7 +39,7 @@ def test_application_on_offline_node(application_class):
)
computer.software_manager.install(application_class)
app: Application = computer.software_manager.software.get("TestApplication")
app: Application = computer.software_manager.software.get("DummyApplication")
computer.power_off()

View File

@@ -13,7 +13,7 @@ from primaite.simulator.network.hardware.node_operating_state import NodeOperati
from primaite.simulator.network.hardware.nodes.host.host_node import HostNode
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.transmission.transport_layer import Port
from tests.conftest import TestApplication, TestService
from tests.conftest import DummyApplication, TestService
def test_successful_node_file_system_creation_request(example_network):
@@ -47,14 +47,14 @@ def test_successful_application_requests(example_network):
net = example_network
client_1 = net.get_node_by_hostname("client_1")
client_1.software_manager.install(TestApplication)
client_1.software_manager.software.get("TestApplication").run()
client_1.software_manager.install(DummyApplication)
client_1.software_manager.software.get("DummyApplication").run()
resp_1 = net.apply_request(["node", "client_1", "application", "TestApplication", "scan"])
resp_1 = net.apply_request(["node", "client_1", "application", "DummyApplication", "scan"])
assert resp_1 == RequestResponse(status="success", data={})
resp_2 = net.apply_request(["node", "client_1", "application", "TestApplication", "fix"])
resp_2 = net.apply_request(["node", "client_1", "application", "DummyApplication", "fix"])
assert resp_2 == RequestResponse(status="success", data={})
resp_3 = net.apply_request(["node", "client_1", "application", "TestApplication", "compromise"])
resp_3 = net.apply_request(["node", "client_1", "application", "DummyApplication", "compromise"])
assert resp_3 == RequestResponse(status="success", data={})

View File

@@ -0,0 +1,22 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
import pytest
from primaite.simulator.system.applications.application import Application
def test_adding_to_app_registry():
class temp_application(Application, identifier="temp_app"):
pass
assert Application._application_registry["temp_app"] is temp_application
with pytest.raises(ValueError):
class another_application(Application, identifier="temp_app"):
pass
# This is kinda evil...
# Because pytest doesn't reimport classes from modules, registering this temporary test application will change the
# state of the Application registry for all subsequently run tests. So, we have to delete and unregister the class.
del temp_application
Application._application_registry.pop("temp_app")