diff --git a/src/primaite/game/agent/actions/config.py b/src/primaite/game/agent/actions/config.py index 6096a0b2..d627e4b0 100644 --- a/src/primaite/game/agent/actions/config.py +++ b/src/primaite/game/agent/actions/config.py @@ -174,4 +174,45 @@ class RansomwareLaunchC2ServerAction(AbstractAction): if config.node_name is None: return ["do_nothing"] # This action currently doesn't require any further configuration options. - return ["network", "node", config.node_name, "application", "C2Server", "ransomware_launch"] \ No newline at end of file + return ["network", "node", config.node_name, "application", "C2Server", "ransomware_launch"] + +class ExfiltrationC2ServerAction(AbstractAction): + """Action which exfiltrates a target file from a certain node onto the C2 beacon and then the C2 Server.""" + + class _Opts(BaseModel): + """Schema for options that can be passed to this action.""" + + username: Optional[str] + password: Optional[str] + target_ip_address: str + target_file_name: str + target_folder_name: str + exfiltration_folder_name: Optional[str] + + def __init__(self, manager: "ActionManager", **kwargs) -> None: + super().__init__(manager=manager) + + def form_request( + self, + node_id: int, + account: dict, + target_ip_address: str, + target_file_name: str, + target_folder_name: str, + exfiltration_folder_name: Optional[str], + ) -> RequestFormat: + """Return the action formatted as a request that can be ingested by the simulation.""" + node_name = self.manager.get_node_name_by_idx(node_id) + if node_name is None: + return ["do_nothing"] + + command_model = { + "target_file_name": target_file_name, + "target_folder_name": target_folder_name, + "exfiltration_folder_name": exfiltration_folder_name, + "target_ip_address": target_ip_address, + "username": account["username"], + "password": account["password"], + } + ExfiltrationC2ServerAction._Opts.model_validate(command_model) + return ["network", "node", node_name, "application", "C2Server", "exfiltrate", command_model] \ No newline at end of file diff --git a/src/primaite/game/agent/actions/host_nic.py b/src/primaite/game/agent/actions/host_nic.py index a4dd8d9c..2e53cf72 100644 --- a/src/primaite/game/agent/actions/host_nic.py +++ b/src/primaite/game/agent/actions/host_nic.py @@ -1,9 +1,4 @@ # © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK - -from typing import Dict, Optional - -from pydantic import BaseModel, ConfigDict - from primaite.game.agent.actions.manager import AbstractAction from primaite.interface.request import RequestFormat diff --git a/src/primaite/game/agent/actions/manager.py b/src/primaite/game/agent/actions/manager.py index 9621b7f0..d6b7d4b6 100644 --- a/src/primaite/game/agent/actions/manager.py +++ b/src/primaite/game/agent/actions/manager.py @@ -268,192 +268,6 @@ class ActionManager: """Return the gymnasium action space for this agent.""" return spaces.Discrete(len(self.action_map)) - # def get_node_name_by_idx(self, node_idx: int) -> str: - # """ - # Get the node name corresponding to the given index. - - # :param node_idx: The index of the node to retrieve. - # :type node_idx: int - # :return: The node hostname. - # :rtype: str - # """ - # if not node_idx < len(self.node_names): - # msg = ( - # f"Error: agent attempted to perform an action on node {node_idx}, but its action space only" - # f"has {len(self.node_names)} nodes." - # ) - # _LOGGER.error(msg) - # raise RuntimeError(msg) - # return self.node_names[node_idx] - - # def get_folder_name_by_idx(self, node_idx: int, folder_idx: int) -> Optional[str]: - # """ - # Get the folder name corresponding to the given node and folder indices. - - # :param node_idx: The index of the node. - # :type node_idx: int - # :param folder_idx: The index of the folder on the node. - # :type folder_idx: int - # :return: The name of the folder. Or None if the node has fewer folders than the given index. - # :rtype: Optional[str] - # """ - # if node_idx >= len(self.folder_names) or folder_idx >= len(self.folder_names[node_idx]): - # msg = ( - # f"Error: agent attempted to perform an action on node {node_idx} and folder {folder_idx}, but this" - # f" is out of range for its action space. Folder on each node: {self.folder_names}" - # ) - # _LOGGER.error(msg) - # raise RuntimeError(msg) - # return self.folder_names[node_idx][folder_idx] - - # def get_file_name_by_idx(self, node_idx: int, folder_idx: int, file_idx: int) -> Optional[str]: - # """Get the file name corresponding to the given node, folder, and file indices. - - # :param node_idx: The index of the node. - # :type node_idx: int - # :param folder_idx: The index of the folder on the node. - # :type folder_idx: int - # :param file_idx: The index of the file in the folder. - # :type file_idx: int - # :return: The name of the file. Or None if the node has fewer folders than the given index, or the folder has - # fewer files than the given index. - # :rtype: Optional[str] - # """ - # if ( - # node_idx >= len(self.file_names) - # or folder_idx >= len(self.file_names[node_idx]) - # or file_idx >= len(self.file_names[node_idx][folder_idx]) - # ): - # msg = ( - # f"Error: agent attempted to perform an action on node {node_idx} folder {folder_idx} file {file_idx}" - # f" but this is out of range for its action space. Files on each node: {self.file_names}" - # ) - # _LOGGER.error(msg) - # raise RuntimeError(msg) - # return self.file_names[node_idx][folder_idx][file_idx] - - # def get_service_name_by_idx(self, node_idx: int, service_idx: int) -> Optional[str]: - # """Get the service name corresponding to the given node and service indices. - - # :param node_idx: The index of the node. - # :type node_idx: int - # :param service_idx: The index of the service on the node. - # :type service_idx: int - # :return: The name of the service. Or None if the node has fewer services than the given index. - # :rtype: Optional[str] - # """ - # if node_idx >= len(self.service_names) or service_idx >= len(self.service_names[node_idx]): - # msg = ( - # f"Error: agent attempted to perform an action on node {node_idx} and service {service_idx}, but this" - # f" is out of range for its action space. Services on each node: {self.service_names}" - # ) - # _LOGGER.error(msg) - # raise RuntimeError(msg) - # return self.service_names[node_idx][service_idx] - - # def get_application_name_by_idx(self, node_idx: int, application_idx: int) -> Optional[str]: - # """Get the application name corresponding to the given node and service indices. - - # :param node_idx: The index of the node. - # :type node_idx: int - # :param application_idx: The index of the service on the node. - # :type application_idx: int - # :return: The name of the service. Or None if the node has fewer services than the given index. - # :rtype: Optional[str] - # """ - # if node_idx >= len(self.application_names) or application_idx >= len(self.application_names[node_idx]): - # msg = ( - # f"Error: agent attempted to perform an action on node {node_idx} and app {application_idx}, but " - # f"this is out of range for its action space. Applications on each node: {self.application_names}" - # ) - # _LOGGER.error(msg) - # raise RuntimeError(msg) - # return self.application_names[node_idx][application_idx] - - # def get_internet_protocol_by_idx(self, protocol_idx: int) -> str: - # """Get the internet protocol corresponding to the given index. - - # :param protocol_idx: The index of the protocol to retrieve. - # :type protocol_idx: int - # :return: The protocol. - # :rtype: str - # """ - # if protocol_idx >= len(self.protocols): - # msg = ( - # f"Error: agent attempted to perform an action on protocol {protocol_idx} but this" - # f" is out of range for its action space. Protocols: {self.protocols}" - # ) - # _LOGGER.error(msg) - # raise RuntimeError(msg) - # return self.protocols[protocol_idx] - - # def get_ip_address_by_idx(self, ip_idx: int) -> str: - # """ - # Get the IP address corresponding to the given index. - - # :param ip_idx: The index of the IP address to retrieve. - # :type ip_idx: int - # :return: The IP address. - # :rtype: str - # """ - # if ip_idx >= len(self.ip_address_list): - # msg = ( - # f"Error: agent attempted to perform an action on ip address {ip_idx} but this" - # f" is out of range for its action space. IP address list: {self.ip_address_list}" - # ) - # _LOGGER.error(msg) - # raise RuntimeError(msg) - # return self.ip_address_list[ip_idx] - - # def get_wildcard_by_idx(self, wildcard_idx: int) -> str: - # """ - # Get the IP wildcard corresponding to the given index. - - # :param ip_idx: The index of the IP wildcard to retrieve. - # :type ip_idx: int - # :return: The wildcard address. - # :rtype: str - # """ - # if wildcard_idx >= len(self.wildcard_list): - # msg = ( - # f"Error: agent attempted to perform an action on ip wildcard {wildcard_idx} but this" - # f" is out of range for its action space. Wildcard list: {self.wildcard_list}" - # ) - # _LOGGER.error(msg) - # raise RuntimeError(msg) - # return self.wildcard_list[wildcard_idx] - - # def get_port_by_idx(self, port_idx: int) -> str: - # """ - # Get the port corresponding to the given index. - - # :param port_idx: The index of the port to retrieve. - # :type port_idx: int - # :return: The port. - # :rtype: str - # """ - # if port_idx >= len(self.ports): - # msg = ( - # f"Error: agent attempted to perform an action on port {port_idx} but this" - # f" is out of range for its action space. Port list: {self.ip_address_list}" - # ) - # _LOGGER.error(msg) - # raise RuntimeError(msg) - # return self.ports[port_idx] - - # def get_nic_num_by_idx(self, node_idx: int, nic_idx: int) -> int: - # """ - # Get the NIC number corresponding to the given node and NIC indices. - - # :param node_idx: The index of the node. - # :type node_idx: int - # :param nic_idx: The index of the NIC on the node. - # :type nic_idx: int - # :return: The NIC number. - # :rtype: int - # """ - # return nic_idx + 1 - @classmethod def from_config(cls, game: "PrimaiteGame", cfg: Dict) -> "ActionManager": """ diff --git a/src/primaite/game/agent/actions/session.py b/src/primaite/game/agent/actions/session.py new file mode 100644 index 00000000..9fd20a0c --- /dev/null +++ b/src/primaite/game/agent/actions/session.py @@ -0,0 +1,64 @@ +# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK +from abc import abstractmethod +from typing import ClassVar + +from primaite.game.agent.actions.manager import AbstractAction +from primaite.interface.request import RequestFormat + + +class NodeSessionAbstractAction(AbstractAction): + """Base class for NodeSession actions.""" + + class ConfigSchema(AbstractAction.ConfigSchema): + """Base configuration schema for NodeSessionAbstractActions.""" + + node_name: str + remote_ip: str + + @abstractmethod + @classmethod + def form_request(cls, config: ConfigSchema) -> RequestFormat: + """Abstract method. Should return the action formatted as a request which can be ingested by the PrimAITE simulation.""" + if config.node_name is None or config.remote_ip is None: + return ["do_nothing"] + + +class NodeSessionsRemoteLoginAction(AbstractAction, identifier="node_session_remote_login"): + """Action which performs a remote session login.""" + + class ConfigSchema(NodeSessionAbstractAction.ConfigSchema): + """Configuration schema for NodeSessionsRemoteLoginAction.""" + username: str + password: str + + @classmethod + def form_request(cls, config: ConfigSchema) -> RequestFormat: + """Return the action formatted as a request which can be ingested by the PrimAITE simulation.""" + if config.node_name is None or config.remote_ip is None: + return ["do_nothing"] + return [ + "network", + "node", + config.node_name, + "service", + "Terminal", + "ssh_to_remote", + config.username, + config.password, + config.remote_ip, + ] + + +class NodeSessionsRemoteLogoutAction(AbstractAction, identifier="node_session_remote_logout"): + """Action which performs a remote session logout.""" + + class ConfigSchema(NodeSessionAbstractAction.ConfigSchema): + """Configuration schema for NodeSessionsRemoteLogoutAction.""" + pass + + @classmethod + def form_request(cls, config: ConfigSchema) -> RequestFormat: + """Return the action formatted as a request which can be ingested by the PrimAITE simulation.""" + if config.node_name is None or config.remote_ip is None: + return ["do_nothing"] + return ["network", "node", config.node_name, "service", "Terminal", "remote_logoff", config.remote_ip] \ No newline at end of file