#2689 Fixed small bugs, added pydantic class validation and divided the data_Exfil command on c2 beacon into two separate methods.

This commit is contained in:
Archer Bowen
2024-08-15 11:36:55 +01:00
parent 6a28f17f1b
commit e53ac84666
6 changed files with 138 additions and 74 deletions

View File

@@ -1195,7 +1195,7 @@ class TerminalC2ServerAction(AbstractAction):
class _Opts(BaseModel):
"""Schema for options that can be passed to this action."""
commands: List[RequestFormat]
commands: Union[List[RequestFormat], RequestFormat]
ip_address: Optional[str]
username: Optional[str]
password: Optional[str]

View File

@@ -1275,13 +1275,18 @@ class UserSessionManager(Service):
def pre_timestep(self, timestep: int) -> None:
"""Apply any pre-timestep logic that helps make sure we have the correct observations."""
self.current_timestep = timestep
inactive_sessions: list = []
if self.local_session:
if self.local_session.last_active_step + self.local_session_timeout_steps <= timestep:
self._timeout_session(self.local_session)
inactive_sessions.append(self.local_session)
for session in self.remote_sessions:
remote_session = self.remote_sessions[session]
if remote_session.last_active_step + self.remote_session_timeout_steps <= timestep:
self._timeout_session(remote_session)
inactive_sessions.append(remote_session)
for sessions in inactive_sessions:
self._timeout_session(sessions)
def _timeout_session(self, session: UserSession) -> None:
"""

View File

@@ -1 +1,56 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from typing import Optional, Union
from pydantic import BaseModel, Field
from primaite.interface.request import RequestFormat
class Command_Opts(BaseModel):
"""A C2 Pydantic Schema acting as a base class for all C2 Commands."""
class Ransomware_Opts(Command_Opts):
"""A Pydantic Schema for the Ransomware Configuration command options."""
server_ip_address: str
""""""
payload: Optional[str] = Field(default="ENCRYPT")
""""""
class Remote_Opts(Command_Opts):
"""A base C2 Pydantic Schema for all C2 Commands that require a remote terminal connection."""
ip_address: Optional[str] = Field(default=None)
""""""
username: str
""""""
password: str
""""""
class Exfil_Opts(Remote_Opts):
"""A Pydantic Schema for the C2 Data Exfiltration command options."""
target_ip_address: str
""""""
target_folder_name: str
""""""
target_file_name: str
""""""
exfiltration_folder_name: Optional[str] = Field(default="exfiltration_folder")
""""""
class Terminal_Opts(Remote_Opts):
"""A Pydantic Schema for the C2 Terminal command options."""
commands: Union[list[RequestFormat], RequestFormat]
""""""

View File

@@ -147,7 +147,6 @@ class AbstractC2(Application, identifier="AbstractC2"):
kwargs["protocol"] = IPProtocol.TCP
super().__init__(**kwargs)
# TODO: We may need to disable the ftp_server/client when using the opposite service. (To test)
@property
def _host_ftp_client(self) -> Optional[FTPClient]:
"""Return the FTPClient that is installed C2 Application's host.
@@ -214,8 +213,10 @@ class AbstractC2(Application, identifier="AbstractC2"):
self.sys_log.error(f"{self.__class__.__name__}: does not seem to have a file system!")
return host_file_system
def get_exfiltration_folder(self, folder_name: str) -> Optional[Folder]:
def get_exfiltration_folder(self, folder_name: Optional[str] = "exfiltration_folder") -> Optional[Folder]:
"""Return a folder used for storing exfiltrated data. Otherwise returns None."""
if self._host_file_system is None:
return
exfiltration_folder: Union[Folder, None] = self._host_file_system.get_folder(folder_name)
if exfiltration_folder is None:
self.sys_log.info(f"{self.__class__.__name__}: Creating a exfiltration folder.")

View File

