#2912 - eod commit. Gutted ActionManager and corrected some identifiers.

This commit is contained in:
Charlie Crane
2024-10-18 16:28:15 +01:00
parent 83d3120b04
commit a5c7565f0e
4 changed files with 106 additions and 192 deletions

View File

@@ -175,3 +175,44 @@ class RansomwareLaunchC2ServerAction(AbstractAction):
return ["do_nothing"]
# This action currently doesn't require any further configuration options.
return ["network", "node", config.node_name, "application", "C2Server", "ransomware_launch"]
class ExfiltrationC2ServerAction(AbstractAction):
"""Action which exfiltrates a target file from a certain node onto the C2 beacon and then the C2 Server."""
class _Opts(BaseModel):
"""Schema for options that can be passed to this action."""
username: Optional[str]
password: Optional[str]
target_ip_address: str
target_file_name: str
target_folder_name: str
exfiltration_folder_name: Optional[str]
def __init__(self, manager: "ActionManager", **kwargs) -> None:
super().__init__(manager=manager)
def form_request(
self,
node_id: int,
account: dict,
target_ip_address: str,
target_file_name: str,
target_folder_name: str,
exfiltration_folder_name: Optional[str],
) -> RequestFormat:
"""Return the action formatted as a request that can be ingested by the simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
if node_name is None:
return ["do_nothing"]
command_model = {
"target_file_name": target_file_name,
"target_folder_name": target_folder_name,
"exfiltration_folder_name": exfiltration_folder_name,
"target_ip_address": target_ip_address,
"username": account["username"],
"password": account["password"],
}
ExfiltrationC2ServerAction._Opts.model_validate(command_model)
return ["network", "node", node_name, "application", "C2Server", "exfiltrate", command_model]

View File

@@ -1,9 +1,4 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from typing import Dict, Optional
from pydantic import BaseModel, ConfigDict
from primaite.game.agent.actions.manager import AbstractAction
from primaite.interface.request import RequestFormat

View File

@@ -268,192 +268,6 @@ class ActionManager:
"""Return the gymnasium action space for this agent."""
return spaces.Discrete(len(self.action_map))
# def get_node_name_by_idx(self, node_idx: int) -> str:
# """
# Get the node name corresponding to the given index.
# :param node_idx: The index of the node to retrieve.
# :type node_idx: int
# :return: The node hostname.
# :rtype: str
# """
# if not node_idx < len(self.node_names):
# msg = (
# f"Error: agent attempted to perform an action on node {node_idx}, but its action space only"
# f"has {len(self.node_names)} nodes."
# )
# _LOGGER.error(msg)
# raise RuntimeError(msg)
# return self.node_names[node_idx]
# def get_folder_name_by_idx(self, node_idx: int, folder_idx: int) -> Optional[str]:
# """
# Get the folder name corresponding to the given node and folder indices.
# :param node_idx: The index of the node.
# :type node_idx: int
# :param folder_idx: The index of the folder on the node.
# :type folder_idx: int
# :return: The name of the folder. Or None if the node has fewer folders than the given index.
# :rtype: Optional[str]
# """
# if node_idx >= len(self.folder_names) or folder_idx >= len(self.folder_names[node_idx]):
# msg = (
# f"Error: agent attempted to perform an action on node {node_idx} and folder {folder_idx}, but this"
# f" is out of range for its action space. Folder on each node: {self.folder_names}"
# )
# _LOGGER.error(msg)
# raise RuntimeError(msg)
# return self.folder_names[node_idx][folder_idx]
# def get_file_name_by_idx(self, node_idx: int, folder_idx: int, file_idx: int) -> Optional[str]:
# """Get the file name corresponding to the given node, folder, and file indices.
# :param node_idx: The index of the node.
# :type node_idx: int
# :param folder_idx: The index of the folder on the node.
# :type folder_idx: int
# :param file_idx: The index of the file in the folder.
# :type file_idx: int
# :return: The name of the file. Or None if the node has fewer folders than the given index, or the folder has
# fewer files than the given index.
# :rtype: Optional[str]
# """
# if (
# node_idx >= len(self.file_names)
# or folder_idx >= len(self.file_names[node_idx])
# or file_idx >= len(self.file_names[node_idx][folder_idx])
# ):
# msg = (
# f"Error: agent attempted to perform an action on node {node_idx} folder {folder_idx} file {file_idx}"
# f" but this is out of range for its action space. Files on each node: {self.file_names}"
# )
# _LOGGER.error(msg)
# raise RuntimeError(msg)
# return self.file_names[node_idx][folder_idx][file_idx]
# def get_service_name_by_idx(self, node_idx: int, service_idx: int) -> Optional[str]:
# """Get the service name corresponding to the given node and service indices.
# :param node_idx: The index of the node.
# :type node_idx: int
# :param service_idx: The index of the service on the node.
# :type service_idx: int
# :return: The name of the service. Or None if the node has fewer services than the given index.
# :rtype: Optional[str]
# """
# if node_idx >= len(self.service_names) or service_idx >= len(self.service_names[node_idx]):
# msg = (
# f"Error: agent attempted to perform an action on node {node_idx} and service {service_idx}, but this"
# f" is out of range for its action space. Services on each node: {self.service_names}"
# )
# _LOGGER.error(msg)
# raise RuntimeError(msg)
# return self.service_names[node_idx][service_idx]
# def get_application_name_by_idx(self, node_idx: int, application_idx: int) -> Optional[str]:
# """Get the application name corresponding to the given node and service indices.
# :param node_idx: The index of the node.
# :type node_idx: int
# :param application_idx: The index of the service on the node.
# :type application_idx: int
# :return: The name of the service. Or None if the node has fewer services than the given index.
# :rtype: Optional[str]
# """
# if node_idx >= len(self.application_names) or application_idx >= len(self.application_names[node_idx]):
# msg = (
# f"Error: agent attempted to perform an action on node {node_idx} and app {application_idx}, but "
# f"this is out of range for its action space. Applications on each node: {self.application_names}"
# )
# _LOGGER.error(msg)
# raise RuntimeError(msg)
# return self.application_names[node_idx][application_idx]
# def get_internet_protocol_by_idx(self, protocol_idx: int) -> str:
# """Get the internet protocol corresponding to the given index.
# :param protocol_idx: The index of the protocol to retrieve.
# :type protocol_idx: int
# :return: The protocol.
# :rtype: str
# """
# if protocol_idx >= len(self.protocols):
# msg = (
# f"Error: agent attempted to perform an action on protocol {protocol_idx} but this"
# f" is out of range for its action space. Protocols: {self.protocols}"
# )
# _LOGGER.error(msg)
# raise RuntimeError(msg)
# return self.protocols[protocol_idx]
# def get_ip_address_by_idx(self, ip_idx: int) -> str:
# """
# Get the IP address corresponding to the given index.
# :param ip_idx: The index of the IP address to retrieve.
# :type ip_idx: int
# :return: The IP address.
# :rtype: str
# """
# if ip_idx >= len(self.ip_address_list):
# msg = (
# f"Error: agent attempted to perform an action on ip address {ip_idx} but this"
# f" is out of range for its action space. IP address list: {self.ip_address_list}"
# )
# _LOGGER.error(msg)
# raise RuntimeError(msg)
# return self.ip_address_list[ip_idx]
# def get_wildcard_by_idx(self, wildcard_idx: int) -> str:
# """
# Get the IP wildcard corresponding to the given index.
# :param ip_idx: The index of the IP wildcard to retrieve.
# :type ip_idx: int
# :return: The wildcard address.
# :rtype: str
# """
# if wildcard_idx >= len(self.wildcard_list):
# msg = (
# f"Error: agent attempted to perform an action on ip wildcard {wildcard_idx} but this"
# f" is out of range for its action space. Wildcard list: {self.wildcard_list}"
# )
# _LOGGER.error(msg)
# raise RuntimeError(msg)
# return self.wildcard_list[wildcard_idx]
# def get_port_by_idx(self, port_idx: int) -> str:
# """
# Get the port corresponding to the given index.
# :param port_idx: The index of the port to retrieve.
# :type port_idx: int
# :return: The port.
# :rtype: str
# """
# if port_idx >= len(self.ports):
# msg = (
# f"Error: agent attempted to perform an action on port {port_idx} but this"
# f" is out of range for its action space. Port list: {self.ip_address_list}"
# )
# _LOGGER.error(msg)
# raise RuntimeError(msg)
# return self.ports[port_idx]
# def get_nic_num_by_idx(self, node_idx: int, nic_idx: int) -> int:
# """
# Get the NIC number corresponding to the given node and NIC indices.
# :param node_idx: The index of the node.
# :type node_idx: int
# :param nic_idx: The index of the NIC on the node.
# :type nic_idx: int
# :return: The NIC number.
# :rtype: int
# """
# return nic_idx + 1
@classmethod
def from_config(cls, game: "PrimaiteGame", cfg: Dict) -> "ActionManager":
"""

View File

@@ -0,0 +1,64 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from abc import abstractmethod
from typing import ClassVar
from primaite.game.agent.actions.manager import AbstractAction
from primaite.interface.request import RequestFormat
class NodeSessionAbstractAction(AbstractAction):
"""Base class for NodeSession actions."""
class ConfigSchema(AbstractAction.ConfigSchema):
"""Base configuration schema for NodeSessionAbstractActions."""
node_name: str
remote_ip: str
@abstractmethod
@classmethod
def form_request(cls, config: ConfigSchema) -> RequestFormat:
"""Abstract method. Should return the action formatted as a request which can be ingested by the PrimAITE simulation."""
if config.node_name is None or config.remote_ip is None:
return ["do_nothing"]
class NodeSessionsRemoteLoginAction(AbstractAction, identifier="node_session_remote_login"):
"""Action which performs a remote session login."""
class ConfigSchema(NodeSessionAbstractAction.ConfigSchema):
"""Configuration schema for NodeSessionsRemoteLoginAction."""
username: str
password: str
@classmethod
def form_request(cls, config: ConfigSchema) -> RequestFormat:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
if config.node_name is None or config.remote_ip is None:
return ["do_nothing"]
return [
"network",
"node",
config.node_name,
"service",
"Terminal",
"ssh_to_remote",
config.username,
config.password,
config.remote_ip,
]
class NodeSessionsRemoteLogoutAction(AbstractAction, identifier="node_session_remote_logout"):
"""Action which performs a remote session logout."""
class ConfigSchema(NodeSessionAbstractAction.ConfigSchema):
"""Configuration schema for NodeSessionsRemoteLogoutAction."""
pass
@classmethod
def form_request(cls, config: ConfigSchema) -> RequestFormat:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
if config.node_name is None or config.remote_ip is None:
return ["do_nothing"]
return ["network", "node", config.node_name, "service", "Terminal", "remote_logoff", config.remote_ip]