From 7d086ec35e2a5cd3f0a3a9f9b4e25005a598942a Mon Sep 17 00:00:00 2001 From: Archer Bowen Date: Thu, 15 Aug 2024 17:08:10 +0100 Subject: [PATCH] #2689 Implemented pydantic model validation on C2 Server setup method + updated E2E notebook with data exfiltration. --- .../Command-&-Control-E2E-Demonstration.ipynb | 110 +++++++++++++++--- .../red_applications/c2/__init__.py | 14 ++- .../red_applications/c2/abstract_c2.py | 4 +- .../red_applications/c2/c2_beacon.py | 15 ++- .../red_applications/c2/c2_server.py | 51 ++++++-- .../_red_applications/test_c2_suite.py | 27 +++++ 6 files changed, 182 insertions(+), 39 deletions(-) diff --git a/src/primaite/notebooks/Command-&-Control-E2E-Demonstration.ipynb b/src/primaite/notebooks/Command-&-Control-E2E-Demonstration.ipynb index 0e8c8931..f8c550e0 100644 --- a/src/primaite/notebooks/Command-&-Control-E2E-Demonstration.ipynb +++ b/src/primaite/notebooks/Command-&-Control-E2E-Demonstration.ipynb @@ -500,9 +500,9 @@ "source": [ "### **Command and Control** | C2 Server Actions | C2_SERVER_DATA_EXFILTRATE\n", "\n", - "Finally, currently the last action available is the ``C2_SERVER_DATA_EXFILTRATE`` which can be used to exfiltrate a target_file on a remote node to the C2 Beacon & Server's host file system via the ``FTP`` services.\n", + "The second to last action available is the ``C2_SERVER_DATA_EXFILTRATE`` which can be used to exfiltrate a target file on a remote node to the C2 Beacon & Server's host file system via the ``FTP`` services.\n", "\n", - "This action is indexed as action ``9``. # TODO: Update.\n", + "This action is indexed as action ``6``..\n", "\n", "The below yaml snippet shows all the relevant agent options for this action\n", "\n", @@ -651,9 +651,13 @@ " applications:\n", " - application_name: C2Beacon\n", " - application_name: RansomwareScript\n", + " folders:\n", + " - folder_name: exfiltration_folder\n", + " files:\n", + " - file_name: database.db\n", " - hostname: database_server\n", " folders:\n", - " - folder_name: database\n", + " - folder_name: exfiltration_folder\n", " files:\n", " - file_name: database.db\n", " - hostname: client_1\n", @@ -663,7 +667,7 @@ " num_folders: 1\n", " num_files: 1\n", " num_nics: 1\n", - " include_num_access: false\n", + " include_num_access: true\n", " include_nmne: false\n", " monitored_traffic:\n", " icmp:\n", @@ -832,7 +836,14 @@ "source": [ "### **Command and Control** | Blue Agent Relevance | Observation Space\n", "\n", - "This section demonstrates the OBS impact if the C2 suite is successfully installed and then used to install, configure and launch the ransomwarescript." + "This section demonstrates the impacts that each of that the C2 Beacon and the C2 Server's commands cause on the observation space (OBS)." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### **Command and Control** | OBS Impact | C2 Beacon | Installation & Configuration" ] }, { @@ -888,6 +899,19 @@ "display_obs_diffs(default_obs, c2_configuration_obs, blue_env.game.step_counter)" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### **Command and Control** | OBS Impact | C2 Server | Terminal Command\n", + "\n", + "Using the C2 Server's ``TERMINAL`` command it is possible to install a ``RansomwareScript`` application onto the C2 Beacon's host.\n", + "\n", + "The below code cells perform this as well as capturing the OBS impacts.\n", + "\n", + "It's important to note that the ``TERMINAL`` command is not limited to just installing software." + ] + }, { "cell_type": "code", "execution_count": null, @@ -922,11 +946,22 @@ "c2_ransomware_obs, _, _, _, _ = blue_env.step(0)" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "display_obs_diffs(default_obs, c2_ransomware_obs, env.game.step_counter)" + ] + }, { "cell_type": "markdown", "metadata": {}, "source": [ - "The code cell below demonstrates the differences between the default observation space and the configuration of the C2 Server and the Ransomware installation." + "#### **Command and Control** | OBS Impact | C2 Server | Data Exfiltration\n", + "\n", + "Before encrypting the database.db file, the ``DATA_EXFILTRATION`` command can be used to copy the database.db file onto both the C2 Server and the C2 Beacon's file systems:" ] }, { @@ -935,7 +970,61 @@ "metadata": {}, "outputs": [], "source": [ - "display_obs_diffs(default_obs, c2_ransomware_obs, env.game.step_counter)" + "exfil_options={\n", + " \"username\": \"admin\",\n", + " \"password\": \"admin\",\n", + " \"target_ip_address\": \"192.168.1.14\",\n", + " \"target_folder_name\": \"database\",\n", + " \"exfiltration_folder_name\": \"exfiltration_folder\",\n", + " \"target_file_name\": \"database.db\",\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "c2_server.send_command(given_command=C2Command.DATA_EXFILTRATION, command_options=exfil_options)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "c2_exfil_obs, _, _, _, _ = blue_env.step(0)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "display_obs_diffs(c2_ransomware_obs, c2_exfil_obs, env.game.step_counter)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### **Command and Control** | OBS Impact | C2 Server | Ransomware Commands\n", + "\n", + "The code cell below demonstrates the differences between the ransomware script installation obs and the impact of RansomwareScript upon the database." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Configuring the RansomwareScript\n", + "ransomware_config = {\"server_ip_address\": \"192.168.1.14\", \"payload\": \"ENCRYPT\"}\n", + "c2_server.send_command(C2Command.RANSOMWARE_CONFIGURE, command_options=ransomware_config)" ] }, { @@ -959,13 +1048,6 @@ "c2_final_obs, _, _, _, _ = blue_env.step(0)" ] }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The code cell below demonstrates the differences between the default observation space and the configuration of the C2 Server, the ransomware script installation as well as the impact of RansomwareScript upon the database." - ] - }, { "cell_type": "code", "execution_count": null, diff --git a/src/primaite/simulator/system/applications/red_applications/c2/__init__.py b/src/primaite/simulator/system/applications/red_applications/c2/__init__.py index 919b1bf5..23dfeb31 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/__init__.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/__init__.py @@ -1,7 +1,7 @@ # © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK from typing import Optional, Union -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, field_validator, ValidationInfo from primaite.interface.request import RequestFormat @@ -9,6 +9,14 @@ from primaite.interface.request import RequestFormat class Command_Opts(BaseModel): """A C2 Pydantic Schema acting as a base class for all C2 Commands.""" + @field_validator("payload", "exfiltration_folder_name", "ip_address", mode="before", check_fields=False) + @classmethod + def not_none(cls, v: str, info: ValidationInfo) -> int: + """If None is passed, use the default value instead.""" + if v is None: + return cls.model_fields[info.field_name].default + return v + class Ransomware_Opts(Command_Opts): """A Pydantic Schema for the Ransomware Configuration command options.""" @@ -16,7 +24,7 @@ class Ransomware_Opts(Command_Opts): server_ip_address: str """The IP Address of the target database that the RansomwareScript will attack.""" - payload: Optional[str] = Field(default="ENCRYPT") + payload: str = Field(default="ENCRYPT") """The malicious payload to be used to attack the target database.""" @@ -45,7 +53,7 @@ class Exfil_Opts(Remote_Opts): target_folder_name: str """The name of the remote folder which contains the target file.""" - exfiltration_folder_name: Optional[str] = Field(default="exfiltration_folder") + exfiltration_folder_name: str = Field(default="exfiltration_folder") """""" diff --git a/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py b/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py index d273ea23..b21a996d 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py @@ -185,9 +185,9 @@ class AbstractC2(Application, identifier="AbstractC2"): If a FTPServer is not installed then this method will attempt to install one. :return: An FTPServer object is successful, else None - :rtype: union[FTPServer, None] + :rtype: Optional[FTPServer] """ - ftp_server: Union[FTPServer, None] = self.software_manager.software.get("FTPServer") + ftp_server: Optional[FTPServer] = self.software_manager.software.get("FTPServer") if ftp_server is None: self.sys_log.warning(f"{self.__class__.__name__}:No FTPServer installed. Attempting to install FTPServer.") self.software_manager.install(FTPServer) diff --git a/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py b/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py index e2393ff1..9c63bb53 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py @@ -359,8 +359,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): username=command_opts.username, password=command_opts.password, ip_address=command_opts.target_ip_address ): return RequestResponse( - status="failure", - data={"Reason": "Cannot create a terminal session. Are the credentials correct?"}, + status="failure", data={"Reason": "Cannot create a terminal session. Are the credentials correct?"} ) # Using the terminal to start the FTP Client on the remote machine. @@ -371,7 +370,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): local_ip = host_network_interfaces.get(next(iter(host_network_interfaces))).ip_address # Creating the FTP creation options. - exfil_opts = { + ftp_opts = { "dest_ip_address": str(local_ip), "src_folder_name": command_opts.target_folder_name, "src_file_name": command_opts.target_file_name, @@ -379,7 +378,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): "dest_file_name": command_opts.target_file_name, } - attempt_exfiltration: tuple[bool, RequestResponse] = self._perform_exfiltration(exfil_opts) + attempt_exfiltration: tuple[bool, RequestResponse] = self._perform_exfiltration(ftp_opts) if attempt_exfiltration[0] is False: self.sys_log.error(f"{self.name}: File Exfiltration Attempt Failed: {attempt_exfiltration[1].data}") @@ -397,7 +396,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): ) ) - def _perform_exfiltration(self, exfil_opts: Exfil_Opts) -> tuple[bool, RequestResponse]: + def _perform_exfiltration(self, ftp_opts: dict) -> tuple[bool, RequestResponse]: """ Attempts to exfiltrate a target file from a target using the parameters given. @@ -418,11 +417,11 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): :rtype: tuple[bool, RequestResponse """ # Creating the exfiltration folder . - exfiltration_folder = self.get_exfiltration_folder(exfil_opts.get("dest_folder_name")) + exfiltration_folder = self.get_exfiltration_folder(ftp_opts.get("dest_folder_name")) # Using the terminal to send the target data back to the C2 Beacon. exfil_response: RequestResponse = RequestResponse.from_bool( - self.terminal_session.execute(command=["service", "FTPClient", "send", exfil_opts]) + self.terminal_session.execute(command=["service", "FTPClient", "send", ftp_opts]) ) # Validating that we successfully received the target data. @@ -432,7 +431,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): return [False, exfil_response] # Target file: - target_file: str = exfil_opts.get("src_file_name") + target_file: str = ftp_opts.get("src_file_name") if exfiltration_folder.get_file(target_file) is None: self.sys_log.warning( diff --git a/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py b/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py index 9816bb15..8384d922 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py @@ -7,6 +7,12 @@ from pydantic import validate_call from primaite.interface.request import RequestFormat, RequestResponse from primaite.simulator.core import RequestManager, RequestType from primaite.simulator.network.protocols.masquerade import C2Packet +from primaite.simulator.system.applications.red_applications.c2 import ( + Command_Opts, + Exfil_Opts, + Ransomware_Opts, + Terminal_Opts, +) from primaite.simulator.system.applications.red_applications.c2.abstract_c2 import AbstractC2, C2Command, C2Payload @@ -49,11 +55,11 @@ class C2Server(AbstractC2, identifier="C2Server"): :return: RequestResponse object with a success code reflecting whether the configuration could be applied. :rtype: RequestResponse """ - ransomware_config = { + command_payload = { "server_ip_address": request[-1].get("server_ip_address"), "payload": request[-1].get("payload"), } - return self.send_command(given_command=C2Command.RANSOMWARE_CONFIGURE, command_options=ransomware_config) + return self.send_command(given_command=C2Command.RANSOMWARE_CONFIGURE, command_options=command_payload) def _launch_ransomware_action(request: RequestFormat, context: Dict) -> RequestResponse: """Agent Action - Sends a RANSOMWARE_LAUNCH C2Command to the C2 Beacon with the given parameters. @@ -226,7 +232,9 @@ class C2Server(AbstractC2, identifier="C2Server"): if connection_status[0] is False: return connection_status[1] - if not self._command_setup(given_command, command_options): + setup_success, command_options = self._command_setup(given_command, command_options) + + if setup_success is False: self.sys_log.warning( f"{self.name}: Failed to perform necessary C2 Server setup for given command: {given_command}." ) @@ -236,7 +244,7 @@ class C2Server(AbstractC2, identifier="C2Server"): self.sys_log.info(f"{self.name}: Attempting to send command {given_command}.") command_packet = self._craft_packet( - c2_payload=C2Payload.INPUT, c2_command=given_command, command_options=command_options + c2_payload=C2Payload.INPUT, c2_command=given_command, command_options=command_options.model_dump() ) if self.send( @@ -256,10 +264,12 @@ class C2Server(AbstractC2, identifier="C2Server"): ) return self.current_command_output - def _command_setup(self, given_command: C2Command, command_options: dict) -> bool: + def _command_setup(self, given_command: C2Command, command_options: dict) -> tuple[bool, Command_Opts]: """ Performs any necessary C2 Server setup needed to perform certain commands. + This includes any option validation and any other required setup. + The following table details any C2 Server prequisites for following commands. C2 Command | Command Service/Application Requirements @@ -278,18 +288,35 @@ class C2Server(AbstractC2, identifier="C2Server"): :type given_command: C2Command. :param command_options: The relevant command parameters. :type command_options: Dict - :returns: True the setup was successful, false otherwise. - :rtype: bool + :returns: Tuple containing a success bool if the setup was successful and the validated c2 opts. + :rtype: tuple[bool, Command_Opts] """ + server_setup_success: bool = True + if given_command == C2Command.DATA_EXFILTRATION: # Data exfiltration setup + # Validating command options + command_options = Exfil_Opts.model_validate(command_options) if self._host_ftp_server is None: self.sys_log.warning(f"{self.name}: Unable to setup the FTP Server for data exfiltration") - return False - if self.get_exfiltration_folder(command_options.get("exfiltration_folder_name")) is None: - self.sys_log.warning(f"{self.name}: Unable to create a folder for storing exfiltration data.") - return False + server_setup_success = False - return True + if self.get_exfiltration_folder(command_options.exfiltration_folder_name) is None: + self.sys_log.warning(f"{self.name}: Unable to create a folder for storing exfiltration data.") + server_setup_success = False + + if given_command == C2Command.TERMINAL: + # Validating command options + command_options = Terminal_Opts.model_validate(command_options) + + if given_command == C2Command.RANSOMWARE_CONFIGURE: + # Validating command options + command_options = Ransomware_Opts.model_validate(command_options) + + if given_command == C2Command.RANSOMWARE_LAUNCH: + # Validating command options + command_options = Command_Opts.model_validate(command_options) + + return [server_setup_success, command_options] def _confirm_remote_connection(self, timestep: int) -> bool: """Checks the suitability of the current C2 Beacon connection. diff --git a/tests/unit_tests/_primaite/_simulator/_system/_applications/_red_applications/test_c2_suite.py b/tests/unit_tests/_primaite/_simulator/_system/_applications/_red_applications/test_c2_suite.py index 30defe8b..885a3cb6 100644 --- a/tests/unit_tests/_primaite/_simulator/_system/_applications/_red_applications/test_c2_suite.py +++ b/tests/unit_tests/_primaite/_simulator/_system/_applications/_red_applications/test_c2_suite.py @@ -219,3 +219,30 @@ def test_c2_handles_1_timestep_keep_alive(basic_c2_network): assert c2_beacon.c2_connection_active is True assert c2_server.c2_connection_active is True + + +def test_c2_exfil_folder(basic_c2_network): + """Tests that the C2 suite correctly default and setup their exfiltration_folders.""" + network: Network = basic_c2_network + + network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network) + + c2_beacon.get_exfiltration_folder() + c2_server.get_exfiltration_folder() + assert c2_beacon.file_system.get_folder("exfiltration_folder") + assert c2_server.file_system.get_folder("exfiltration_folder") + + c2_server.file_system.create_file(folder_name="test_folder", file_name="test_file") + + # asserting to check that by default the c2 exfil will use "exfiltration_folder" + exfil_options = { + "username": "admin", + "password": "admin", + "target_ip_address": "192.168.0.1", + "target_folder_name": "test_folder", + "exfiltration_folder_name": None, + "target_file_name": "test_file", + } + c2_server.send_command(given_command=C2Command.DATA_EXFILTRATION, command_options=exfil_options) + + assert c2_beacon.file_system.get_file(folder_name="exfiltration_folder", file_name="test_file")