diff --git a/src/primaite/game/agent/actions.py b/src/primaite/game/agent/actions.py index aa74399e..07bb039e 100644 --- a/src/primaite/game/agent/actions.py +++ b/src/primaite/game/agent/actions.py @@ -1195,7 +1195,7 @@ class TerminalC2ServerAction(AbstractAction): class _Opts(BaseModel): """Schema for options that can be passed to this action.""" - commands: List[RequestFormat] + commands: Union[List[RequestFormat], RequestFormat] ip_address: Optional[str] username: Optional[str] password: Optional[str] diff --git a/src/primaite/simulator/network/hardware/base.py b/src/primaite/simulator/network/hardware/base.py index 1441c93b..08164f22 100644 --- a/src/primaite/simulator/network/hardware/base.py +++ b/src/primaite/simulator/network/hardware/base.py @@ -1275,13 +1275,18 @@ class UserSessionManager(Service): def pre_timestep(self, timestep: int) -> None: """Apply any pre-timestep logic that helps make sure we have the correct observations.""" self.current_timestep = timestep + inactive_sessions: list = [] if self.local_session: if self.local_session.last_active_step + self.local_session_timeout_steps <= timestep: - self._timeout_session(self.local_session) + inactive_sessions.append(self.local_session) + for session in self.remote_sessions: remote_session = self.remote_sessions[session] if remote_session.last_active_step + self.remote_session_timeout_steps <= timestep: - self._timeout_session(remote_session) + inactive_sessions.append(remote_session) + + for sessions in inactive_sessions: + self._timeout_session(sessions) def _timeout_session(self, session: UserSession) -> None: """ diff --git a/src/primaite/simulator/system/applications/red_applications/c2/__init__.py b/src/primaite/simulator/system/applications/red_applications/c2/__init__.py index be6c00e7..97923284 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/__init__.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/__init__.py @@ -1 +1,56 @@ # © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK +from typing import Optional, Union + +from pydantic import BaseModel, Field + +from primaite.interface.request import RequestFormat + + +class Command_Opts(BaseModel): + """A C2 Pydantic Schema acting as a base class for all C2 Commands.""" + + +class Ransomware_Opts(Command_Opts): + """A Pydantic Schema for the Ransomware Configuration command options.""" + + server_ip_address: str + """""" + + payload: Optional[str] = Field(default="ENCRYPT") + """""" + + +class Remote_Opts(Command_Opts): + """A base C2 Pydantic Schema for all C2 Commands that require a remote terminal connection.""" + + ip_address: Optional[str] = Field(default=None) + """""" + + username: str + """""" + + password: str + """""" + + +class Exfil_Opts(Remote_Opts): + """A Pydantic Schema for the C2 Data Exfiltration command options.""" + + target_ip_address: str + """""" + + target_folder_name: str + """""" + + target_file_name: str + """""" + + exfiltration_folder_name: Optional[str] = Field(default="exfiltration_folder") + """""" + + +class Terminal_Opts(Remote_Opts): + """A Pydantic Schema for the C2 Terminal command options.""" + + commands: Union[list[RequestFormat], RequestFormat] + """""" diff --git a/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py b/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py index 6fa34fd6..d273ea23 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py @@ -147,7 +147,6 @@ class AbstractC2(Application, identifier="AbstractC2"): kwargs["protocol"] = IPProtocol.TCP super().__init__(**kwargs) - # TODO: We may need to disable the ftp_server/client when using the opposite service. (To test) @property def _host_ftp_client(self) -> Optional[FTPClient]: """Return the FTPClient that is installed C2 Application's host. @@ -214,8 +213,10 @@ class AbstractC2(Application, identifier="AbstractC2"): self.sys_log.error(f"{self.__class__.__name__}: does not seem to have a file system!") return host_file_system - def get_exfiltration_folder(self, folder_name: str) -> Optional[Folder]: + def get_exfiltration_folder(self, folder_name: Optional[str] = "exfiltration_folder") -> Optional[Folder]: """Return a folder used for storing exfiltrated data. Otherwise returns None.""" + if self._host_file_system is None: + return exfiltration_folder: Union[Folder, None] = self._host_file_system.get_folder(folder_name) if exfiltration_folder is None: self.sys_log.info(f"{self.__class__.__name__}: Creating a exfiltration folder.") diff --git a/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py b/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py index b3bf1902..47e4f902 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py @@ -11,6 +11,7 @@ from primaite.simulator.core import RequestManager, RequestType from primaite.simulator.network.protocols.masquerade import C2Packet from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.network.transmission.transport_layer import Port +from primaite.simulator.system.applications.red_applications.c2 import Exfil_Opts, Ransomware_Opts, Terminal_Opts from primaite.simulator.system.applications.red_applications.c2.abstract_c2 import AbstractC2, C2Command, C2Payload from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript from primaite.simulator.system.services.terminal.terminal import ( @@ -305,7 +306,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): :return: Returns the Request Response returned by the Terminal execute method. :rtype: Request Response """ - given_config = payload.payload + command_opts = Ransomware_Opts.model_validate(payload.payload) if self._host_ransomware_script is None: return RequestResponse( status="failure", @@ -313,7 +314,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): ) return RequestResponse.from_bool( self._host_ransomware_script.configure( - server_ip_address=given_config["server_ip_address"], payload=given_config["payload"] + server_ip_address=command_opts.server_ip_address, payload=command_opts.payload ) ) @@ -342,10 +343,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): Uses the FTP Client & Server services to perform the data exfiltration. - Similar to the terminal, the logic of this command is dependant on if a remote_ip - is present within the payload. - - If a payload does contain an IP address then the C2 Beacon will ssh into the target ip + This command instructs the C2 Beacon to ssh into the target ip and execute a command which causes the FTPClient service to send a target file will be moved from the target IP address onto the C2 Beacon's host @@ -366,67 +364,74 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): data={"Reason": "Cannot find any instances of both a FTP Server & Client. Are they installed?"}, ) - remote_ip = payload.payload.get("target_ip_address") - target_folder = payload.payload.get("target_folder_name") - dest_folder = payload.payload.get("exfiltration_folder_name") - - # Using the same name for both the target/destination file for clarity. - file_name = payload.payload.get("target_file_name") - - # TODO: Split Remote file extraction and local file extraction into different methods. - # if remote_ip is None: - # self._host_ftp_client.start() - # self.sys_log.info(f"{self.name}: No Remote IP given. Returning target file from local file system.") - # return RequestResponse.from_bool(self._host_ftp_client.send_file( - # dest_ip_address=self.c2_remote_connection, - # src_folder_name=target_folder, - # src_file_name=file_name, - # dest_folder_name=dest_folder, - # dest_file_name=file_name, - # session_id=self.c2_session.uuid - # )) - - # Parsing remote login credentials - given_username = payload.payload.get("username") - given_password = payload.payload.get("password") + command_opts = Exfil_Opts.model_validate(payload.payload) # Setting up the terminal session and the ftp server terminal_session = self.get_remote_terminal_session( - username=given_username, password=given_password, ip_address=remote_ip + username=command_opts.username, password=command_opts.password, ip_address=command_opts.target_ip_address ) - # Initialising the exfiltration folder. - exfiltration_folder = self.get_exfiltration_folder(dest_folder) - # Using the terminal to start the FTP Client on the remote machine. - # This can fail if the FTP Client is already enabled. terminal_session.execute(command=["service", "start", "FTPClient"]) + + # Need to supply to the FTP Client the C2 Beacon's host IP. host_network_interfaces = self.software_manager.node.network_interfaces local_ip = host_network_interfaces.get(next(iter(host_network_interfaces))).ip_address + # Creating the FTP creation options. - remote_ftp_options = { + exfil_opts = { "dest_ip_address": str(local_ip), - "src_folder_name": target_folder, - "src_file_name": file_name, - "dest_folder_name": dest_folder, - "dest_file_name": file_name, + "src_folder_name": command_opts.target_folder_name, + "src_file_name": command_opts.target_file_name, + "dest_folder_name": command_opts.exfiltration_folder_name, + "dest_file_name": command_opts.target_file_name, } + # Lambda method used to return a failure RequestResponse if we're unable to perform the exfiltration. + # If _check_connection returns false then connection_status will return reason (A 'failure' Request Response) + if attempt_exfiltration := (lambda return_bool, reason: reason if return_bool is False else None)( + *self._perform_target_exfiltration(exfil_opts, terminal_session) + ): + return attempt_exfiltration + + # Sending the transferred target data back to the C2 Server to successfully exfiltrate the data out the network. + + return RequestResponse.from_bool( + self._host_ftp_client.send_file( + dest_ip_address=self.c2_remote_connection, + src_folder_name=command_opts.exfiltration_folder_name, # The Exfil folder is inherited attribute. + src_file_name=command_opts.target_file_name, + dest_folder_name=command_opts.exfiltration_folder_name, + dest_file_name=command_opts.target_file_name, + ) + ) + + def _perform_target_exfiltration( + self, exfil_opts: dict, terminal_session: RemoteTerminalConnection + ) -> tuple[bool, RequestResponse]: + """Confirms that the target data is currently present within the C2 Beacon's hosts file system.""" # Using the terminal to send the target data back to the C2 Beacon. - remote_ftp_response: RequestResponse = RequestResponse.from_bool( - terminal_session.execute(command=["service", "FTPClient", "send", remote_ftp_options]) + exfil_response: RequestResponse = RequestResponse.from_bool( + terminal_session.execute(command=["service", "FTPClient", "send", exfil_opts]) ) # Validating that we successfully received the target data. - if remote_ftp_response.status == "failure": - self.sys_log.warning( - f"{self.name}: Remote connection: {remote_ip} failed to transfer the target data via FTP." - ) - return remote_ftp_response + if exfil_response.status == "failure": + self.sys_log.warning(f"{self.name}: Remote connection failure. failed to transfer the target data via FTP.") + return [False, exfil_response] - if exfiltration_folder.get_file(file_name) is None: - self.sys_log.warning(f"{self.name}: Unable to locate exfiltrated file on local filesystem.") + # Target file: + target_file: str = exfil_opts.get("src_file_name") + + # Creating the exfiltration folder . + exfiltration_folder = self.get_exfiltration_folder(exfil_opts.get("src_folder_name")) + + if exfiltration_folder.get_file(target_file) is None: + self.sys_log.warning( + f"{self.name}: Unable to locate exfiltrated file on local filesystem." + f"Perhaps the file transfer failed?" + ) return RequestResponse( status="failure", data={"reason": "Unable to locate exfiltrated data on file system."} ) @@ -438,16 +443,13 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): data={"Reason": "Cannot find any instances of both a FTP Server & Client. Are they installed?"}, ) - # Sending the transferred target data back to the C2 Server to successfully exfiltrate the data out the network. - return RequestResponse.from_bool( - self._host_ftp_client.send_file( - dest_ip_address=self.c2_remote_connection, - src_folder_name=dest_folder, # TODO: Clarify this - Dest folder has the same name on c2server/c2beacon. - src_file_name=file_name, - dest_folder_name=dest_folder, - dest_file_name=file_name, - ) - ) + return [ + True, + RequestResponse( + status="success", + data={"Reason": "Located the target file on local file system. Data exfiltration successful."}, + ), + ] def _command_terminal(self, payload: C2Packet) -> RequestResponse: """ @@ -461,8 +463,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): :return: Returns the Request Response returned by the Terminal execute method. :rtype: Request Response """ - terminal_output: Dict[int, RequestResponse] = {} - given_commands: list[RequestFormat] + command_opts = Terminal_Opts.model_validate(payload.payload) if self._host_terminal is None: return RequestResponse( @@ -470,17 +471,14 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): data={"Reason": "Host does not seem to have terminal installed. Unable to resolve command."}, ) - given_commands = payload.payload.get("commands") - given_username = payload.payload.get("username") - given_password = payload.payload.get("password") - remote_ip = payload.payload.get("ip_address") + terminal_output: Dict[int, RequestResponse] = {} # Creating a remote terminal session if given an IP Address, otherwise using a local terminal session. - if remote_ip is None: - terminal_session = self.get_terminal_session(username=given_username, password=given_password) + if command_opts.ip_address is None: + terminal_session = self.get_terminal_session(username=command_opts.username, password=command_opts.password) else: terminal_session = self.get_remote_terminal_session( - username=given_username, password=given_password, ip_address=remote_ip + username=command_opts.username, password=command_opts.password, ip_address=command_opts.ip_address ) if terminal_session is None: @@ -488,7 +486,12 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): status="failure", data={"reason": "Terminal Login failed. Cannot create a terminal session."} ) - for index, given_command in enumerate(given_commands): + # Converts a singular terminal command: [RequestFormat] into a list with one element [[RequestFormat]] + command_opts.commands = ( + [command_opts.commands] if not isinstance(command_opts.commands, list) else command_opts.commands + ) + + for index, given_command in enumerate(command_opts.commands): # A try catch exception ladder was used but was considered not the best approach # as it can end up obscuring visibility of actual bugs (Not the expected ones) and was a temporary solution. # TODO: Refactor + add further validation to ensure that a request is correct. (maybe a pydantic method?) diff --git a/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py b/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py index 3441a86b..f4cc7aa6 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py @@ -287,7 +287,7 @@ class C2Server(AbstractC2, identifier="C2Server"): if self._host_ftp_server is None: self.sys_log.warning(f"{self.name}: Unable to setup the FTP Server for data exfiltration") return False - if not self.get_exfiltration_folder(command_options.get("exfiltration_folder_name", "exfil")): + if self.get_exfiltration_folder(command_options.get("exfiltration_folder_name")) is None: self.sys_log.warning(f"{self.name}: Unable to create a folder for storing exfiltration data.") return False