@@ -11,6 +11,7 @@ from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.network.protocols.masquerade import C2Packet
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.red_applications.c2 import Exfil_Opts, Ransomware_Opts, Terminal_Opts
from primaite.simulator.system.applications.red_applications.c2.abstract_c2 import AbstractC2, C2Command, C2Payload
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript
from primaite.simulator.system.services.terminal.terminal import (
@@ -305,7 +306,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
:return: Returns the Request Response returned by the Terminal execute method.
:rtype: Request Response
"""
given_config = payload.payload
command_opts = Ransomware_Opts.model_validate(payload.payload)
if self._host_ransomware_script is None:
return RequestResponse(
status="failure",
@@ -313,7 +314,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
)
return RequestResponse.from_bool(
self._host_ransomware_script.configure(
server_ip_address=given_config["server_ip_address"], payload=given_config["payload"]
server_ip_address=command_opts.server_ip_address, payload=command_opts.payload
)
)
@@ -342,10 +343,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
Uses the FTP Client & Server services to perform the data exfiltration.
Similar to the terminal, the logic of this command is dependant on if a remote_ip
is present within the payload.
If a payload does contain an IP address then the C2 Beacon will ssh into the target ip
This command instructs the C2 Beacon to ssh into the target ip
and execute a command which causes the FTPClient service to send a
target file will be moved from the target IP address onto the C2 Beacon's host
@@ -366,67 +364,74 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
data={"Reason": "Cannot find any instances of both a FTP Server & Client. Are they installed?"},
)
remote_ip = payload.payload.get("target_ip_address")
target_folder = payload.payload.get("target_folder_name")
dest_folder = payload.payload.get("exfiltration_folder_name")
# Using the same name for both the target/destination file for clarity.
file_name = payload.payload.get("target_file_name")
# TODO: Split Remote file extraction and local file extraction into different methods.
# if remote_ip is None:
# self._host_ftp_client.start()
# self.sys_log.info(f"{self.name}: No Remote IP given. Returning target file from local file system.")
# return RequestResponse.from_bool(self._host_ftp_client.send_file(
# dest_ip_address=self.c2_remote_connection,
# src_folder_name=target_folder,
# src_file_name=file_name,
# dest_folder_name=dest_folder,
# dest_file_name=file_name,
# session_id=self.c2_session.uuid
# ))
# Parsing remote login credentials
given_username = payload.payload.get("username")
given_password = payload.payload.get("password")
command_opts = Exfil_Opts.model_validate(payload.payload)
# Setting up the terminal session and the ftp server
terminal_session = self.get_remote_terminal_session(
username=given_username, password=given_password, ip_address=remote_ip
username=command_opts.username, password=command_opts.password, ip_address=command_opts.target_ip_address
)
# Initialising the exfiltration folder.
exfiltration_folder = self.get_exfiltration_folder(dest_folder)
# Using the terminal to start the FTP Client on the remote machine.
# This can fail if the FTP Client is already enabled.
terminal_session.execute(command=["service", "start", "FTPClient"])
# Need to supply to the FTP Client the C2 Beacon's host IP.
host_network_interfaces = self.software_manager.node.network_interfaces
local_ip = host_network_interfaces.get(next(iter(host_network_interfaces))).ip_address
# Creating the FTP creation options.
remote_ftp_options = {
exfil_opts = {
"dest_ip_address": str(local_ip),
"src_folder_name": target_folder,
"src_file_name": file_name,
"dest_folder_name": dest_folder,
"dest_file_name": file_name,
"src_folder_name": command_opts.target_folder_name,
"src_file_name": command_opts.target_file_name,
"dest_folder_name": command_opts.exfiltration_folder_name,
"dest_file_name": command_opts.target_file_name,
}
# Lambda method used to return a failure RequestResponse if we're unable to perform the exfiltration.
# If _check_connection returns false then connection_status will return reason (A 'failure' Request Response)
if attempt_exfiltration := (lambda return_bool, reason: reason if return_bool is False else None)(
*self._perform_target_exfiltration(exfil_opts, terminal_session)
):
return attempt_exfiltration
# Sending the transferred target data back to the C2 Server to successfully exfiltrate the data out the network.
return RequestResponse.from_bool(
self._host_ftp_client.send_file(
dest_ip_address=self.c2_remote_connection,
src_folder_name=command_opts.exfiltration_folder_name, # The Exfil folder is inherited attribute.
src_file_name=command_opts.target_file_name,
dest_folder_name=command_opts.exfiltration_folder_name,
dest_file_name=command_opts.target_file_name,
)
)
def _perform_target_exfiltration(
self, exfil_opts: dict, terminal_session: RemoteTerminalConnection
) -> tuple[bool, RequestResponse]:
"""Confirms that the target data is currently present within the C2 Beacon's hosts file system."""
# Using the terminal to send the target data back to the C2 Beacon.
remote_ftp_response: RequestResponse = RequestResponse.from_bool(
terminal_session.execute(command=["service", "FTPClient", "send", remote_ftp_options])
exfil_response: RequestResponse = RequestResponse.from_bool(
terminal_session.execute(command=["service", "FTPClient", "send", exfil_opts])
)
# Validating that we successfully received the target data.
if remote_ftp_response.status == "failure":
self.sys_log.warning(
f"{self.name}: Remote connection: {remote_ip} failed to transfer the target data via FTP."
)
return remote_ftp_response
if exfil_response.status == "failure":
self.sys_log.warning(f"{self.name}: Remote connection failure. failed to transfer the target data via FTP.")
return [False, exfil_response]
if exfiltration_folder.get_file(file_name) is None:
self.sys_log.warning(f"{self.name}: Unable to locate exfiltrated file on local filesystem.")
# Target file:
target_file: str = exfil_opts.get("src_file_name")
# Creating the exfiltration folder .
exfiltration_folder = self.get_exfiltration_folder(exfil_opts.get("src_folder_name"))
if exfiltration_folder.get_file(target_file) is None:
self.sys_log.warning(
f"{self.name}: Unable to locate exfiltrated file on local filesystem."
f"Perhaps the file transfer failed?"
)
return RequestResponse(
status="failure", data={"reason": "Unable to locate exfiltrated data on file system."}
)
@@ -438,16 +443,13 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
data={"Reason": "Cannot find any instances of both a FTP Server & Client. Are they installed?"},
)
# Sending the transferred target data back to the C2 Server to successfully exfiltrate the data out the network.
return RequestResponse.from_bool(
self._host_ftp_client.send_file(
dest_ip_address=self.c2_remote_connection,
src_folder_name=dest_folder, # TODO: Clarify this - Dest folder has the same name on c2server/c2beacon.
src_file_name=file_name,
dest_folder_name=dest_folder,
dest_file_name=file_name,
)
)
return [
True,
RequestResponse(
status="success",
data={"Reason": "Located the target file on local file system. Data exfiltration successful."},
),
]
def _command_terminal(self, payload: C2Packet) -> RequestResponse:
"""
@@ -461,8 +463,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
:return: Returns the Request Response returned by the Terminal execute method.
:rtype: Request Response
"""
terminal_output: Dict[int, RequestResponse] = {}
given_commands: list[RequestFormat]
command_opts = Terminal_Opts.model_validate(payload.payload)
if self._host_terminal is None:
return RequestResponse(
@@ -470,17 +471,14 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
data={"Reason": "Host does not seem to have terminal installed. Unable to resolve command."},
)
given_commands = payload.payload.get("commands")
given_username = payload.payload.get("username")
given_password = payload.payload.get("password")
remote_ip = payload.payload.get("ip_address")
terminal_output: Dict[int, RequestResponse] = {}
# Creating a remote terminal session if given an IP Address, otherwise using a local terminal session.
if remote_ip is None:
terminal_session = self.get_terminal_session(username=given_username, password=given_password)
if command_opts.ip_address is None:
terminal_session = self.get_terminal_session(username=command_opts.username, password=command_opts.password)
else:
terminal_session = self.get_remote_terminal_session(
username=given_username, password=given_password, ip_address=remote_ip
username=command_opts.username, password=command_opts.password, ip_address=command_opts.ip_address
)
if terminal_session is None:
@@ -488,7 +486,12 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
status="failure", data={"reason": "Terminal Login failed. Cannot create a terminal session."}
)
for index, given_command in enumerate(given_commands):
# Converts a singular terminal command: [RequestFormat] into a list with one element [[RequestFormat]]
command_opts.commands = (
[command_opts.commands] if not isinstance(command_opts.commands, list) else command_opts.commands
)
for index, given_command in enumerate(command_opts.commands):
# A try catch exception ladder was used but was considered not the best approach
# as it can end up obscuring visibility of actual bugs (Not the expected ones) and was a temporary solution.
# TODO: Refactor + add further validation to ensure that a request is correct. (maybe a pydantic method?)

View File

@@ -287,7 +287,7 @@ class C2Server(AbstractC2, identifier="C2Server"):
if self._host_ftp_server is None:
self.sys_log.warning(f"{self.name}: Unable to setup the FTP Server for data exfiltration")
return False
if not self.get_exfiltration_folder(command_options.get("exfiltration_folder_name", "exfil")):
if self.get_exfiltration_folder(command_options.get("exfiltration_folder_name")) is None:
self.sys_log.warning(f"{self.name}: Unable to create a folder for storing exfiltration data.")
return False