Merge remote-tracking branch 'origin/dev' into feature/2725-add-software-fix-duration-config

This commit is contained in:
Czar Echavez
2024-07-03 10:37:02 +01:00
28 changed files with 732 additions and 181 deletions

View File

@@ -14,9 +14,10 @@ from abc import ABC, abstractmethod
from typing import Dict, List, Literal, Optional, Tuple, TYPE_CHECKING, Union
from gymnasium import spaces
from pydantic import BaseModel, Field, field_validator, ValidationInfo
from pydantic import BaseModel, ConfigDict, Field, field_validator, ValidationInfo
from primaite import getLogger
from primaite.interface.request import RequestFormat
_LOGGER = getLogger(__name__)
@@ -228,7 +229,7 @@ class NodeApplicationInstallAction(AbstractAction):
super().__init__(manager=manager)
self.shape: Dict[str, int] = {"node_id": num_nodes}
def form_request(self, node_id: int, application_name: str, ip_address: str) -> List[str]:
def form_request(self, node_id: int, application_name: str) -> List[str]:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
if node_name is None:
@@ -241,10 +242,81 @@ class NodeApplicationInstallAction(AbstractAction):
"application",
"install",
application_name,
ip_address,
]
class ConfigureDatabaseClientAction(AbstractAction):
"""Action which sets config parameters for a database client on a node."""
class _Opts(BaseModel):
"""Schema for options that can be passed to this action."""
model_config = ConfigDict(extra="forbid")
server_ip_address: Optional[str] = None
server_password: Optional[str] = None
def __init__(self, manager: "ActionManager", **kwargs) -> None:
super().__init__(manager=manager)
def form_request(self, node_id: int, config: Dict) -> RequestFormat:
"""Return the action formatted as a request that can be ingested by the simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
if node_name is None:
return ["do_nothing"]
ConfigureDatabaseClientAction._Opts.model_validate(config) # check that options adhere to schema
return ["network", "node", node_name, "application", "DatabaseClient", "configure", config]
class ConfigureRansomwareScriptAction(AbstractAction):
"""Action which sets config parameters for a ransomware script on a node."""
class _Opts(BaseModel):
"""Schema for options that can be passed to this option."""
model_config = ConfigDict(extra="forbid")
server_ip_address: Optional[str] = None
server_password: Optional[str] = None
payload: Optional[str] = None
def __init__(self, manager: "ActionManager", **kwargs) -> None:
super().__init__(manager=manager)
def form_request(self, node_id: int, config: Dict) -> RequestFormat:
"""Return the action formatted as a request that can be ingested by the simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
if node_name is None:
return ["do_nothing"]
ConfigureRansomwareScriptAction._Opts.model_validate(config) # check that options adhere to schema
return ["network", "node", node_name, "application", "RansomwareScript", "configure", config]
class ConfigureDoSBotAction(AbstractAction):
"""Action which sets config parameters for a DoS bot on a node."""
class _Opts(BaseModel):
"""Schema for options that can be passed to this option."""
model_config = ConfigDict(extra="forbid")
target_ip_address: Optional[str] = None
target_port: Optional[str] = None
payload: Optional[str] = None
repeat: Optional[bool] = None
port_scan_p_of_success: Optional[float] = None
dos_intensity: Optional[float] = None
max_sessions: Optional[int] = None
def __init__(self, manager: "ActionManager", **kwargs) -> None:
super().__init__(manager=manager)
def form_request(self, node_id: int, config: Dict) -> RequestFormat:
"""Return the action formatted as a request that can be ingested by the simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
if node_name is None:
return ["do_nothing"]
self._Opts.model_validate(config) # check that options adhere to schema
return ["network", "node", node_name, "application", "DoSBot", "configure", config]
class NodeApplicationRemoveAction(AbstractAction):
"""Action which removes/uninstalls an application."""
@@ -1045,6 +1117,9 @@ class ActionManager:
"NODE_NMAP_PING_SCAN": NodeNMAPPingScanAction,
"NODE_NMAP_PORT_SCAN": NodeNMAPPortScanAction,
"NODE_NMAP_NETWORK_SERVICE_RECON": NodeNetworkServiceReconAction,
"CONFIGURE_DATABASE_CLIENT": ConfigureDatabaseClientAction,
"CONFIGURE_RANSOMWARE_SCRIPT": ConfigureRansomwareScriptAction,
"CONFIGURE_DOSBOT": ConfigureDoSBotAction,
}
"""Dictionary which maps action type strings to the corresponding action class."""

View File

@@ -55,7 +55,6 @@ class TAP001(AbstractScriptedAgent):
return "NODE_APPLICATION_INSTALL", {
"node_id": self.starting_node_idx,
"application_name": "RansomwareScript",
"ip_address": self.ip_address,
}
return "NODE_APPLICATION_EXECUTE", {"node_id": self.starting_node_idx, "application_id": 0}

View File

@@ -26,11 +26,14 @@ from primaite.simulator.network.hardware.nodes.network.wireless_router import Wi
from primaite.simulator.network.nmne import set_nmne_config
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.sim_container import Simulation
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.applications.red_applications.data_manipulation_bot import DataManipulationBot
from primaite.simulator.system.applications.red_applications.dos_bot import DoSBot
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript
from primaite.simulator.system.applications.web_browser import WebBrowser
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.applications.database_client import DatabaseClient # noqa: F401
from primaite.simulator.system.applications.red_applications.data_manipulation_bot import ( # noqa: F401
DataManipulationBot,
)
from primaite.simulator.system.applications.red_applications.dos_bot import DoSBot # noqa: F401
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript # noqa: F401
from primaite.simulator.system.applications.web_browser import WebBrowser # noqa: F401
from primaite.simulator.system.services.database.database_service import DatabaseService
from primaite.simulator.system.services.dns.dns_client import DNSClient
from primaite.simulator.system.services.dns.dns_server import DNSServer
@@ -42,15 +45,6 @@ from primaite.simulator.system.services.web_server.web_server import WebServer
_LOGGER = getLogger(__name__)
APPLICATION_TYPES_MAPPING = {
"WebBrowser": WebBrowser,
"DatabaseClient": DatabaseClient,
"DataManipulationBot": DataManipulationBot,
"DoSBot": DoSBot,
"RansomwareScript": RansomwareScript,
}
"""List of available applications that can be installed on nodes in the PrimAITE Simulation."""
SERVICE_TYPES_MAPPING = {
"DNSClient": DNSClient,
"DNSServer": DNSServer,
@@ -323,7 +317,8 @@ class PrimaiteGame:
if "options" in service_cfg:
opt = service_cfg["options"]
new_service.password = opt.get("db_password", None)
new_service.configure_backup(backup_server=IPv4Address(opt.get("backup_server_ip")))
if "backup_server_ip" in opt:
new_service.configure_backup(backup_server=IPv4Address(opt.get("backup_server_ip")))
if service_type == "FTPServer":
if "options" in service_cfg:
opt = service_cfg["options"]
@@ -337,9 +332,9 @@ class PrimaiteGame:
new_application = None
application_type = application_cfg["type"]
if application_type in APPLICATION_TYPES_MAPPING:
new_node.software_manager.install(APPLICATION_TYPES_MAPPING[application_type])
new_application = new_node.software_manager.software[application_type]
if application_type in Application._application_registry:
new_node.software_manager.install(Application._application_registry[application_type])
new_application = new_node.software_manager.software[application_type] # grab the instance
# fixing duration for the application
if "fix_duration" in application_cfg.get("options", {}):

View File

@@ -507,8 +507,8 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Now service 1 on node 2 has `health_status = 3`, indicating that the webapp is compromised.\n",
"File 1 in folder 1 on node 3 has `health_status = 2`, indicating that the database file is compromised."
"Now service 1 on HOST1 has `health_status = 3`, indicating that the webapp is compromised.\n",
"File 1 in folder 1 on HOST2 has `health_status = 2`, indicating that the database file is compromised."
]
},
{
@@ -545,9 +545,9 @@
"source": [
"The fixing takes two steps, so the reward hasn't changed yet. Let's do nothing for another timestep, the reward should improve.\n",
"\n",
"The reward will increase slightly as soon as the file finishes restoring. Then, the reward will increase to 1 when both green agents make successful requests.\n",
"The reward will increase slightly as soon as the file finishes restoring. Then, the reward will increase to 0.9 when both green agents make successful requests.\n",
"\n",
"Run the following cell until the green action is `NODE_APPLICATION_EXECUTE` for application 0, then the reward should become 1. If you run it enough times, another red attack will happen and the reward will drop again."
"Run the following cell until the green action is `NODE_APPLICATION_EXECUTE` for application 0, then the reward should increase. If you run it enough times, another red attack will happen and the reward will drop again."
]
},
{
@@ -708,7 +708,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.8"
"version": "3.10.12"
}
},
"nbformat": 4,

View File

@@ -171,7 +171,7 @@
"from primaite.simulator.file_system.file_system import FileSystem\n",
"\n",
"# no applications exist yet so we will create our own.\n",
"class MSPaint(Application):\n",
"class MSPaint(Application, identifier=\"MSPaint\"):\n",
" def describe_state(self):\n",
" return super().describe_state()"
]

View File

@@ -196,7 +196,7 @@ class SimComponent(BaseModel):
..code::python
class WebBrowser(Application):
class WebBrowser(Application, identifier="WebBrowser"):
def _init_request_manager(self) -> RequestManager:
rm = super()._init_request_manager() # all requests generic to any Application get initialised
rm.add_request(...) # initialise any requests specific to the web browser

View File

@@ -6,7 +6,7 @@ import secrets
from abc import ABC, abstractmethod
from ipaddress import IPv4Address, IPv4Network
from pathlib import Path
from typing import Any, Dict, Optional, Type, TypeVar, Union
from typing import Any, Dict, Optional, TypeVar, Union
from prettytable import MARKDOWN, PrettyTable
from pydantic import BaseModel, Field
@@ -884,6 +884,61 @@ class Node(SimComponent):
More information in user guide and docstring for SimComponent._init_request_manager.
"""
def _install_application(request: RequestFormat, context: Dict) -> RequestResponse:
"""
Allows agents to install applications to the node.
:param request: list containing the application name as the only element
:type request: RequestFormat
:param context: additional context for resolving this action, currently unused
:type context: dict
:return: Request response with a success code if the application was installed.
:rtype: RequestResponse
"""
application_name = request[0]
if self.software_manager.software.get(application_name):
self.sys_log.warning(f"Can't install {application_name}. It's already installed.")
return RequestResponse.from_bool(False)
application_class = Application._application_registry[application_name]
self.software_manager.install(application_class)
application_instance = self.software_manager.software.get(application_name)
self.applications[application_instance.uuid] = application_instance
_LOGGER.debug(f"Added application {application_instance.name} to node {self.hostname}")
self._application_request_manager.add_request(
application_name, RequestType(func=application_instance._request_manager)
)
application_instance.install()
if application_name in self.software_manager.software:
return RequestResponse.from_bool(True)
else:
return RequestResponse.from_bool(False)
def _uninstall_application(request: RequestFormat, context: Dict) -> RequestResponse:
"""
Uninstall and completely remove application from this node.
This method is useful for allowing agents to take this action.
:param request: list containing the application name as the only element
:type request: RequestFormat
:param context: additional context for resolving this action, currently unused
:type context: dict
:return: Request response with a success code if the application was uninstalled.
:rtype: RequestResponse
"""
application_name = request[0]
if application_name not in self.software_manager.software:
self.sys_log.warning(f"Can't uninstall {application_name}. It's not installed.")
return RequestResponse.from_bool(False)
application_instance = self.software_manager.software.get(application_name)
self.software_manager.uninstall(application_instance.name)
if application_instance.name not in self.software_manager.software:
return RequestResponse.from_bool(True)
else:
return RequestResponse.from_bool(False)
_node_is_on = Node._NodeIsOnValidator(node=self)
rm = super()._init_request_manager()
@@ -940,25 +995,8 @@ class Node(SimComponent):
name="application", request_type=RequestType(func=self._application_manager)
)
self._application_manager.add_request(
name="install",
request_type=RequestType(
func=lambda request, context: RequestResponse.from_bool(
self.application_install_action(
application=self._read_application_type(request[0]), ip_address=request[1]
)
)
),
)
self._application_manager.add_request(
name="uninstall",
request_type=RequestType(
func=lambda request, context: RequestResponse.from_bool(
self.application_uninstall_action(application=self._read_application_type(request[0]))
)
),
)
self._application_manager.add_request(name="install", request_type=RequestType(func=_install_application))
self._application_manager.add_request(name="uninstall", request_type=RequestType(func=_uninstall_application))
return rm
@@ -966,29 +1004,6 @@ class Node(SimComponent):
"""Install System Software - software that is usually provided with the OS."""
pass
def _read_application_type(self, application_class_str: str) -> Type[IOSoftwareClass]:
"""Wrapper that converts the string from the request manager into the appropriate class for the application."""
if application_class_str == "DoSBot":
from primaite.simulator.system.applications.red_applications.dos_bot import DoSBot
return DoSBot
elif application_class_str == "DataManipulationBot":
from primaite.simulator.system.applications.red_applications.data_manipulation_bot import (
DataManipulationBot,
)
return DataManipulationBot
elif application_class_str == "WebBrowser":
from primaite.simulator.system.applications.web_browser import WebBrowser
return WebBrowser
elif application_class_str == "RansomwareScript":
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript
return RansomwareScript
else:
return 0
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
@@ -1417,76 +1432,6 @@ class Node(SimComponent):
self.sys_log.info(f"Uninstalled application {application.name}")
self._application_request_manager.remove_request(application.name)
def application_install_action(self, application: Application, ip_address: Optional[str] = None) -> bool:
"""
Install an application on this node and configure it.
This method is useful for allowing agents to take this action.
:param application: Application object that has not been installed on any node yet.
:type application: Application
:param ip_address: IP address used to configure the application
(target IP for the DoSBot or server IP for the DataManipulationBot)
:type ip_address: str
:return: True if the application is installed successfully, otherwise False.
"""
if application in self:
_LOGGER.warning(
f"Can't add application {application.__name__}" + f"to node {self.hostname}. It's already installed."
)
return True
self.software_manager.install(application)
application_instance = self.software_manager.software.get(str(application.__name__))
self.applications[application_instance.uuid] = application_instance
_LOGGER.debug(f"Added application {application_instance.name} to node {self.hostname}")
self._application_request_manager.add_request(
application_instance.name, RequestType(func=application_instance._request_manager)
)
# Configure application if additional parameters are given
if ip_address:
if application_instance.name == "DoSBot":
application_instance.configure(target_ip_address=IPv4Address(ip_address))
elif application_instance.name == "DataManipulationBot":
application_instance.configure(server_ip_address=IPv4Address(ip_address))
elif application_instance.name == "RansomwareScript":
application_instance.configure(server_ip_address=IPv4Address(ip_address))
else:
pass
application_instance.install()
if application_instance.name in self.software_manager.software:
return True
else:
return False
def application_uninstall_action(self, application: Application) -> bool:
"""
Uninstall and completely remove application from this node.
This method is useful for allowing agents to take this action.
:param application: Application object that is currently associated with this node.
:type application: Application
:return: True if the application is uninstalled successfully, otherwise False.
"""
if application.__name__ not in self.software_manager.software:
_LOGGER.warning(
f"Can't remove application {application.__name__}" + f"from node {self.hostname}. It's not installed."
)
return True
application_instance = self.software_manager.software.get(
str(application.__name__)
) # This works because we can't have two applications with the same name on the same node
# self.uninstall_application(application_instance)
self.software_manager.uninstall(application_instance.name)
if application_instance.name not in self.software_manager.software:
return True
else:
return False
def _shut_down_actions(self):
"""Actions to perform when the node is shut down."""
# Turn off all the services in the node

