diff --git a/docs/source/simulation_components/system/applications/c2_suite.rst b/docs/source/simulation_components/system/applications/c2_suite.rst index c360d0be..e299bb0e 100644 --- a/docs/source/simulation_components/system/applications/c2_suite.rst +++ b/docs/source/simulation_components/system/applications/c2_suite.rst @@ -14,7 +14,7 @@ Overview: ========= These two new classes intend to Red Agents a cyber realistic way of leveraging the capabilities of the ``Terminal`` application. -Whilst introducing both more oppourtinies for the blue agent to notice and subvert Red Agents during an episode. +Whilst introducing both more opportunities for the blue agent to notice and subvert Red Agents during an episode. For a more in-depth look at the command and control applications then please refer to the ``C2-E2E-Notebook``. @@ -42,7 +42,7 @@ It's important to note that in order to keep the PrimAITE realistic from a cyber The C2 Server application should never be visible or actionable upon directly by the blue agent. This is because in the real world, C2 servers are hosted on ephemeral public domains that would not be accessible by private network blue agent. -Therefore granting a blue agent's the ability to perform counter measures directly against the application would be unrealistic. +Therefore granting blue agent(s) the ability to perform counter measures directly against the application would be unrealistic. It is more accurate to see the host that the C2 Server is installed on as being able to route to the C2 Server (Internet Access). diff --git a/src/primaite/notebooks/Command-&-Control-E2E-Demonstration.ipynb b/src/primaite/notebooks/Command-&-Control-E2E-Demonstration.ipynb index bbe29cd6..b41b9f2e 100644 --- a/src/primaite/notebooks/Command-&-Control-E2E-Demonstration.ipynb +++ b/src/primaite/notebooks/Command-&-Control-E2E-Demonstration.ipynb @@ -631,8 +631,8 @@ "source": [ "# Installing RansomwareScript via C2 Terminal Commands\n", "ransomware_install_command = {\"commands\":[[\"software_manager\", \"application\", \"install\", \"RansomwareScript\"]],\n", - " \"username\": \"pass123\",\n", - " \"password\": \"password123\"}\n", + " \"username\": \"admin\",\n", + " \"password\": \"admin\"}\n", "c2_server._send_command(C2Command.TERMINAL, command_options=ransomware_install_command)\n" ] }, @@ -830,8 +830,8 @@ "source": [ "# Attempting to install the C2 RansomwareScript\n", "ransomware_install_command = {\"commands\":[[\"software_manager\", \"application\", \"install\", \"RansomwareScript\"]],\n", - " \"username\": \"pass123\",\n", - " \"password\": \"password123\"}\n", + " \"username\": \"admin\",\n", + " \"password\": \"admin\"}\n", "\n", "c2_server: C2Server = client_1.software_manager.software[\"C2Server\"]\n", "c2_server._send_command(C2Command.TERMINAL, command_options=ransomware_install_command)" @@ -918,8 +918,8 @@ "source": [ "# Attempting to install the C2 RansomwareScript\n", "ransomware_install_command = {\"commands\":[[\"software_manager\", \"application\", \"install\", \"RansomwareScript\"]],\n", - " \"username\": \"pass123\",\n", - " \"password\": \"password123\"}\n", + " \"username\": \"admin\",\n", + " \"password\": \"admin\"}\n", "\n", "c2_server: C2Server = client_1.software_manager.software[\"C2Server\"]\n", "c2_server._send_command(C2Command.TERMINAL, command_options=ransomware_install_command)" diff --git a/src/primaite/simulator/network/protocols/masquerade.py b/src/primaite/simulator/network/protocols/masquerade.py index 7ef17fc0..e2a7b6a0 100644 --- a/src/primaite/simulator/network/protocols/masquerade.py +++ b/src/primaite/simulator/network/protocols/masquerade.py @@ -12,6 +12,12 @@ class MasqueradePacket(DataPacket): masquerade_port: Enum # The 'Masquerade' port that is currently in use + +class C2Packet(MasqueradePacket): + """Represents C2 suite communications packets.""" + payload_type: Enum # The type of C2 traffic (e.g keep alive, command or command out) command: Optional[Enum] = None # Used to pass the actual C2 Command in C2 INPUT + + keep_alive_frequency: int diff --git a/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py b/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py index c7b0d32c..a00b8570 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py @@ -4,13 +4,14 @@ from enum import Enum from ipaddress import IPv4Address from typing import Dict, Optional -from pydantic import validate_call +from pydantic import BaseModel, Field, validate_call -from primaite.simulator.network.protocols.masquerade import MasqueradePacket +from primaite.simulator.network.protocols.masquerade import C2Packet from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.network.transmission.transport_layer import Port -from primaite.simulator.system.applications.application import Application +from primaite.simulator.system.applications.application import Application, ApplicationOperatingState from primaite.simulator.system.core.session_manager import Session +from primaite.simulator.system.software import SoftwareHealthState # TODO: # Create test that leverage all the functionality needed for the different TAPs @@ -65,19 +66,30 @@ class AbstractC2(Application, identifier="AbstractC2"): c2_remote_connection: IPv4Address = None """The IPv4 Address of the remote c2 connection. (Either the IP of the beacon or the server).""" - # These two attributes are set differently in the c2 server and c2 beacon. - # The c2 server parses the keep alive and sets these accordingly. - # The c2 beacon will set this attributes upon installation and configuration - - current_masquerade_protocol: IPProtocol = IPProtocol.TCP - """The currently chosen protocol that the C2 traffic is masquerading as. Defaults as TCP.""" - - current_masquerade_port: Port = Port.HTTP - """The currently chosen port that the C2 traffic is masquerading as. Defaults at HTTP.""" - - current_c2_session: Session = None + c2_session: Session = None """The currently active session that the C2 Traffic is using. Set after establishing connection.""" + keep_alive_inactivity: int = 0 + """Indicates how many timesteps since the last time the c2 application received a keep alive.""" + + class _C2_Opts(BaseModel): + """A Pydantic Schema for the different C2 configuration options.""" + + keep_alive_frequency: int = Field(default=5, ge=1) + """The frequency at which ``Keep Alive`` packets are sent to the C2 Server from the C2 Beacon.""" + + masquerade_protocol: IPProtocol = Field(default=IPProtocol.TCP) + """The currently chosen protocol that the C2 traffic is masquerading as. Defaults as TCP.""" + + masquerade_port: Port = Field(default=Port.HTTP) + """The currently chosen port that the C2 traffic is masquerading as. Defaults at HTTP.""" + + # The c2 beacon sets the c2_config through it's own internal method - configure (which is also used by agents) + # and then passes the config attributes to the c2 server via keep alives + # The c2 server parses the C2 configurations from keep alive traffic and sets the c2_config accordingly. + c2_config: _C2_Opts = _C2_Opts() + """Holds the current configuration settings of the C2 Suite.""" + def describe_state(self) -> Dict: """ Describe the state of the C2 application. @@ -96,7 +108,7 @@ class AbstractC2(Application, identifier="AbstractC2"): # Validate call ensures we are only handling Masquerade Packets. @validate_call - def _handle_c2_payload(self, payload: MasqueradePacket, session_id: Optional[str] = None) -> bool: + def _handle_c2_payload(self, payload: C2Packet, session_id: Optional[str] = None) -> bool: """Handles masquerade payloads for both c2 beacons and c2 servers. Currently, the C2 application suite can handle the following payloads: @@ -151,18 +163,18 @@ class AbstractC2(Application, identifier="AbstractC2"): """Abstract Method: Used in C2 beacon to parse and handle commands received from the c2 server.""" pass - def _handle_keep_alive(self, payload: MasqueradePacket, session_id: Optional[str]) -> bool: + def _handle_keep_alive(self, payload: C2Packet, session_id: Optional[str]) -> bool: """Abstract Method: The C2 Server and the C2 Beacon handle the KEEP ALIVEs differently.""" # from_network_interface=from_network_interface - def receive(self, payload: MasqueradePacket, session_id: Optional[str] = None, **kwargs) -> bool: + def receive(self, payload: C2Packet, session_id: Optional[str] = None, **kwargs) -> bool: """Receives masquerade packets. Used by both c2 server and c2 beacon. Defining the `Receive` method so that the application can receive packets via the session manager. These packets are then immediately handed to ._handle_c2_payload. :param payload: The Masquerade Packet to be received. - :type payload: MasqueradePacket + :type payload: C2Packet :param session_id: The transport session_id that the payload is originating from. :type session_id: str """ @@ -193,9 +205,10 @@ class AbstractC2(Application, identifier="AbstractC2"): # We also Pass masquerade proto`col/port so that the c2 server can reply on the correct protocol/port. # (This also lays the foundations for switching masquerade port/protocols mid episode.) - keep_alive_packet = MasqueradePacket( - masquerade_protocol=self.current_masquerade_protocol, - masquerade_port=self.current_masquerade_port, + keep_alive_packet = C2Packet( + masquerade_protocol=self.c2_config.masquerade_protocol, + masquerade_port=self.c2_config.masquerade_port, + keep_alive_frequency=self.c2_config.keep_alive_frequency, payload_type=C2Payload.KEEP_ALIVE, command=None, ) @@ -203,13 +216,15 @@ class AbstractC2(Application, identifier="AbstractC2"): if self.send( payload=keep_alive_packet, dest_ip_address=self.c2_remote_connection, - dest_port=self.current_masquerade_port, - ip_protocol=self.current_masquerade_protocol, + dest_port=self.c2_config.masquerade_port, + ip_protocol=self.c2_config.masquerade_protocol, session_id=session_id, ): self.keep_alive_sent = True self.sys_log.info(f"{self.name}: Keep Alive sent to {self.c2_remote_connection}") - self.sys_log.debug(f"{self.name}: on {self.current_masquerade_port} via {self.current_masquerade_protocol}") + self.sys_log.debug( + f"{self.name}: on {self.c2_config.masquerade_port} via {self.c2_config.masquerade_protocol}" + ) return True else: self.sys_log.warning( @@ -217,7 +232,7 @@ class AbstractC2(Application, identifier="AbstractC2"): ) return False - def _resolve_keep_alive(self, payload: MasqueradePacket, session_id: Optional[str]) -> bool: + def _resolve_keep_alive(self, payload: C2Packet, session_id: Optional[str]) -> bool: """ Parses the Masquerade Port/Protocol within the received Keep Alive packet. @@ -227,7 +242,7 @@ class AbstractC2(Application, identifier="AbstractC2"): Returns False otherwise. :param payload: The Keep Alive payload received. - :type payload: MasqueradePacket + :type payload: C2Packet :param session_id: The transport session_id that the payload is originating from. :type session_id: str :return: True on successful configuration, false otherwise. @@ -241,16 +256,61 @@ class AbstractC2(Application, identifier="AbstractC2"): ) return False - # Setting the masquerade_port/protocol attribute: - self.current_masquerade_port = payload.masquerade_port - self.current_masquerade_protocol = payload.masquerade_protocol + # Updating the C2 Configuration attribute. + + self.c2_config.masquerade_port = payload.masquerade_port + self.c2_config.masquerade_protocol = payload.masquerade_protocol + self.c2_config.keep_alive_frequency = payload.keep_alive_frequency # This statement is intended to catch on the C2 Application that is listening for connection. (C2 Beacon) if self.c2_remote_connection is None: self.sys_log.debug(f"{self.name}: Attempting to configure remote C2 connection based off received output.") - self.c2_remote_connection = IPv4Address(self.current_c2_session.with_ip_address) + self.c2_remote_connection = IPv4Address(self.c2_session.with_ip_address) self.c2_connection_active = True # Sets the connection to active self.keep_alive_inactivity = 0 # Sets the keep alive inactivity to zero return True + + def _reset_c2_connection(self) -> None: + """Resets all currently established C2 communications to their default setting.""" + self.c2_connection_active = False + self.c2_session = None + self.keep_alive_inactivity = 0 + self.keep_alive_frequency = 5 + self.c2_remote_connection = None + self.c2_config.masquerade_port = Port.HTTP + self.c2_config.masquerade_protocol = IPProtocol.TCP + + @abstractmethod + def _confirm_connection(self, timestep: int) -> bool: + """Abstract method - Checks the suitability of the current C2 Server/Beacon connection.""" + + def apply_timestep(self, timestep: int) -> None: + """Apply a timestep to the c2_server & c2 beacon. + + Used to keep track of when the c2 server should consider a beacon dead + and set it's c2_remote_connection attribute to false. + + 1. Each timestep the keep_alive_inactivity is increased. + + 2. If the keep alive inactivity eclipses that of the keep alive frequency then another keep alive is sent. + + 3. If a keep alive response packet is received then the ``keep_alive_inactivity`` attribute is reset. + + Therefore, if ``keep_alive_inactivity`` attribute is not 0 after a keep alive is sent + then the connection is considered severed and c2 beacon will shut down. + + :param timestep: The current timestep of the simulation. + :type timestep: Int + :return bool: Returns false if connection was lost. Returns True if connection is active or re-established. + :rtype bool: + """ + super().apply_timestep(timestep=timestep) + if ( + self.operating_state is ApplicationOperatingState.RUNNING + and self.health_state_actual is SoftwareHealthState.GOOD + ): + self.keep_alive_inactivity += 1 + self._confirm_connection(timestep) + return diff --git a/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py b/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py index d8911622..55dd1474 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py @@ -8,10 +8,9 @@ from pydantic import validate_call from primaite.interface.request import RequestFormat, RequestResponse from primaite.simulator.core import RequestManager, RequestType -from primaite.simulator.network.protocols.masquerade import MasqueradePacket +from primaite.simulator.network.protocols.masquerade import C2Packet from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.network.transmission.transport_layer import Port -from primaite.simulator.system.applications.application import ApplicationOperatingState from primaite.simulator.system.applications.red_applications.c2.abstract_c2 import AbstractC2, C2Command, C2Payload from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript from primaite.simulator.system.services.terminal.terminal import ( @@ -19,7 +18,6 @@ from primaite.simulator.system.services.terminal.terminal import ( RemoteTerminalConnection, Terminal, ) -from primaite.simulator.system.software import SoftwareHealthState class C2Beacon(AbstractC2, identifier="C2Beacon"): @@ -41,13 +39,6 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): keep_alive_attempted: bool = False """Indicates if a keep alive has been attempted to be sent this timestep. Used to prevent packet storms.""" - # We should set the application to NOT_RUNNING if the inactivity count reaches a certain thresh hold. - keep_alive_inactivity: int = 0 - """Indicates how many timesteps since the last time the c2 application received a keep alive.""" - - keep_alive_frequency: int = 5 - "The frequency at which ``Keep Alive`` packets are sent to the C2 Server from the C2 Beacon." - local_terminal_session: LocalTerminalConnection = None "The currently in use local terminal session." @@ -164,9 +155,9 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): :type masquerade_port: Enum (Port) """ self.c2_remote_connection = IPv4Address(c2_server_ip_address) - self.keep_alive_frequency = keep_alive_frequency - self.current_masquerade_port = masquerade_port - self.current_masquerade_protocol = masquerade_protocol + self.c2_config.keep_alive_frequency = keep_alive_frequency + self.c2_config.masquerade_port = masquerade_port + self.c2_config.masquerade_protocol = masquerade_protocol self.sys_log.info( f"{self.name}: Configured {self.name} with remote C2 server connection: {c2_server_ip_address=}." ) @@ -188,7 +179,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): self.num_executions += 1 return self._send_keep_alive(session_id=None) - def _handle_command_input(self, payload: MasqueradePacket, session_id: Optional[str]) -> bool: + def _handle_command_input(self, payload: C2Packet, session_id: Optional[str]) -> bool: """ Handles the parsing of C2 Commands from C2 Traffic (Masquerade Packets). @@ -205,7 +196,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): the implementation of these commands. :param payload: The INPUT C2 Payload - :type payload: MasqueradePacket + :type payload: C2Packet :return: The Request Response provided by the terminal execute method. :rtype Request Response: """ @@ -250,21 +241,24 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): :param session_id: The current session established with the C2 Server. :type session_id: Str """ - output_packet = MasqueradePacket( - masquerade_protocol=self.current_masquerade_protocol, - masquerade_port=self.current_masquerade_port, + output_packet = C2Packet( + masquerade_protocol=self.c2_config.masquerade_protocol, + masquerade_port=self.c2_config.masquerade_port, + keep_alive_frequency=self.c2_config.keep_alive_frequency, payload_type=C2Payload.OUTPUT, payload=command_output, ) if self.send( payload=output_packet, dest_ip_address=self.c2_remote_connection, - dest_port=self.current_masquerade_port, - ip_protocol=self.current_masquerade_protocol, + dest_port=self.c2_config.masquerade_port, + ip_protocol=self.c2_config.masquerade_protocol, session_id=session_id, ): self.sys_log.info(f"{self.name}: Command output sent to {self.c2_remote_connection}") - self.sys_log.debug(f"{self.name}: on {self.current_masquerade_port} via {self.current_masquerade_protocol}") + self.sys_log.debug( + f"{self.name}: on {self.c2_config.masquerade_port} via {self.c2_config.masquerade_protocol}" + ) return True else: self.sys_log.warning( @@ -272,7 +266,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): ) return False - def _command_ransomware_config(self, payload: MasqueradePacket) -> RequestResponse: + def _command_ransomware_config(self, payload: C2Packet) -> RequestResponse: """ C2 Command: Ransomware Configuration. @@ -282,8 +276,8 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): The class attribute self._host_ransomware_script will return None if the host does not have an instance of the RansomwareScript. - :payload MasqueradePacket: The incoming INPUT command. - :type Masquerade Packet: MasqueradePacket. + :payload C2Packet: The incoming INPUT command. + :type Masquerade Packet: C2Packet. :return: Returns the Request Response returned by the Terminal execute method. :rtype: Request Response """ @@ -299,7 +293,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): ) ) - def _command_ransomware_launch(self, payload: MasqueradePacket) -> RequestResponse: + def _command_ransomware_launch(self, payload: C2Packet) -> RequestResponse: """ C2 Command: Ransomware Launch. @@ -307,8 +301,8 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): This request is then sent to the terminal service in order to be executed. - :payload MasqueradePacket: The incoming INPUT command. - :type Masquerade Packet: MasqueradePacket. + :payload C2Packet: The incoming INPUT command. + :type Masquerade Packet: C2Packet. :return: Returns the Request Response returned by the Terminal execute method. :rtype: Request Response """ @@ -319,15 +313,15 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): ) return RequestResponse.from_bool(self._host_ransomware_script.attack()) - def _command_terminal(self, payload: MasqueradePacket) -> RequestResponse: + def _command_terminal(self, payload: C2Packet) -> RequestResponse: """ C2 Command: Terminal. Creates a request that executes a terminal command. This request is then sent to the terminal service in order to be executed. - :payload MasqueradePacket: The incoming INPUT command. - :type Masquerade Packet: MasqueradePacket. + :payload C2Packet: The incoming INPUT command. + :type Masquerade Packet: C2Packet. :return: Returns the Request Response returned by the Terminal execute method. :rtype: Request Response """ @@ -368,7 +362,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): self.remote_terminal_session is None return RequestResponse(status="success", data=terminal_output) - def _handle_keep_alive(self, payload: MasqueradePacket, session_id: Optional[str]) -> bool: + def _handle_keep_alive(self, payload: C2Packet, session_id: Optional[str]) -> bool: """ Handles receiving and sending keep alive payloads. This method is only called if we receive a keep alive. @@ -395,7 +389,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): if self.keep_alive_attempted is True: self.c2_connection_active = True # Sets the connection to active self.keep_alive_inactivity = 0 # Sets the keep alive inactivity to zero - self.current_c2_session = self.software_manager.session_manager.sessions_by_uuid[session_id] + self.c2_session = self.software_manager.session_manager.sessions_by_uuid[session_id] # We set keep alive_attempted here to show that we've achieved connection. self.keep_alive_attempted = False @@ -412,42 +406,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): # If this method returns true then we have sent successfully sent a keep alive. return self._send_keep_alive(session_id) - def apply_timestep(self, timestep: int) -> None: - """Apply a timestep to the c2_beacon. - - Used to keep track of when the c2 beacon should send another keep alive. - The following logic is applied: - - 1. Each timestep the keep_alive_inactivity is increased. - - 2. If the keep alive inactivity eclipses that of the keep alive frequency then another keep alive is sent. - - 3. If a keep alive response packet is received then the ``keep_alive_inactivity`` attribute is reset. - - Therefore, if ``keep_alive_inactivity`` attribute is not 0 after a keep alive is sent - then the connection is considered severed and c2 beacon will shut down. - - :param timestep: The current timestep of the simulation. - :type timestep: Int - :return bool: Returns false if connection was lost. Returns True if connection is active or re-established. - :rtype bool: - """ - super().apply_timestep(timestep=timestep) - self.keep_alive_attempted = False # Resetting keep alive sent. - if ( - self.operating_state is ApplicationOperatingState.RUNNING - and self.health_state_actual is SoftwareHealthState.GOOD - ): - self.keep_alive_inactivity += 1 - if not self._check_c2_connection(timestep): - self.sys_log.error(f"{self.name}: Connection Severed - Application Closing.") - self.c2_connection_active = False - self.clear_connections() - # TODO: Shouldn't this close() method also set the health state to 'UNUSED'? - self.close() - return - - def _check_c2_connection(self, timestep: int) -> bool: + def _confirm_connection(self, timestep: int) -> bool: """Checks the suitability of the current C2 Server connection. If a connection cannot be confirmed then this method will return false otherwise true. @@ -457,20 +416,23 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): :return: Returns False if connection was lost. Returns True if connection is active or re-established. :rtype bool: """ - if self.keep_alive_inactivity == self.keep_alive_frequency: + self.keep_alive_attempted = False # Resetting keep alive sent. + if self.keep_alive_inactivity == self.c2_config.keep_alive_frequency: self.sys_log.info( f"{self.name}: Attempting to Send Keep Alive to {self.c2_remote_connection} at timestep {timestep}." ) - self._send_keep_alive(session_id=self.current_c2_session.uuid) + self._send_keep_alive(session_id=self.c2_session.uuid) if self.keep_alive_inactivity != 0: self.sys_log.warning( f"{self.name}: Did not receive keep alive from c2 Server. Connection considered severed." ) + self._reset_c2_connection() + self.close() return False return True # Defining this abstract method from Abstract C2 - def _handle_command_output(self, payload: MasqueradePacket): + def _handle_command_output(self, payload: C2Packet): """C2 Beacons currently does not need to handle output commands coming from the C2 Servers.""" self.sys_log.warning(f"{self.name}: C2 Beacon received an unexpected OUTPUT payload: {payload}.") pass @@ -520,9 +482,9 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): self.c2_connection_active, self.c2_remote_connection, self.keep_alive_inactivity, - self.keep_alive_frequency, - self.current_masquerade_protocol, - self.current_masquerade_port, + self.c2_config.keep_alive_frequency, + self.c2_config.masquerade_protocol, + self.c2_config.masquerade_port, ] ) print(table) diff --git a/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py b/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py index 211da210..e4bf3302 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py @@ -6,7 +6,7 @@ from pydantic import validate_call from primaite.interface.request import RequestFormat, RequestResponse from primaite.simulator.core import RequestManager, RequestType -from primaite.simulator.network.protocols.masquerade import MasqueradePacket +from primaite.simulator.network.protocols.masquerade import C2Packet from primaite.simulator.system.applications.red_applications.c2.abstract_c2 import AbstractC2, C2Command, C2Payload @@ -96,18 +96,18 @@ class C2Server(AbstractC2, identifier="C2Server"): kwargs["name"] = "C2Server" super().__init__(**kwargs) - def _handle_command_output(self, payload: MasqueradePacket) -> bool: + def _handle_command_output(self, payload: C2Packet) -> bool: """ Handles the parsing of C2 Command Output from C2 Traffic (Masquerade Packets). - Parses the Request Response within MasqueradePacket's payload attribute (Inherited from Data packet). + Parses the Request Response within C2Packet's payload attribute (Inherited from Data packet). The class attribute self.current_command_output is then set to this Request Response. If the payload attribute does not contain a RequestResponse, then an error will be raised in syslog and the self.current_command_output is updated to reflect the error. :param payload: The OUTPUT C2 Payload - :type payload: MasqueradePacket + :type payload: C2Packet :return: Returns True if the self.current_command_output is currently updated, false otherwise. :rtype Bool: """ @@ -123,7 +123,7 @@ class C2Server(AbstractC2, identifier="C2Server"): self.current_command_output = command_output return True - def _handle_keep_alive(self, payload: MasqueradePacket, session_id: Optional[str]) -> bool: + def _handle_keep_alive(self, payload: C2Packet, session_id: Optional[str]) -> bool: """ Handles receiving and sending keep alive payloads. This method is only called if we receive a keep alive. @@ -137,7 +137,7 @@ class C2Server(AbstractC2, identifier="C2Server"): Returns True if a keep alive was successfully sent or already has been sent this timestep. :param payload: The Keep Alive payload received. - :type payload: MasqueradePacket + :type payload: C2Packet :param session_id: The transport session_id that the payload is originating from. :type session_id: str :return: True if successfully handled, false otherwise. @@ -146,7 +146,7 @@ class C2Server(AbstractC2, identifier="C2Server"): self.sys_log.info(f"{self.name}: Keep Alive Received. Attempting to resolve the remote connection details.") self.c2_connection_active = True # Sets the connection to active - self.current_c2_session = self.software_manager.session_manager.sessions_by_uuid[session_id] + self.c2_session = self.software_manager.session_manager.sessions_by_uuid[session_id] if self._resolve_keep_alive(payload, session_id) == False: self.sys_log.warning(f"{self.name}: Keep Alive Could not be resolved correctly. Refusing Keep Alive.") @@ -205,7 +205,7 @@ class C2Server(AbstractC2, identifier="C2Server"): status="failure", data={"Reason": "C2 Beacon has yet to establish connection. Unable to send command."} ) - if self.current_c2_session is None: + if self.c2_session is None: self.sys_log.warning(f"{self.name}: C2 Beacon cannot be reached. Rejecting command.") return RequestResponse( status="failure", data={"Reason": "C2 Beacon cannot be reached. Unable to send command."} @@ -217,18 +217,22 @@ class C2Server(AbstractC2, identifier="C2Server"): if self.send( payload=command_packet, dest_ip_address=self.c2_remote_connection, - session_id=self.current_c2_session.uuid, - dest_port=self.current_masquerade_port, - ip_protocol=self.current_masquerade_protocol, + session_id=self.c2_session.uuid, + dest_port=self.c2_config.masquerade_port, + ip_protocol=self.c2_config.masquerade_protocol, ): self.sys_log.info(f"{self.name}: Successfully sent {given_command}.") self.sys_log.info(f"{self.name}: Awaiting command response {given_command}.") # If the command output was handled currently, the self.current_command_output will contain the RequestResponse. + if self.current_command_output is None: + return RequestResponse( + status="failure", data={"Reason": "Command sent to the C2 Beacon but no response was ever received."} + ) return self.current_command_output - # TODO: Probably could move this as a class method in MasqueradePacket. - def _craft_packet(self, given_command: C2Command, command_options: Dict) -> MasqueradePacket: + # TODO: Probably could move this as a class method in C2Packet. + def _craft_packet(self, given_command: C2Command, command_options: Dict) -> C2Packet: """ Creates and returns a Masquerade Packet using the arguments given. @@ -238,12 +242,13 @@ class C2Server(AbstractC2, identifier="C2Server"): :type given_command: C2Command. :param command_options: The relevant C2 Beacon parameters.F :type command_options: Dict - :return: Returns the construct MasqueradePacket - :rtype: MasqueradePacket + :return: Returns the construct C2Packet + :rtype: C2Packet """ - constructed_packet = MasqueradePacket( - masquerade_protocol=self.current_masquerade_protocol, - masquerade_port=self.current_masquerade_port, + constructed_packet = C2Packet( + masquerade_protocol=self.c2_config.masquerade_protocol, + masquerade_port=self.c2_config.masquerade_port, + keep_alive_frequency=self.c2_config.keep_alive_frequency, payload_type=C2Payload.INPUT, command=given_command, payload=command_options, @@ -281,20 +286,41 @@ class C2Server(AbstractC2, identifier="C2Server"): [ self.c2_connection_active, self.c2_remote_connection, - self.current_masquerade_protocol, - self.current_masquerade_port, + self.c2_config.masquerade_protocol, + self.c2_config.masquerade_port, ] ) print(table) # Abstract method inherited from abstract C2 - Not currently utilised. - def _handle_command_input(self, payload: MasqueradePacket) -> None: + def _handle_command_input(self, payload: C2Packet) -> None: """Defining this method (Abstract method inherited from abstract C2) in order to instantiate the class. C2 Servers currently do not receive input commands coming from the C2 Beacons. - :param payload: The incoming MasqueradePacket - :type payload: MasqueradePacket. + :param payload: The incoming C2Packet + :type payload: C2Packet. """ self.sys_log.warning(f"{self.name}: C2 Server received an unexpected INPUT payload: {payload}") pass + + def _confirm_connection(self, timestep: int) -> bool: + """Checks the suitability of the current C2 Beacon connection. + + If a C2 Server has not received a keep alive within the current set + keep alive frequency (self._keep_alive_frequency) then the C2 beacons + connection is considered dead and any commands will be rejected. + + :param timestep: The current timestep of the simulation. + :type timestep: Int + :return: Returns False if the C2 beacon is considered dead. Otherwise True. + :rtype bool: + """ + if self.keep_alive_inactivity > self.c2_config.keep_alive_frequency: + self.sys_log.debug( + f"{self.name}: Failed to receive expected keep alive from {self.c2_remote_connection} at {timestep}." + ) + self.sys_log.info(f"{self.name}: C2 Beacon connection considered dead due to inactivity.") + self._reset_c2_connection() + return False + return True diff --git a/tests/integration_tests/system/red_applications/test_c2_suite.py b/tests/integration_tests/system/red_applications/test_c2_suite_integration.py similarity index 71% rename from tests/integration_tests/system/red_applications/test_c2_suite.py rename to tests/integration_tests/system/red_applications/test_c2_suite_integration.py index 9b799ff5..ab609cb0 100644 --- a/tests/integration_tests/system/red_applications/test_c2_suite.py +++ b/tests/integration_tests/system/red_applications/test_c2_suite_integration.py @@ -17,7 +17,7 @@ from primaite.simulator.network.transmission.transport_layer import Port from primaite.simulator.system.applications.application import ApplicationOperatingState from primaite.simulator.system.applications.database_client import DatabaseClient from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon -from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Server +from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Command, C2Server from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript from primaite.simulator.system.services.database.database_service import DatabaseService from primaite.simulator.system.services.dns.dns_server import DNSServer @@ -29,6 +29,7 @@ def basic_network() -> Network: network = Network() # Creating two generic nodes for the C2 Server and the C2 Beacon. + node_a = Computer( hostname="node_a", ip_address="192.168.0.2", @@ -43,12 +44,24 @@ def basic_network() -> Network: node_b = Computer( hostname="node_b", ip_address="192.168.255.2", - subnet_mask="255.255.255.252", + subnet_mask="255.255.255.248", default_gateway="192.168.255.1", start_up_duration=0, ) + node_b.power_on() node_b.software_manager.install(software_class=C2Beacon) + + # Creating a generic computer for testing remote terminal connections. + node_c = Computer( + hostname="node_c", + ip_address="192.168.255.3", + subnet_mask="255.255.255.248", + default_gateway="192.168.255.1", + start_up_duration=0, + ) + node_c.power_on() + # Creating a router to sit between node 1 and node 2. router = Router(hostname="router", num_ports=3, start_up_duration=0) # Default allow all. @@ -66,35 +79,43 @@ def basic_network() -> Network: switch_2.power_on() network.connect(endpoint_a=router.network_interface[2], endpoint_b=switch_2.network_interface[6]) - router.configure_port(port=2, ip_address="192.168.255.1", subnet_mask="255.255.255.252") + router.configure_port(port=2, ip_address="192.168.255.1", subnet_mask="255.255.255.248") router.enable_port(1) router.enable_port(2) # Connecting the node to each switch network.connect(node_a.network_interface[1], switch_1.network_interface[1]) - network.connect(node_b.network_interface[1], switch_2.network_interface[1]) + network.connect(node_c.network_interface[1], switch_2.network_interface[2]) return network +def setup_c2(given_network: Network): + """Installs the C2 Beacon & Server, configures and then returns.""" + computer_a: Computer = given_network.get_node_by_hostname("node_a") + c2_server: C2Server = computer_a.software_manager.software.get("C2Server") + computer_a.software_manager.install(DatabaseService) + computer_a.software_manager.software["DatabaseService"].start() + + computer_b: Computer = given_network.get_node_by_hostname("node_b") + c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon") + computer_b.software_manager.install(DatabaseClient) + computer_b.software_manager.software["DatabaseClient"].configure(server_ip_address=IPv4Address("192.168.0.2")) + computer_b.software_manager.software["DatabaseClient"].run() + + c2_beacon.configure(c2_server_ip_address="192.168.0.2", keep_alive_frequency=2) + c2_server.run() + c2_beacon.establish() + + return given_network, computer_a, c2_server, computer_b, c2_beacon + + def test_c2_suite_setup_receive(basic_network): """Test that C2 Beacon can successfully establish connection with the C2 Server.""" network: Network = basic_network - computer_a: Computer = network.get_node_by_hostname("node_a") - c2_server: C2Server = computer_a.software_manager.software.get("C2Server") - - computer_b: Computer = network.get_node_by_hostname("node_b") - c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon") - - # Assert that the c2 beacon configure correctly. - c2_beacon.configure(c2_server_ip_address="192.168.0.2") - assert c2_beacon.c2_remote_connection == IPv4Address("192.168.0.2") - - c2_server.run() - c2_beacon.establish() - + network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network) # Asserting that the c2 beacon has established a c2 connection assert c2_beacon.c2_connection_active is True @@ -112,15 +133,7 @@ def test_c2_suite_setup_receive(basic_network): def test_c2_suite_keep_alive_inactivity(basic_network): """Tests that C2 Beacon disconnects from the C2 Server after inactivity.""" network: Network = basic_network - computer_a: Computer = network.get_node_by_hostname("node_a") - c2_server: C2Server = computer_a.software_manager.software.get("C2Server") - - computer_b: Computer = network.get_node_by_hostname("node_b") - c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon") - - c2_beacon.configure(c2_server_ip_address="192.168.0.2", keep_alive_frequency=2) - c2_server.run() - c2_beacon.establish() + network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network) c2_beacon.apply_timestep(0) assert c2_beacon.keep_alive_inactivity == 1 @@ -133,21 +146,21 @@ def test_c2_suite_keep_alive_inactivity(basic_network): # Now we turn off the c2 server (Thus preventing a keep alive) c2_server.close() c2_beacon.apply_timestep(2) + + assert c2_beacon.keep_alive_inactivity == 1 + c2_beacon.apply_timestep(3) - assert c2_beacon.keep_alive_inactivity == 2 + + # C2 Beacon resets it's connections back to default. + assert c2_beacon.keep_alive_inactivity == 0 assert c2_beacon.c2_connection_active == False assert c2_beacon.operating_state == ApplicationOperatingState.CLOSED def test_c2_suite_configure_request(basic_network): """Tests that the request system can be used to successfully setup a c2 suite.""" - # Setting up the network: network: Network = basic_network - computer_a: Computer = network.get_node_by_hostname("node_a") - c2_server: C2Server = computer_a.software_manager.software.get("C2Server") - - computer_b: Computer = network.get_node_by_hostname("node_b") - c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon") + network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network) # Testing Via Requests: c2_server.run() @@ -173,20 +186,7 @@ def test_c2_suite_ransomware_commands(basic_network): """Tests the Ransomware commands can be used to configure & launch ransomware via Requests.""" # Setting up the network: network: Network = basic_network - computer_a: Computer = network.get_node_by_hostname("node_a") - c2_server: C2Server = computer_a.software_manager.software.get("C2Server") - computer_a.software_manager.install(DatabaseService) - computer_a.software_manager.software["DatabaseService"].start() - - computer_b: Computer = network.get_node_by_hostname("node_b") - c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon") - computer_b.software_manager.install(DatabaseClient) - computer_b.software_manager.software["DatabaseClient"].configure(server_ip_address=IPv4Address("192.168.0.2")) - computer_b.software_manager.software["DatabaseClient"].run() - - c2_beacon.configure(c2_server_ip_address="192.168.0.2", keep_alive_frequency=2) - c2_server.run() - c2_beacon.establish() + network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network) # Testing Via Requests: computer_b.software_manager.install(software_class=RansomwareScript) @@ -208,18 +208,10 @@ def test_c2_suite_acl_block(basic_network): """Tests that C2 Beacon disconnects from the C2 Server after blocking ACL rules.""" network: Network = basic_network - computer_a: Computer = network.get_node_by_hostname("node_a") - c2_server: C2Server = computer_a.software_manager.software.get("C2Server") - - computer_b: Computer = network.get_node_by_hostname("node_b") - c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon") + network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network) router: Router = network.get_node_by_hostname("router") - c2_beacon.configure(c2_server_ip_address="192.168.0.2", keep_alive_frequency=2) - c2_server.run() - c2_beacon.establish() - c2_beacon.apply_timestep(0) assert c2_beacon.keep_alive_inactivity == 1 @@ -233,10 +225,53 @@ def test_c2_suite_acl_block(basic_network): c2_beacon.apply_timestep(2) c2_beacon.apply_timestep(3) - assert c2_beacon.keep_alive_inactivity == 2 + + # C2 Beacon resets after unable to maintain contact. + + assert c2_beacon.keep_alive_inactivity == 0 assert c2_beacon.c2_connection_active == False assert c2_beacon.operating_state == ApplicationOperatingState.CLOSED -def test_c2_suite_terminal(basic_network): - """Tests the Ransomware commands can be used to configure & launch ransomware via Requests.""" +def test_c2_suite_terminal_command_file_creation(basic_network): + """Tests the C2 Terminal command can be used on local and remote.""" + network: Network = basic_network + network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network) + computer_c: Computer = network.get_node_by_hostname("node_c") + + # Asserting to demonstrate that the test files don't exist: + assert ( + computer_c.software_manager.file_system.access_file(folder_name="test_folder", file_name="test_file") == False + ) + + assert ( + computer_b.software_manager.file_system.access_file(folder_name="test_folder", file_name="test_file") == False + ) + + # Testing that we can create the test file and folders via the terminal command (Local C2 Terminal). + + # Local file/folder creation commands. + file_create_command = { + "commands": [ + ["file_system", "create", "folder", "test_folder"], + ["file_system", "create", "file", "test_folder", "test_file", "True"], + ], + "username": "admin", + "password": "admin", + "ip_address": None, + } + + c2_server._send_command(C2Command.TERMINAL, command_options=file_create_command) + + assert computer_b.software_manager.file_system.access_file(folder_name="test_folder", file_name="test_file") == True + assert c2_beacon.local_terminal_session is not None + + # Testing that we can create the same test file/folders via on node 3 via a remote terminal. + + # node_c's IP is 192.168.255.3 + file_create_command.update({"ip_address": "192.168.255.3"}) + + c2_server._send_command(C2Command.TERMINAL, command_options=file_create_command) + + assert computer_c.software_manager.file_system.access_file(folder_name="test_folder", file_name="test_file") == True + assert c2_beacon.remote_terminal_session is not None diff --git a/tests/unit_tests/_primaite/_simulator/_system/_applications/_red_applications/test_c2_suite.py b/tests/unit_tests/_primaite/_simulator/_system/_applications/_red_applications/test_c2_suite.py new file mode 100644 index 00000000..a790081f --- /dev/null +++ b/tests/unit_tests/_primaite/_simulator/_system/_applications/_red_applications/test_c2_suite.py @@ -0,0 +1,138 @@ +# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK +import pytest + +from primaite.simulator.network.container import Network +from primaite.simulator.network.hardware.nodes.host.computer import Computer +from primaite.simulator.network.hardware.nodes.host.server import Server +from primaite.simulator.system.applications.application import ApplicationOperatingState +from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon +from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Command, C2Server + + +@pytest.fixture(scope="function") +def basic_c2_network() -> Network: + network = Network() + + # Creating two generic nodes for the C2 Server and the C2 Beacon. + + computer_a = Computer( + hostname="computer_a", + ip_address="192.168.0.1", + subnet_mask="255.255.255.252", + start_up_duration=0, + ) + computer_a.power_on() + computer_a.software_manager.install(software_class=C2Server) + + computer_b = Computer( + hostname="computer_b", ip_address="192.168.0.2", subnet_mask="255.255.255.252", start_up_duration=0 + ) + + computer_b.power_on() + computer_b.software_manager.install(software_class=C2Beacon) + + network.connect(endpoint_a=computer_a.network_interface[1], endpoint_b=computer_b.network_interface[1]) + return network + + +def setup_c2(given_network: Network): + """Installs the C2 Beacon & Server, configures and then returns.""" + network: Network = given_network + + computer_a: Computer = network.get_node_by_hostname("computer_a") + computer_b: Computer = network.get_node_by_hostname("computer_b") + + c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon") + c2_server: C2Server = computer_a.software_manager.software.get("C2Server") + + c2_beacon.configure(c2_server_ip_address="192.168.0.1", keep_alive_frequency=2) + c2_server.run() + c2_beacon.establish() + + return network, computer_a, c2_server, computer_b, c2_beacon + + +def test_c2_handle_server_disconnect(basic_c2_network): + """Tests that the C2 suite will be able handle the c2 server application closing.""" + network: Network = basic_c2_network + network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network) + + assert c2_beacon.c2_connection_active is True + + ##### C2 Server disconnecting. + + # Closing the C2 Server + c2_server.close() + + # Applying 10 timesteps to trigger C2 beacon keep alive + + for i in range(10): + network.apply_timestep(i) + + assert c2_beacon.c2_connection_active is False + assert c2_beacon.operating_state is ApplicationOperatingState.CLOSED + + # C2 Beacon disconnected. + + network: Network = basic_c2_network + + network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network) + + +def test_c2_handle_beacon_disconnect(basic_c2_network): + """Tests that the C2 suite will be able handle the c2 beacon application closing.""" + network: Network = basic_c2_network + network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network) + + assert c2_server.c2_connection_active is True + + # Closing the C2 beacon + + c2_beacon.close() + + assert c2_beacon.operating_state is ApplicationOperatingState.CLOSED + + # Attempting a simple C2 Server command: + file_create_command = { + "commands": [["file_system", "create", "folder", "test_folder"]], + "username": "admin", + "password": "admin", + "ip_address": None, + } + + command_request_response = c2_server._send_command(C2Command.TERMINAL, command_options=file_create_command) + + assert command_request_response.status == "failure" + + # Despite the command failing - The C2 Server will still consider the beacon alive + # Until it does not respond within the keep alive frequency set in the last keep_alive. + assert c2_server.c2_connection_active is True + + # Stepping 6 timesteps in order for the C2 server to consider the beacon dead. + for i in range(6): + network.apply_timestep(i) + + assert c2_server.c2_connection_active is False + + +# TODO: Finalise and complete these tests. + + +def test_c2_handle_switching_port(basic_c2_network): + """Tests that the C2 suite will be able handle switching destination/src port.""" + network: Network = basic_c2_network + + network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network) + + # Asserting that the c2 beacon has established a c2 connection + assert c2_beacon.c2_connection_active is True + + +def test_c2_handle_switching_frequency(basic_c2_network): + """Tests that the C2 suite will be able handle switching keep alive frequency.""" + network: Network = basic_c2_network + + network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network) + + # Asserting that the c2 beacon has established a c2 connection + assert c2_beacon.c2_connection_active is True