#2689 Addressed failing tests + updated c2_suite.rst to include the Data exfil command.

This commit is contained in:
Archer Bowen
2024-08-15 14:41:35 +01:00
parent c50b005c37
commit f32b3a931f
5 changed files with 52 additions and 33 deletions

View File

@@ -34,6 +34,8 @@ Currently, the C2 Server offers three commands:
+---------------------+---------------------------------------------------------------------------+
|RANSOMWARE_LAUNCH | Launches the installed ransomware script. |
+---------------------+---------------------------------------------------------------------------+
|DATA_EXFILTRATION | Copies a target file from a remote node to the C2 Beacon & Server via FTP |
+---------------------+---------------------------------------------------------------------------+
|TERMINAL | Executes a command via the terminal installed on the C2 Beacons Host. |
+---------------------+---------------------------------------------------------------------------+
@@ -111,21 +113,28 @@ Python
from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Server
from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Command
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.system.services.database.database_service import DatabaseService
from primaite.simulator.system.applications.database_client import DatabaseClient
# Network Setup
switch = Switch(hostname="switch", start_up_duration=0, num_ports=4)
switch.power_on()
node_a = Computer(hostname="node_a", ip_address="192.168.0.10", subnet_mask="255.255.255.0", start_up_duration=0)
node_a.power_on()
node_a.software_manager.install(software_class=C2Server)
node_a.software_manager.get_open_ports()
network.connect(node_a.network_interface[1], switch.network_interface[1])
node_b = Computer(hostname="node_b", ip_address="192.168.0.11", subnet_mask="255.255.255.0", start_up_duration=0)
node_b.power_on()
node_b.software_manager.install(software_class=C2Beacon)
node_b.software_manager.install(software_class=RansomwareScript)
network.connect(node_a.network_interface[1], node_b.network_interface[1])
node_b.software_manager.install(software_class=DatabaseClient)
network.connect(node_b.network_interface[1], switch.network_interface[2])
node_c = Computer(hostname="node_c", ip_address="192.168.0.12", subnet_mask="255.255.255.0", start_up_duration=0)
node_c.power_on()
node_c.software_manager.install(software_class=DatabaseServer)
network.connect(node_c.network_interface[1], switch.network_interface[3])
# C2 Application objects
@@ -159,7 +168,7 @@ Python
c2_server.send_command(C2Command.TERMINAL, command_options=file_create_command)
# Example commands: Installing and configuring Ransomware:
# Example command: Installing and configuring Ransomware:
ransomware_installation_command = { "commands": [
["software_manager","application","install","RansomwareScript"],
@@ -170,12 +179,31 @@ Python
}
c2_server.send_command(given_command=C2Command.TERMINAL, command_options=ransomware_config)
ransomware_config = {"server_ip_address": "192.168.0.10"}
ransomware_config = {"server_ip_address": "192.168.0.12"}
c2_server.send_command(given_command=C2Command.RANSOMWARE_CONFIGURE, command_options=ransomware_config)
c2_beacon_host.software_manager.show()
# Example command: File Exfiltration
data_exfil_options = {
"username": "admin",
"password": "admin",
"ip_address": None,
"target_ip_address": "192.168.0.12",
"target_file_name": "database.db"
"target_folder_name": "database"
"exfiltration_folder_name":
}
c2_server.send_command(given_command=C2Command.DATA_EXFILTRATION, command_options=data_exfil_options)
# Example command: Launching Ransomware
c2_server.send_command(given_command=C2Command.RANSOMWARE_LAUNCH, command_options={})
Via Configuration
"""""""""""""""""

View File

@@ -18,20 +18,14 @@
"outputs": [],
"source": [
"# Imports\n",
"import yaml\n",
"from primaite.config.load import data_manipulation_config_path\n",
"from primaite.session.environment import PrimaiteGymEnv\n",
"from primaite.simulator.network.hardware.nodes.network.router import Router\n",
"from primaite.game.agent.interface import AgentHistoryItem\n",
"import yaml\n",
"from pprint import pprint\n",
"from primaite.simulator.network.container import Network\n",
"from primaite.game.game import PrimaiteGame\n",
"from primaite.simulator.system.applications.application import ApplicationOperatingState\n",
"from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon\n",
"from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Server\n",
"from primaite.simulator.system.applications.red_applications.c2.abstract_c2 import C2Command, C2Payload\n",
"from primaite.simulator.system.applications.red_applications.c2.abstract_c2 import C2Command\n",
"from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript\n",
"from primaite.simulator.system.software import SoftwareHealthState\n",
"from primaite.simulator.network.hardware.nodes.host.computer import Computer\n",
"from primaite.simulator.network.hardware.nodes.host.server import Server"
]

View File

@@ -379,12 +379,11 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
"dest_file_name": command_opts.target_file_name,
}
# Lambda method used to return a failure RequestResponse if we're unable to perform the exfiltration.
# If _check_connection returns false then connection_status will return reason (A 'failure' Request Response)
if attempt_exfiltration := (lambda return_bool, reason: reason if return_bool is False else None)(
*self._perform_exfiltration(exfil_opts)
):
return attempt_exfiltration
attempt_exfiltration: tuple[bool, RequestResponse] = self._perform_exfiltration(exfil_opts)
if attempt_exfiltration[0] is False:
self.sys_log.error(f"{self.name}: File Exfiltration Attempt Failed: {attempt_exfiltration[1].data}")
return attempt_exfiltration[1]
# Sending the transferred target data back to the C2 Server to successfully exfiltrate the data out the network.
@@ -418,6 +417,9 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
:return: Returns a tuple containing a success boolean and a Request Response..
:rtype: tuple[bool, RequestResponse
"""
# Creating the exfiltration folder .
exfiltration_folder = self.get_exfiltration_folder(exfil_opts.get("dest_folder_name"))
# Using the terminal to send the target data back to the C2 Beacon.
exfil_response: RequestResponse = RequestResponse.from_bool(
self.terminal_session.execute(command=["service", "FTPClient", "send", exfil_opts])
@@ -432,9 +434,6 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
# Target file:
target_file: str = exfil_opts.get("src_file_name")
# Creating the exfiltration folder .
exfiltration_folder = self.get_exfiltration_folder(exfil_opts.get("src_folder_name"))
if exfiltration_folder.get_file(target_file) is None:
self.sys_log.warning(
f"{self.name}: Unable to locate exfiltrated file on local filesystem. "

View File

@@ -221,12 +221,10 @@ class C2Server(AbstractC2, identifier="C2Server"):
status="failure", data={"Reason": "Received unexpected C2Command. Unable to send command."}
)
# Lambda method used to return a failure RequestResponse if we're unable to confirm a connection.
# If _check_connection returns false then connection_status will return reason (A 'failure' Request Response)
if connection_status := (lambda return_bool, reason: reason if return_bool is False else None)(
*self._check_connection()
):
return connection_status
connection_status: tuple[bool, RequestResponse] = self._check_connection()
if connection_status[0] is False:
return connection_status[1]
if not self._command_setup(given_command, command_options):
self.sys_log.warning(

View File

@@ -270,7 +270,7 @@ def test_c2_suite_terminal_command_file_creation(basic_network):
c2_server.send_command(C2Command.TERMINAL, command_options=file_create_command)
assert computer_b.software_manager.file_system.access_file(folder_name="test_folder", file_name="test_file") == True
assert c2_beacon.local_terminal_session is not None
assert c2_beacon.terminal_session is not None
# Testing that we can create the same test file/folders via on node 3 via a remote terminal.
@@ -280,7 +280,7 @@ def test_c2_suite_terminal_command_file_creation(basic_network):
c2_server.send_command(C2Command.TERMINAL, command_options=file_create_command)
assert computer_c.software_manager.file_system.access_file(folder_name="test_folder", file_name="test_file") == True
assert c2_beacon.remote_terminal_session is not None
assert c2_beacon.terminal_session is not None
def test_c2_suite_acl_bypass(basic_network):