View File

@@ -1,7 +1,7 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from abc import abstractmethod
from enum import Enum
from typing import Any, Dict, Optional, Set
from typing import Any, ClassVar, Dict, Optional, Set, Type
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager, RequestType
@@ -39,6 +39,22 @@ class Application(IOSoftware):
install_countdown: Optional[int] = None
"The countdown to the end of the installation process. None if not currently installing"
_application_registry: ClassVar[Dict[str, Type["Application"]]] = {}
"""Registry of application types. Automatically populated when subclasses are defined."""
def __init_subclass__(cls, identifier: str, **kwargs: Any) -> None:
"""
Register an application type.
:param identifier: Uniquely specifies an application class by name. Used for finding items by config.
:type identifier: str
:raises ValueError: When attempting to register an application with a name that is already allocated.
"""
super().__init_subclass__(**kwargs)
if identifier in cls._application_registry:
raise ValueError(f"Tried to define new application {identifier}, but this name is already reserved.")
cls._application_registry[identifier] = cls
def __init__(self, **kwargs):
super().__init__(**kwargs)

View File

@@ -8,13 +8,14 @@ from uuid import uuid4
from prettytable import MARKDOWN, PrettyTable
from pydantic import BaseModel
from primaite.interface.request import RequestResponse
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.network.hardware.nodes.host.host_node import HostNode
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.core.software_manager import SoftwareManager
from primaite.utils.validators import IPV4Address
class DatabaseClientConnection(BaseModel):
@@ -54,7 +55,7 @@ class DatabaseClientConnection(BaseModel):
self.client._disconnect(self.connection_id) # noqa
class DatabaseClient(Application):
class DatabaseClient(Application, identifier="DatabaseClient"):
"""
A DatabaseClient application.
@@ -96,6 +97,14 @@ class DatabaseClient(Application):
"""
rm = super()._init_request_manager()
rm.add_request("execute", RequestType(func=lambda request, context: RequestResponse.from_bool(self.execute())))
def _configure(request: RequestFormat, context: Dict) -> RequestResponse:
ip, pw = request[-1].get("server_ip_address"), request[-1].get("server_password")
ip = None if ip is None else IPV4Address(ip)
success = self.configure(server_ip_address=ip, server_password=pw)
return RequestResponse.from_bool(success)
rm.add_request("configure", RequestType(func=_configure))
return rm
def execute(self) -> bool:
@@ -141,16 +150,17 @@ class DatabaseClient(Application):
table.add_row([connection_id, connection.is_active])
print(table.get_string(sortby="Connection ID"))
def configure(self, server_ip_address: IPv4Address, server_password: Optional[str] = None):
def configure(self, server_ip_address: Optional[IPv4Address] = None, server_password: Optional[str] = None) -> bool:
"""
Configure the DatabaseClient to communicate with a DatabaseService.
:param server_ip_address: The IP address of the Node the DatabaseService is on.
:param server_password: The password on the DatabaseService.
"""
self.server_ip_address = server_ip_address
self.server_password = server_password
self.server_ip_address = server_ip_address or self.server_ip_address
self.server_password = server_password or self.server_password
self.sys_log.info(f"{self.name}: Configured the {self.name} with {server_ip_address=}, {server_password=}.")
return True
def connect(self) -> bool:
"""Connect the native client connection."""

