diff --git a/src/primaite/notebooks/Command-&-Control-E2E-Demonstration.ipynb b/src/primaite/notebooks/Command-&-Control-E2E-Demonstration.ipynb index 052136f8..6fa91c04 100644 --- a/src/primaite/notebooks/Command-&-Control-E2E-Demonstration.ipynb +++ b/src/primaite/notebooks/Command-&-Control-E2E-Demonstration.ipynb @@ -1557,7 +1557,7 @@ "source": [ "The code cell below goes through 10 timesteps and displays the differences between the default and the current timestep.\n", "\n", - "You will notice that the only observation space differences after 10 timesteps. This is due to the C2 Suite confirming their connection through sending ``Keep Alive`` traffic across the network." + "You will notice that the only two timesteps displayed observation space differences. This is due to the C2 Suite confirming their connection through sending ``Keep Alive`` traffic across the network every 5 timesteps." ] }, { @@ -1575,7 +1575,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Next, the code cells below configuring the C2 Beacon's Keep Alive Frequency to confirm connection on every timestep." + "Next, the code cells below configure the C2 Beacon to confirm connection on every timestep via changing the ``keep_alive_frequency`` to ``1``." ] }, { @@ -1593,7 +1593,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "The code cells below demonstrate that the observation impacts of the Keep Alive can be seen on every timestep. " + "Demonstrating that the observation impacts of the Keep Alive can be seen on every timestep:" ] }, { @@ -1608,6 +1608,29 @@ " display_obs_diffs(default_obs, keep_alive_obs, blue_config_env.game.step_counter)" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Additionally, the keep_alive_frequency can also be used to configure the C2 Beacon to confirm connection less frequently. \n", + "\n", + "The code cells below demonstrate the impacts of changing the frequency rate to ``7`` timesteps." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "c2_beacon.configure(c2_server_ip_address=\"192.168.10.21\", keep_alive_frequency=7)\n", + "\n", + "# Comparing the OBS of the default frequency to a timestep frequency of 7\n", + "for i in range(7):\n", + " keep_alive_obs, _, _, _, _ = blue_config_env.step(0)\n", + " display_obs_diffs(default_obs, keep_alive_obs, blue_config_env.game.step_counter)" + ] + }, { "cell_type": "markdown", "metadata": {}, @@ -1618,8 +1641,17 @@ "\n", "In the real world, Adversaries take defensive steps to reduce the chance that an installed C2 Beacon is discovered. One of the most commonly used methods is to masquerade c2 traffic as other commonly used networking protocols.\n", "\n", - "In primAITE, red agents can begin to simulate stealth behaviour by configuring C2 traffic to use different protocols mid episode or between episodes. \n", - "Currently, red agent actions are limited to using ports: ``DNS``, ``FTP`` and ``HTTP`` and protocols: ``UDP`` and ``TCP``.\n", + "In primAITE, red agents can begin to simulate stealth behaviour by configuring C2 traffic to use different protocols mid episode or between episodes.\n", + "\n", + "Currently, red agent actions support the following port and protocol options:\n", + "\n", + "| Supported Ports | Supported Protocols |\n", + "|------------------|---------------------|\n", + "|``DNS`` | ``UDP`` |\n", + "|``FTP`` | ``TCP`` |\n", + "|``HTTP`` | |\n", + "\n", + "\n", "\n", "The next set of code cells will demonstrate the impact this option from a blue agent perspective." ] diff --git a/src/primaite/simulator/system/applications/red_applications/c2/__init__.py b/src/primaite/simulator/system/applications/red_applications/c2/__init__.py index 97923284..919b1bf5 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/__init__.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/__init__.py @@ -14,36 +14,36 @@ class Ransomware_Opts(Command_Opts): """A Pydantic Schema for the Ransomware Configuration command options.""" server_ip_address: str - """""" + """The IP Address of the target database that the RansomwareScript will attack.""" payload: Optional[str] = Field(default="ENCRYPT") - """""" + """The malicious payload to be used to attack the target database.""" class Remote_Opts(Command_Opts): - """A base C2 Pydantic Schema for all C2 Commands that require a remote terminal connection.""" + """A base C2 Pydantic Schema for all C2 Commands that require a terminal connection.""" ip_address: Optional[str] = Field(default=None) - """""" + """The IP address of a remote host. If this field defaults to None then a local session is used.""" username: str - """""" + """A Username of a valid user account. Used to login into both remote and local hosts.""" password: str - """""" + """A Password of a valid user account. Used to login into both remote and local hosts.""" class Exfil_Opts(Remote_Opts): """A Pydantic Schema for the C2 Data Exfiltration command options.""" target_ip_address: str - """""" - - target_folder_name: str - """""" + """The IP address of the target host that will be the target of the exfiltration.""" target_file_name: str - """""" + """The name of the file that is attempting to be exfiltrated.""" + + target_folder_name: str + """The name of the remote folder which contains the target file.""" exfiltration_folder_name: Optional[str] = Field(default="exfiltration_folder") """""" @@ -53,4 +53,4 @@ class Terminal_Opts(Remote_Opts): """A Pydantic Schema for the C2 Terminal command options.""" commands: Union[list[RequestFormat], RequestFormat] - """""" + """A list or individual Terminal Command. Please refer to the RequestResponse system for further info.""" diff --git a/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py b/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py index 47e4f902..71703500 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py @@ -14,11 +14,7 @@ from primaite.simulator.network.transmission.transport_layer import Port from primaite.simulator.system.applications.red_applications.c2 import Exfil_Opts, Ransomware_Opts, Terminal_Opts from primaite.simulator.system.applications.red_applications.c2.abstract_c2 import AbstractC2, C2Command, C2Payload from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript -from primaite.simulator.system.services.terminal.terminal import ( - LocalTerminalConnection, - RemoteTerminalConnection, - Terminal, -) +from primaite.simulator.system.services.terminal.terminal import Terminal, TerminalClientConnection class C2Beacon(AbstractC2, identifier="C2Beacon"): @@ -43,11 +39,8 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): keep_alive_attempted: bool = False """Indicates if a keep alive has been attempted to be sent this timestep. Used to prevent packet storms.""" - local_terminal_session: LocalTerminalConnection = None - "The currently in use local terminal session." - - remote_terminal_session: RemoteTerminalConnection = None - "The currently in use remote terminal session" + terminal_session: TerminalClientConnection = None + "The currently in use terminal session." @property def _host_terminal(self) -> Optional[Terminal]: @@ -65,25 +58,20 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): self.sys_log.warning(f"{self.__class__.__name__} cannot find installed ransomware on its host.") return ransomware_script - def get_terminal_session(self, username: str, password: str) -> Optional[LocalTerminalConnection]: - """Return an instance of a Local Terminal Connection upon successful login. Otherwise returns None.""" - if self.local_terminal_session is None: - host_terminal: Terminal = self._host_terminal - self.local_terminal_session = host_terminal.login(username=username, password=password) + def _set_terminal_session(self, username: str, password: str, ip_address: Optional[IPv4Address] = None) -> bool: + """ + Attempts to create and a terminal session using the parameters given. - return self.local_terminal_session + If an IP Address is passed then this method will attempt to create a remote terminal + session. Otherwise a local terminal session will be created. - def get_remote_terminal_session( - self, username: str, password: str, ip_address: IPv4Address - ) -> Optional[RemoteTerminalConnection]: - """Return an instance of a Local Terminal Connection upon successful login. Otherwise returns None.""" - if self.remote_terminal_session is None: - host_terminal: Terminal = self._host_terminal - self.remote_terminal_session = host_terminal.login( - username=username, password=password, ip_address=ip_address - ) - - return self.remote_terminal_session + :return: Returns true if a terminal session was successfully set. False otherwise. + :rtype: Bool + """ + self.terminal_session is None + host_terminal: Terminal = self._host_terminal + self.terminal_session = host_terminal.login(username=username, password=password, ip_address=ip_address) + return self.terminal_session is not None def _init_request_manager(self) -> RequestManager: """ @@ -354,7 +342,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): :payload C2Packet: The incoming INPUT command. :type Masquerade Packet: C2Packet. - :return: Returns the Request Response returned by the Terminal execute method. + :return: Returns a tuple containing Request Response returned by the Terminal execute method. :rtype: Request Response """ if self._host_ftp_server is None: @@ -367,12 +355,16 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): command_opts = Exfil_Opts.model_validate(payload.payload) # Setting up the terminal session and the ftp server - terminal_session = self.get_remote_terminal_session( + if not self._set_terminal_session( username=command_opts.username, password=command_opts.password, ip_address=command_opts.target_ip_address - ) + ): + return RequestResponse( + status="failure", + data={"Reason": "Cannot create a terminal session. Are the credentials correct?"}, + ) # Using the terminal to start the FTP Client on the remote machine. - terminal_session.execute(command=["service", "start", "FTPClient"]) + self.terminal_session.execute(command=["service", "start", "FTPClient"]) # Need to supply to the FTP Client the C2 Beacon's host IP. host_network_interfaces = self.software_manager.node.network_interfaces @@ -390,7 +382,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): # Lambda method used to return a failure RequestResponse if we're unable to perform the exfiltration. # If _check_connection returns false then connection_status will return reason (A 'failure' Request Response) if attempt_exfiltration := (lambda return_bool, reason: reason if return_bool is False else None)( - *self._perform_target_exfiltration(exfil_opts, terminal_session) + *self._perform_exfiltration(exfil_opts) ): return attempt_exfiltration @@ -406,13 +398,29 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): ) ) - def _perform_target_exfiltration( - self, exfil_opts: dict, terminal_session: RemoteTerminalConnection - ) -> tuple[bool, RequestResponse]: - """Confirms that the target data is currently present within the C2 Beacon's hosts file system.""" + def _perform_exfiltration(self, exfil_opts: Exfil_Opts) -> tuple[bool, RequestResponse]: + """ + Attempts to exfiltrate a target file from a target using the parameters given. + + Uses the current terminal_session to send a command to the + remote host's FTP Client passing the exfil_opts as command options. + + This will instruct the FTP client to send the target file to the + dest_ip_address's destination folder. + + This method assumes that the following: + 1. The self.terminal_session is the remote target. + 2. The target has a functioning FTP Client Service. + + + :exfil_opts: A Pydantic model containing the require configuration options + :type exfil_opts: Exfil_Opts + :return: Returns a tuple containing a success boolean and a Request Response.. + :rtype: tuple[bool, RequestResponse + """ # Using the terminal to send the target data back to the C2 Beacon. exfil_response: RequestResponse = RequestResponse.from_bool( - terminal_session.execute(command=["service", "FTPClient", "send", exfil_opts]) + self.terminal_session.execute(command=["service", "FTPClient", "send", exfil_opts]) ) # Validating that we successfully received the target data. @@ -432,16 +440,20 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): f"{self.name}: Unable to locate exfiltrated file on local filesystem." f"Perhaps the file transfer failed?" ) - return RequestResponse( - status="failure", data={"reason": "Unable to locate exfiltrated data on file system."} - ) + return [ + False, + RequestResponse(status="failure", data={"reason": "Unable to locate exfiltrated data on file system."}), + ] if self._host_ftp_client is None: self.sys_log.warning(f"{self.name}: C2 Beacon unable to the FTP Server. Unable to resolve command.") - return RequestResponse( - status="failure", - data={"Reason": "Cannot find any instances of both a FTP Server & Client. Are they installed?"}, - ) + return [ + False, + RequestResponse( + status="failure", + data={"Reason": "Cannot find any instances of both a FTP Server & Client. Are they installed?"}, + ), + ] return [ True, @@ -474,16 +486,12 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): terminal_output: Dict[int, RequestResponse] = {} # Creating a remote terminal session if given an IP Address, otherwise using a local terminal session. - if command_opts.ip_address is None: - terminal_session = self.get_terminal_session(username=command_opts.username, password=command_opts.password) - else: - terminal_session = self.get_remote_terminal_session( - username=command_opts.username, password=command_opts.password, ip_address=command_opts.ip_address - ) - - if terminal_session is None: + if not self._set_terminal_session( + username=command_opts.username, password=command_opts.password, ip_address=command_opts.ip_address + ): return RequestResponse( - status="failure", data={"reason": "Terminal Login failed. Cannot create a terminal session."} + status="failure", + data={"Reason": "Cannot create a terminal session. Are the credentials correct?"}, ) # Converts a singular terminal command: [RequestFormat] into a list with one element [[RequestFormat]] @@ -495,10 +503,10 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): # A try catch exception ladder was used but was considered not the best approach # as it can end up obscuring visibility of actual bugs (Not the expected ones) and was a temporary solution. # TODO: Refactor + add further validation to ensure that a request is correct. (maybe a pydantic method?) - terminal_output[index] = terminal_session.execute(given_command) + terminal_output[index] = self.terminal_session.execute(given_command) # Reset our remote terminal session. - self.remote_terminal_session is None + self.terminal_session is None return RequestResponse(status="success", data=terminal_output) def _handle_keep_alive(self, payload: C2Packet, session_id: Optional[str]) -> bool: