#2689 Implemented pydantic model validation on C2 Server setup method + updated E2E notebook with data exfiltration.
This commit is contained in:
@@ -500,9 +500,9 @@
|
||||
"source": [
|
||||
"### **Command and Control** | C2 Server Actions | C2_SERVER_DATA_EXFILTRATE\n",
|
||||
"\n",
|
||||
"Finally, currently the last action available is the ``C2_SERVER_DATA_EXFILTRATE`` which can be used to exfiltrate a target_file on a remote node to the C2 Beacon & Server's host file system via the ``FTP`` services.\n",
|
||||
"The second to last action available is the ``C2_SERVER_DATA_EXFILTRATE`` which can be used to exfiltrate a target file on a remote node to the C2 Beacon & Server's host file system via the ``FTP`` services.\n",
|
||||
"\n",
|
||||
"This action is indexed as action ``9``. # TODO: Update.\n",
|
||||
"This action is indexed as action ``6``..\n",
|
||||
"\n",
|
||||
"The below yaml snippet shows all the relevant agent options for this action\n",
|
||||
"\n",
|
||||
@@ -651,9 +651,13 @@
|
||||
" applications:\n",
|
||||
" - application_name: C2Beacon\n",
|
||||
" - application_name: RansomwareScript\n",
|
||||
" folders:\n",
|
||||
" - folder_name: exfiltration_folder\n",
|
||||
" files:\n",
|
||||
" - file_name: database.db\n",
|
||||
" - hostname: database_server\n",
|
||||
" folders:\n",
|
||||
" - folder_name: database\n",
|
||||
" - folder_name: exfiltration_folder\n",
|
||||
" files:\n",
|
||||
" - file_name: database.db\n",
|
||||
" - hostname: client_1\n",
|
||||
@@ -663,7 +667,7 @@
|
||||
" num_folders: 1\n",
|
||||
" num_files: 1\n",
|
||||
" num_nics: 1\n",
|
||||
" include_num_access: false\n",
|
||||
" include_num_access: true\n",
|
||||
" include_nmne: false\n",
|
||||
" monitored_traffic:\n",
|
||||
" icmp:\n",
|
||||
@@ -832,7 +836,14 @@
|
||||
"source": [
|
||||
"### **Command and Control** | Blue Agent Relevance | Observation Space\n",
|
||||
"\n",
|
||||
"This section demonstrates the OBS impact if the C2 suite is successfully installed and then used to install, configure and launch the ransomwarescript."
|
||||
"This section demonstrates the impacts that each of that the C2 Beacon and the C2 Server's commands cause on the observation space (OBS)."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### **Command and Control** | OBS Impact | C2 Beacon | Installation & Configuration"
|
||||
]
|
||||
},
|
||||
{
|
||||
@@ -888,6 +899,19 @@
|
||||
"display_obs_diffs(default_obs, c2_configuration_obs, blue_env.game.step_counter)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### **Command and Control** | OBS Impact | C2 Server | Terminal Command\n",
|
||||
"\n",
|
||||
"Using the C2 Server's ``TERMINAL`` command it is possible to install a ``RansomwareScript`` application onto the C2 Beacon's host.\n",
|
||||
"\n",
|
||||
"The below code cells perform this as well as capturing the OBS impacts.\n",
|
||||
"\n",
|
||||
"It's important to note that the ``TERMINAL`` command is not limited to just installing software."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
@@ -922,11 +946,22 @@
|
||||
"c2_ransomware_obs, _, _, _, _ = blue_env.step(0)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"display_obs_diffs(default_obs, c2_ransomware_obs, env.game.step_counter)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"The code cell below demonstrates the differences between the default observation space and the configuration of the C2 Server and the Ransomware installation."
|
||||
"#### **Command and Control** | OBS Impact | C2 Server | Data Exfiltration\n",
|
||||
"\n",
|
||||
"Before encrypting the database.db file, the ``DATA_EXFILTRATION`` command can be used to copy the database.db file onto both the C2 Server and the C2 Beacon's file systems:"
|
||||
]
|
||||
},
|
||||
{
|
||||
@@ -935,7 +970,61 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"display_obs_diffs(default_obs, c2_ransomware_obs, env.game.step_counter)"
|
||||
"exfil_options={\n",
|
||||
" \"username\": \"admin\",\n",
|
||||
" \"password\": \"admin\",\n",
|
||||
" \"target_ip_address\": \"192.168.1.14\",\n",
|
||||
" \"target_folder_name\": \"database\",\n",
|
||||
" \"exfiltration_folder_name\": \"exfiltration_folder\",\n",
|
||||
" \"target_file_name\": \"database.db\",\n",
|
||||
"}"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"c2_server.send_command(given_command=C2Command.DATA_EXFILTRATION, command_options=exfil_options)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"c2_exfil_obs, _, _, _, _ = blue_env.step(0)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"display_obs_diffs(c2_ransomware_obs, c2_exfil_obs, env.game.step_counter)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### **Command and Control** | OBS Impact | C2 Server | Ransomware Commands\n",
|
||||
"\n",
|
||||
"The code cell below demonstrates the differences between the ransomware script installation obs and the impact of RansomwareScript upon the database."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# Configuring the RansomwareScript\n",
|
||||
"ransomware_config = {\"server_ip_address\": \"192.168.1.14\", \"payload\": \"ENCRYPT\"}\n",
|
||||
"c2_server.send_command(C2Command.RANSOMWARE_CONFIGURE, command_options=ransomware_config)"
|
||||
]
|
||||
},
|
||||
{
|
||||
@@ -959,13 +1048,6 @@
|
||||
"c2_final_obs, _, _, _, _ = blue_env.step(0)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"The code cell below demonstrates the differences between the default observation space and the configuration of the C2 Server, the ransomware script installation as well as the impact of RansomwareScript upon the database."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
|
||||
from typing import Optional, Union
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
from pydantic import BaseModel, Field, field_validator, ValidationInfo
|
||||
|
||||
from primaite.interface.request import RequestFormat
|
||||
|
||||
@@ -9,6 +9,14 @@ from primaite.interface.request import RequestFormat
|
||||
class Command_Opts(BaseModel):
|
||||
"""A C2 Pydantic Schema acting as a base class for all C2 Commands."""
|
||||
|
||||
@field_validator("payload", "exfiltration_folder_name", "ip_address", mode="before", check_fields=False)
|
||||
@classmethod
|
||||
def not_none(cls, v: str, info: ValidationInfo) -> int:
|
||||
"""If None is passed, use the default value instead."""
|
||||
if v is None:
|
||||
return cls.model_fields[info.field_name].default
|
||||
return v
|
||||
|
||||
|
||||
class Ransomware_Opts(Command_Opts):
|
||||
"""A Pydantic Schema for the Ransomware Configuration command options."""
|
||||
@@ -16,7 +24,7 @@ class Ransomware_Opts(Command_Opts):
|
||||
server_ip_address: str
|
||||
"""The IP Address of the target database that the RansomwareScript will attack."""
|
||||
|
||||
payload: Optional[str] = Field(default="ENCRYPT")
|
||||
payload: str = Field(default="ENCRYPT")
|
||||
"""The malicious payload to be used to attack the target database."""
|
||||
|
||||
|
||||
@@ -45,7 +53,7 @@ class Exfil_Opts(Remote_Opts):
|
||||
target_folder_name: str
|
||||
"""The name of the remote folder which contains the target file."""
|
||||
|
||||
exfiltration_folder_name: Optional[str] = Field(default="exfiltration_folder")
|
||||
exfiltration_folder_name: str = Field(default="exfiltration_folder")
|
||||
""""""
|
||||
|
||||
|
||||
|
||||
@@ -185,9 +185,9 @@ class AbstractC2(Application, identifier="AbstractC2"):
|
||||
If a FTPServer is not installed then this method will attempt to install one.
|
||||
|
||||
:return: An FTPServer object is successful, else None
|
||||
:rtype: union[FTPServer, None]
|
||||
:rtype: Optional[FTPServer]
|
||||
"""
|
||||
ftp_server: Union[FTPServer, None] = self.software_manager.software.get("FTPServer")
|
||||
ftp_server: Optional[FTPServer] = self.software_manager.software.get("FTPServer")
|
||||
if ftp_server is None:
|
||||
self.sys_log.warning(f"{self.__class__.__name__}:No FTPServer installed. Attempting to install FTPServer.")
|
||||
self.software_manager.install(FTPServer)
|
||||
|
||||
@@ -359,8 +359,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
|
||||
username=command_opts.username, password=command_opts.password, ip_address=command_opts.target_ip_address
|
||||
):
|
||||
return RequestResponse(
|
||||
status="failure",
|
||||
data={"Reason": "Cannot create a terminal session. Are the credentials correct?"},
|
||||
status="failure", data={"Reason": "Cannot create a terminal session. Are the credentials correct?"}
|
||||
)
|
||||
|
||||
# Using the terminal to start the FTP Client on the remote machine.
|
||||
@@ -371,7 +370,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
|
||||
local_ip = host_network_interfaces.get(next(iter(host_network_interfaces))).ip_address
|
||||
|
||||
# Creating the FTP creation options.
|
||||
exfil_opts = {
|
||||
ftp_opts = {
|
||||
"dest_ip_address": str(local_ip),
|
||||
"src_folder_name": command_opts.target_folder_name,
|
||||
"src_file_name": command_opts.target_file_name,
|
||||
@@ -379,7 +378,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
|
||||
"dest_file_name": command_opts.target_file_name,
|
||||
}
|
||||
|
||||
attempt_exfiltration: tuple[bool, RequestResponse] = self._perform_exfiltration(exfil_opts)
|
||||
attempt_exfiltration: tuple[bool, RequestResponse] = self._perform_exfiltration(ftp_opts)
|
||||
|
||||
if attempt_exfiltration[0] is False:
|
||||
self.sys_log.error(f"{self.name}: File Exfiltration Attempt Failed: {attempt_exfiltration[1].data}")
|
||||
@@ -397,7 +396,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
|
||||
)
|
||||
)
|
||||
|
||||
def _perform_exfiltration(self, exfil_opts: Exfil_Opts) -> tuple[bool, RequestResponse]:
|
||||
def _perform_exfiltration(self, ftp_opts: dict) -> tuple[bool, RequestResponse]:
|
||||
"""
|
||||
Attempts to exfiltrate a target file from a target using the parameters given.
|
||||
|
||||
@@ -418,11 +417,11 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
|
||||
:rtype: tuple[bool, RequestResponse
|
||||
"""
|
||||
# Creating the exfiltration folder .
|
||||
exfiltration_folder = self.get_exfiltration_folder(exfil_opts.get("dest_folder_name"))
|
||||
exfiltration_folder = self.get_exfiltration_folder(ftp_opts.get("dest_folder_name"))
|
||||
|
||||
# Using the terminal to send the target data back to the C2 Beacon.
|
||||
exfil_response: RequestResponse = RequestResponse.from_bool(
|
||||
self.terminal_session.execute(command=["service", "FTPClient", "send", exfil_opts])
|
||||
self.terminal_session.execute(command=["service", "FTPClient", "send", ftp_opts])
|
||||
)
|
||||
|
||||
# Validating that we successfully received the target data.
|
||||
@@ -432,7 +431,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
|
||||
return [False, exfil_response]
|
||||
|
||||
# Target file:
|
||||
target_file: str = exfil_opts.get("src_file_name")
|
||||
target_file: str = ftp_opts.get("src_file_name")
|
||||
|
||||
if exfiltration_folder.get_file(target_file) is None:
|
||||
self.sys_log.warning(
|
||||
|
||||
@@ -7,6 +7,12 @@ from pydantic import validate_call
|
||||
from primaite.interface.request import RequestFormat, RequestResponse
|
||||
from primaite.simulator.core import RequestManager, RequestType
|
||||
from primaite.simulator.network.protocols.masquerade import C2Packet
|
||||
from primaite.simulator.system.applications.red_applications.c2 import (
|
||||
Command_Opts,
|
||||
Exfil_Opts,
|
||||
Ransomware_Opts,
|
||||
Terminal_Opts,
|
||||
)
|
||||
from primaite.simulator.system.applications.red_applications.c2.abstract_c2 import AbstractC2, C2Command, C2Payload
|
||||
|
||||
|
||||
@@ -49,11 +55,11 @@ class C2Server(AbstractC2, identifier="C2Server"):
|
||||
:return: RequestResponse object with a success code reflecting whether the configuration could be applied.
|
||||
:rtype: RequestResponse
|
||||
"""
|
||||
ransomware_config = {
|
||||
command_payload = {
|
||||
"server_ip_address": request[-1].get("server_ip_address"),
|
||||
"payload": request[-1].get("payload"),
|
||||
}
|
||||
return self.send_command(given_command=C2Command.RANSOMWARE_CONFIGURE, command_options=ransomware_config)
|
||||
return self.send_command(given_command=C2Command.RANSOMWARE_CONFIGURE, command_options=command_payload)
|
||||
|
||||
def _launch_ransomware_action(request: RequestFormat, context: Dict) -> RequestResponse:
|
||||
"""Agent Action - Sends a RANSOMWARE_LAUNCH C2Command to the C2 Beacon with the given parameters.
|
||||
@@ -226,7 +232,9 @@ class C2Server(AbstractC2, identifier="C2Server"):
|
||||
if connection_status[0] is False:
|
||||
return connection_status[1]
|
||||
|
||||
if not self._command_setup(given_command, command_options):
|
||||
setup_success, command_options = self._command_setup(given_command, command_options)
|
||||
|
||||
if setup_success is False:
|
||||
self.sys_log.warning(
|
||||
f"{self.name}: Failed to perform necessary C2 Server setup for given command: {given_command}."
|
||||
)
|
||||
@@ -236,7 +244,7 @@ class C2Server(AbstractC2, identifier="C2Server"):
|
||||
|
||||
self.sys_log.info(f"{self.name}: Attempting to send command {given_command}.")
|
||||
command_packet = self._craft_packet(
|
||||
c2_payload=C2Payload.INPUT, c2_command=given_command, command_options=command_options
|
||||
c2_payload=C2Payload.INPUT, c2_command=given_command, command_options=command_options.model_dump()
|
||||
)
|
||||
|
||||
if self.send(
|
||||
@@ -256,10 +264,12 @@ class C2Server(AbstractC2, identifier="C2Server"):
|
||||
)
|
||||
return self.current_command_output
|
||||
|
||||
def _command_setup(self, given_command: C2Command, command_options: dict) -> bool:
|
||||
def _command_setup(self, given_command: C2Command, command_options: dict) -> tuple[bool, Command_Opts]:
|
||||
"""
|
||||
Performs any necessary C2 Server setup needed to perform certain commands.
|
||||
|
||||
This includes any option validation and any other required setup.
|
||||
|
||||
The following table details any C2 Server prequisites for following commands.
|
||||
|
||||
C2 Command | Command Service/Application Requirements
|
||||
@@ -278,18 +288,35 @@ class C2Server(AbstractC2, identifier="C2Server"):
|
||||
:type given_command: C2Command.
|
||||
:param command_options: The relevant command parameters.
|
||||
:type command_options: Dict
|
||||
:returns: True the setup was successful, false otherwise.
|
||||
:rtype: bool
|
||||
:returns: Tuple containing a success bool if the setup was successful and the validated c2 opts.
|
||||
:rtype: tuple[bool, Command_Opts]
|
||||
"""
|
||||
server_setup_success: bool = True
|
||||
|
||||
if given_command == C2Command.DATA_EXFILTRATION: # Data exfiltration setup
|
||||
# Validating command options
|
||||
command_options = Exfil_Opts.model_validate(command_options)
|
||||
if self._host_ftp_server is None:
|
||||
self.sys_log.warning(f"{self.name}: Unable to setup the FTP Server for data exfiltration")
|
||||
return False
|
||||
if self.get_exfiltration_folder(command_options.get("exfiltration_folder_name")) is None:
|
||||
self.sys_log.warning(f"{self.name}: Unable to create a folder for storing exfiltration data.")
|
||||
return False
|
||||
server_setup_success = False
|
||||
|
||||
return True
|
||||
if self.get_exfiltration_folder(command_options.exfiltration_folder_name) is None:
|
||||
self.sys_log.warning(f"{self.name}: Unable to create a folder for storing exfiltration data.")
|
||||
server_setup_success = False
|
||||
|
||||
if given_command == C2Command.TERMINAL:
|
||||
# Validating command options
|
||||
command_options = Terminal_Opts.model_validate(command_options)
|
||||
|
||||
if given_command == C2Command.RANSOMWARE_CONFIGURE:
|
||||
# Validating command options
|
||||
command_options = Ransomware_Opts.model_validate(command_options)
|
||||
|
||||
if given_command == C2Command.RANSOMWARE_LAUNCH:
|
||||
# Validating command options
|
||||
command_options = Command_Opts.model_validate(command_options)
|
||||
|
||||
return [server_setup_success, command_options]
|
||||
|
||||
def _confirm_remote_connection(self, timestep: int) -> bool:
|
||||
"""Checks the suitability of the current C2 Beacon connection.
|
||||
|
||||
@@ -219,3 +219,30 @@ def test_c2_handles_1_timestep_keep_alive(basic_c2_network):
|
||||
|
||||
assert c2_beacon.c2_connection_active is True
|
||||
assert c2_server.c2_connection_active is True
|
||||
|
||||
|
||||
def test_c2_exfil_folder(basic_c2_network):
|
||||
"""Tests that the C2 suite correctly default and setup their exfiltration_folders."""
|
||||
network: Network = basic_c2_network
|
||||
|
||||
network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network)
|
||||
|
||||
c2_beacon.get_exfiltration_folder()
|
||||
c2_server.get_exfiltration_folder()
|
||||
assert c2_beacon.file_system.get_folder("exfiltration_folder")
|
||||
assert c2_server.file_system.get_folder("exfiltration_folder")
|
||||
|
||||
c2_server.file_system.create_file(folder_name="test_folder", file_name="test_file")
|
||||
|
||||
# asserting to check that by default the c2 exfil will use "exfiltration_folder"
|
||||
exfil_options = {
|
||||
"username": "admin",
|
||||
"password": "admin",
|
||||
"target_ip_address": "192.168.0.1",
|
||||
"target_folder_name": "test_folder",
|
||||
"exfiltration_folder_name": None,
|
||||
"target_file_name": "test_file",
|
||||
}
|
||||
c2_server.send_command(given_command=C2Command.DATA_EXFILTRATION, command_options=exfil_options)
|
||||
|
||||
assert c2_beacon.file_system.get_file(folder_name="exfiltration_folder", file_name="test_file")
|
||||
|
||||
Reference in New Issue
Block a user