View File

@@ -44,7 +44,7 @@ class PortScanPayload(SimComponent):
return state
class NMAP(Application):
class NMAP(Application, identifier="NMAP"):
"""
A class representing the NMAP application for network scanning.

View File

@@ -37,7 +37,7 @@ class DataManipulationAttackStage(IntEnum):
"Signifies that the attack has failed."
class DataManipulationBot(Application):
class DataManipulationBot(Application, identifier="DataManipulationBot"):
"""A bot that simulates a script which performs a SQL injection attack."""
payload: Optional[str] = None

View File

@@ -1,11 +1,11 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from enum import IntEnum
from ipaddress import IPv4Address
from typing import Optional
from typing import Dict, Optional
from primaite import getLogger
from primaite.game.science import simulate_trial
from primaite.interface.request import RequestResponse
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.database_client import DatabaseClient
@@ -29,7 +29,7 @@ class DoSAttackStage(IntEnum):
"Attack is completed."
class DoSBot(DatabaseClient):
class DoSBot(DatabaseClient, identifier="DoSBot"):
"""A bot that simulates a Denial of Service attack."""
target_ip_address: Optional[IPv4Address] = None
@@ -71,6 +71,24 @@ class DoSBot(DatabaseClient):
request_type=RequestType(func=lambda request, context: RequestResponse.from_bool(self.run())),
)
def _configure(request: RequestFormat, context: Dict) -> RequestResponse:
"""
Configure the DoSBot.
:param request: List with one element that is a dict of options to pass to the configure method.
:type request: RequestFormat
:param context: additional context for resolving this action, currently unused
:type context: dict
:return: Request Response object with a success code determining if the configuration was successful.
:rtype: RequestResponse
"""
if "target_ip_address" in request[-1]:
request[-1]["target_ip_address"] = IPv4Address(request[-1]["target_ip_address"])
if "target_port" in request[-1]:
request[-1]["target_port"] = Port[request[-1]["target_port"]]
return RequestResponse.from_bool(self.configure(**request[-1]))
rm.add_request("configure", request_type=RequestType(func=_configure))
return rm
def configure(
@@ -82,7 +100,7 @@ class DoSBot(DatabaseClient):
port_scan_p_of_success: float = 0.1,
dos_intensity: float = 1.0,
max_sessions: int = 1000,
):
) -> bool:
"""
Configure the Denial of Service bot.
@@ -90,10 +108,12 @@ class DoSBot(DatabaseClient):
:param: target_port: The port of the target service. Optional - Default is `Port.HTTP`
:param: payload: The payload the DoS Bot will throw at the target service. Optional - Default is `None`
:param: repeat: If True, the bot will maintain the attack. Optional - Default is `True`
:param: port_scan_p_of_success: The chance of the port scan being sucessful. Optional - Default is 0.1 (10%)
:param: port_scan_p_of_success: The chance of the port scan being successful. Optional - Default is 0.1 (10%)
:param: dos_intensity: The intensity of the DoS attack.
Multiplied with the application's max session - Default is 1.0
:param: max_sessions: The maximum number of sessions the DoS bot will attack with. Optional - Default is 1000
:return: Always returns True
:rtype: bool
"""
self.target_ip_address = target_ip_address
self.target_port = target_port
@@ -106,6 +126,7 @@ class DoSBot(DatabaseClient):
f"{self.name}: Configured the {self.name} with {target_ip_address=}, {target_port=}, {payload=}, "
f"{repeat=}, {port_scan_p_of_success=}, {dos_intensity=}, {max_sessions=}."
)
return True
def run(self) -> bool:
"""Run the Denial of Service Bot."""
@@ -117,6 +138,9 @@ class DoSBot(DatabaseClient):
The main application loop for the Denial of Service bot.
The loop goes through the stages of a DoS attack.
:return: True if the application loop could be executed, False otherwise.
:rtype: bool
"""
if not self._can_perform_action():
return False
@@ -126,7 +150,7 @@ class DoSBot(DatabaseClient):
self.sys_log.warning(
f"{self.name} is not properly configured. {self.target_ip_address=}, {self.target_port=}"
)
return True
return False
self.clear_connections()
self._perform_port_scan(p_of_success=self.port_scan_p_of_success)

