From 7dd25f18f666a319528d34df5916893f3d7c37f3 Mon Sep 17 00:00:00 2001 From: Nick Todd Date: Tue, 10 Dec 2024 12:27:50 +0000 Subject: [PATCH] #2888: Update with ConfigSchema --- .../simulator/system/applications/application.py | 1 + .../simulator/system/applications/database_client.py | 9 ++++++++- src/primaite/simulator/system/applications/nmap.py | 7 +++++++ .../applications/red_applications/c2/abstract_c2.py | 11 +++++++++-- .../applications/red_applications/c2/c2_beacon.py | 10 +++++++++- .../applications/red_applications/c2/c2_server.py | 8 ++++++++ .../system/applications/red_applications/dos_bot.py | 8 ++++++++ .../red_applications/ransomware_script.py | 7 +++++++ .../simulator/system/applications/web_browser.py | 7 +++++++ 9 files changed, 64 insertions(+), 4 deletions(-) diff --git a/src/primaite/simulator/system/applications/application.py b/src/primaite/simulator/system/applications/application.py index 43ffa37a..402c64f2 100644 --- a/src/primaite/simulator/system/applications/application.py +++ b/src/primaite/simulator/system/applications/application.py @@ -29,6 +29,7 @@ class Application(IOSoftware): Applications are user-facing programs that may perform input/output operations. """ + config: "Application.ConfigSchema" operating_state: ApplicationOperatingState = ApplicationOperatingState.CLOSED diff --git a/src/primaite/simulator/system/applications/database_client.py b/src/primaite/simulator/system/applications/database_client.py index cd4b2a03..cc593a30 100644 --- a/src/primaite/simulator/system/applications/database_client.py +++ b/src/primaite/simulator/system/applications/database_client.py @@ -68,10 +68,12 @@ class DatabaseClient(Application, identifier="DatabaseClient"): Extends the Application class to provide functionality for connecting, querying, and disconnecting from a Database Service. It mainly operates over TCP protocol. - :ivar server_ip_address: The IPv4 address of the Database Service server, defaults to None. """ + config: "DatabaseClient.ConfigSchema" + server_ip_address: Optional[IPv4Address] = None + """The IPv4 address of the Database Service server, defaults to None.""" server_password: Optional[str] = None _query_success_tracker: Dict[str, bool] = {} """Keep track of connections that were established or verified during this step. Used for rewards.""" @@ -88,6 +90,11 @@ class DatabaseClient(Application, identifier="DatabaseClient"): native_connection: Optional[DatabaseClientConnection] = None """Native Client Connection for using the client directly (similar to psql in a terminal).""" + class ConfigSchema(Application.ConfigSchema): + """ConfigSchema for DatabaseClient.""" + + type: str = "DATABASE_CLIENT" + def __init__(self, **kwargs): kwargs["name"] = "DatabaseClient" kwargs["port"] = PORT_LOOKUP["POSTGRES_SERVER"] diff --git a/src/primaite/simulator/system/applications/nmap.py b/src/primaite/simulator/system/applications/nmap.py index e2b9117d..3f9724ca 100644 --- a/src/primaite/simulator/system/applications/nmap.py +++ b/src/primaite/simulator/system/applications/nmap.py @@ -52,6 +52,8 @@ class NMAP(Application, identifier="NMAP"): as ping scans to discover active hosts and port scans to detect open ports on those hosts. """ + config: "NMAP.ConfigSchema" + _active_port_scans: Dict[str, PortScanPayload] = {} _port_scan_responses: Dict[str, PortScanPayload] = {} @@ -62,6 +64,11 @@ class NMAP(Application, identifier="NMAP"): (False, False): "Port", } + class ConfigSchema(Application.ConfigSchema): + """ConfigSchema for NMAP.""" + + type: str = "NMAP" + def __init__(self, **kwargs): kwargs["name"] = "NMAP" kwargs["port"] = PORT_LOOKUP["NONE"] diff --git a/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py b/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py index f77bc33a..9961e790 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py @@ -45,7 +45,7 @@ class C2Payload(Enum): """C2 Input Command payload. Used by the C2 Server to send a command to the c2 beacon.""" OUTPUT = "output_command" - """C2 Output Command. Used by the C2 Beacon to send the results of a Input command to the c2 server.""" + """C2 Output Command. Used by the C2 Beacon to send the results of an Input command to the c2 server.""" class AbstractC2(Application, identifier="AbstractC2"): @@ -63,6 +63,8 @@ class AbstractC2(Application, identifier="AbstractC2"): Please refer to the Command-&-Control notebook for an in-depth example of the C2 Suite. """ + config: "AbstractC2" + c2_connection_active: bool = False """Indicates if the c2 server and c2 beacon are currently connected.""" @@ -75,6 +77,11 @@ class AbstractC2(Application, identifier="AbstractC2"): keep_alive_inactivity: int = 0 """Indicates how many timesteps since the last time the c2 application received a keep alive.""" + class ConfigSchema(Application.ConfigSchema): + """ConfigSchema for AbstractC2.""" + + type: str = "ABSTRACTC2" + class _C2Opts(BaseModel): """A Pydantic Schema for the different C2 configuration options.""" @@ -118,7 +125,7 @@ class AbstractC2(Application, identifier="AbstractC2"): :type c2_command: C2Command. :param command_options: The relevant C2 Beacon parameters.F :type command_options: Dict - :return: Returns the construct C2Packet + :return: Returns the constructed C2Packet :rtype: C2Packet """ constructed_packet = C2Packet( diff --git a/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py b/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py index c0c3d872..98cb85ba 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py @@ -8,6 +8,7 @@ from pydantic import validate_call from primaite.interface.request import RequestFormat, RequestResponse from primaite.simulator.core import RequestManager, RequestType from primaite.simulator.network.protocols.masquerade import C2Packet +from primaite.simulator.system.applications.application import Application from primaite.simulator.system.applications.red_applications.c2 import ExfilOpts, RansomwareOpts, TerminalOpts from primaite.simulator.system.applications.red_applications.c2.abstract_c2 import AbstractC2, C2Command, C2Payload from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript @@ -32,15 +33,22 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): 2. Leveraging the terminal application to execute requests (dependent on the command given) 3. Sending the RequestResponse back to the C2 Server (Command output) - Please refer to the Command-&-Control notebook for an in-depth example of the C2 Suite. + Please refer to the Command-and-Control notebook for an in-depth example of the C2 Suite. """ + config: "C2Beacon.ConfigSchema" + keep_alive_attempted: bool = False """Indicates if a keep alive has been attempted to be sent this timestep. Used to prevent packet storms.""" terminal_session: TerminalClientConnection = None "The currently in use terminal session." + class ConfigSchema(Application.ConfigSchema): + """ConfigSchema for C2Beacon.""" + + type: str = "C2BEACON" + @property def _host_terminal(self) -> Optional[Terminal]: """Return the Terminal that is installed on the same machine as the C2 Beacon.""" diff --git a/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py b/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py index f948d696..b5ea9e08 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py @@ -7,6 +7,7 @@ from pydantic import validate_call from primaite.interface.request import RequestFormat, RequestResponse from primaite.simulator.core import RequestManager, RequestType from primaite.simulator.network.protocols.masquerade import C2Packet +from primaite.simulator.system.applications.application import Application from primaite.simulator.system.applications.red_applications.c2 import ( CommandOpts, ExfilOpts, @@ -34,9 +35,16 @@ class C2Server(AbstractC2, identifier="C2Server"): Please refer to the Command-&-Control notebook for an in-depth example of the C2 Suite. """ + config: "C2Server.ConfigSchema" + current_command_output: RequestResponse = None """The Request Response by the last command send. This attribute is updated by the method _handle_command_output.""" + class ConfigSchema(Application.ConfigSchema): + """ConfigSchema for C2Server.""" + + type: str = "C2SERVER" + def _init_request_manager(self) -> RequestManager: """ Initialise the request manager. diff --git a/src/primaite/simulator/system/applications/red_applications/dos_bot.py b/src/primaite/simulator/system/applications/red_applications/dos_bot.py index fb2c8847..a02b04c5 100644 --- a/src/primaite/simulator/system/applications/red_applications/dos_bot.py +++ b/src/primaite/simulator/system/applications/red_applications/dos_bot.py @@ -7,6 +7,7 @@ from primaite import getLogger from primaite.game.science import simulate_trial from primaite.interface.request import RequestFormat, RequestResponse from primaite.simulator.core import RequestManager, RequestType +from primaite.simulator.system.applications.application import Application from primaite.simulator.system.applications.database_client import DatabaseClient from primaite.utils.validation.port import Port, PORT_LOOKUP @@ -32,6 +33,8 @@ class DoSAttackStage(IntEnum): class DoSBot(DatabaseClient, identifier="DoSBot"): """A bot that simulates a Denial of Service attack.""" + config: "DoSBot.ConfigSchema" + target_ip_address: Optional[IPv4Address] = None """IP address of the target service.""" @@ -53,6 +56,11 @@ class DoSBot(DatabaseClient, identifier="DoSBot"): dos_intensity: float = 1.0 """How much of the max sessions will be used by the DoS when attacking.""" + class ConfigSchema(Application.ConfigSchema): + """ConfigSchema for DoSBot.""" + + type: str = "DOSBOT" + def __init__(self, **kwargs): super().__init__(**kwargs) self.name = "DoSBot" diff --git a/src/primaite/simulator/system/applications/red_applications/ransomware_script.py b/src/primaite/simulator/system/applications/red_applications/ransomware_script.py index 93b4c50d..236cde79 100644 --- a/src/primaite/simulator/system/applications/red_applications/ransomware_script.py +++ b/src/primaite/simulator/system/applications/red_applications/ransomware_script.py @@ -18,6 +18,8 @@ class RansomwareScript(Application, identifier="RansomwareScript"): :ivar payload: The attack stage query payload. (Default ENCRYPT) """ + config: "RansomwareScript.ConfigSchema" + server_ip_address: Optional[IPv4Address] = None """IP address of node which hosts the database.""" server_password: Optional[str] = None @@ -25,6 +27,11 @@ class RansomwareScript(Application, identifier="RansomwareScript"): payload: Optional[str] = "ENCRYPT" "Payload String for the payload stage" + class ConfigSchema(Application.ConfigSchema): + """ConfigSchema for RansomwareScript.""" + + type: str = "RANSOMWARE_SCRIPT" + def __init__(self, **kwargs): kwargs["name"] = "RansomwareScript" kwargs["port"] = PORT_LOOKUP["NONE"] diff --git a/src/primaite/simulator/system/applications/web_browser.py b/src/primaite/simulator/system/applications/web_browser.py index c57a9bd3..35f35fea 100644 --- a/src/primaite/simulator/system/applications/web_browser.py +++ b/src/primaite/simulator/system/applications/web_browser.py @@ -30,6 +30,8 @@ class WebBrowser(Application, identifier="WebBrowser"): The application requests and loads web pages using its domain name and requesting IP addresses using DNS. """ + config: "WebBrowser.ConfigSchema" + target_url: Optional[str] = None domain_name_ip_address: Optional[IPv4Address] = None @@ -41,6 +43,11 @@ class WebBrowser(Application, identifier="WebBrowser"): history: List["BrowserHistoryItem"] = [] """Keep a log of visited websites and information about the visit, such as response code.""" + class ConfigSchema(Application.ConfigSchema): + """ConfigSchema for WebBrowser.""" + + type: str = "WEB_BROWSER" + def __init__(self, **kwargs): kwargs["name"] = "WebBrowser" kwargs["protocol"] = PROTOCOL_LOOKUP["TCP"]