View File

@@ -2,7 +2,7 @@
from ipaddress import IPv4Address
from typing import Dict, Optional
from primaite.interface.request import RequestResponse
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
@@ -10,7 +10,7 @@ from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.applications.database_client import DatabaseClient, DatabaseClientConnection
class RansomwareScript(Application):
class RansomwareScript(Application, identifier="RansomwareScript"):
"""Ransomware Kill Chain - Designed to be used by the TAP001 Agent on the example layout Network.
:ivar payload: The attack stage query payload. (Default ENCRYPT)
@@ -62,6 +62,25 @@ class RansomwareScript(Application):
name="execute",
request_type=RequestType(func=lambda request, context: RequestResponse.from_bool(self.attack())),
)
def _configure(request: RequestFormat, context: Dict) -> RequestResponse:
"""
Request for configuring the target database and payload.
:param request: Request with one element contianing a dict of parameters for the configure method.
:type request: RequestFormat
:param context: additional context for resolving this action, currently unused
:type context: dict
:return: RequestResponse object with a success code reflecting whether the configuration could be applied.
:rtype: RequestResponse
"""
ip = request[-1].get("server_ip_address")
ip = None if ip is None else IPv4Address(ip)
pw = request[-1].get("server_password")
payload = request[-1].get("payload")
return RequestResponse.from_bool(self.configure(ip, pw, payload))
rm.add_request("configure", request_type=RequestType(func=_configure))
return rm
def run(self) -> bool:
@@ -91,7 +110,7 @@ class RansomwareScript(Application):
server_ip_address: Optional[IPv4Address] = None,
server_password: Optional[str] = None,
payload: Optional[str] = None,
):
) -> bool:
"""
Configure the Ransomware Script to communicate with a DatabaseService.
@@ -108,6 +127,7 @@ class RansomwareScript(Application):
self.sys_log.info(
f"{self.name}: Configured the {self.name} with {server_ip_address=}, {payload=}, {server_password=}."
)
return True
def attack(self) -> bool:
"""Perform the attack steps after opening the application."""

View File

@@ -23,7 +23,7 @@ from primaite.simulator.system.services.dns.dns_client import DNSClient
_LOGGER = getLogger(__name__)
class WebBrowser(Application):
class WebBrowser(Application, identifier="WebBrowser"):
"""
Represents a web browser in the simulation environment.