#2912 - Updates to remaining action refactoring

This commit is contained in:
Charlie Crane
2024-10-18 12:07:53 +01:00
parent a90aec2bcd
commit 1b1f3e4f71
8 changed files with 45 additions and 32 deletions

View File

@@ -25,3 +25,16 @@ __all__ = (
"NodeServiceStopAction",
"ActionManager",
)
# __all__ = (
# "acl",
# "application",
# "config",
# "file",
# "folder",
# "host_nic",
# "manager",
# "network",
# "node",
# "service",
# )

View File

@@ -171,7 +171,7 @@ class RouterACLAddRuleAction(AbstractAction, identifier="router_acl_add_rule"):
]
class RouterACLRemoveRuleAction(AbstractAction):
class RouterACLRemoveRuleAction(AbstractAction, identifier="router_acl_remove_rule"):
"""Action which removes a rule from a router's ACL."""
class ConfigSchema(AbstractAction.ConfigSchema):
@@ -186,7 +186,7 @@ class RouterACLRemoveRuleAction(AbstractAction):
return ["network", "node", config.target_router, "acl", "remove_rule", config.position]
class FirewallACLAddRuleAction(AbstractAction):
class FirewallACLAddRuleAction(AbstractAction, identifier="firewall_acl_add_rule"):
"""Action which adds a rule to a firewall port's ACL."""
def __init__(
@@ -310,8 +310,9 @@ class FirewallACLAddRuleAction(AbstractAction):
dst_port,
position,
]
class FirewallACLRemoveRuleAction(AbstractAction):
class FirewallACLRemoveRuleAction(AbstractAction, identifier="firewall_acl_remove_rule"):
"""Action which removes a rule from a firewall port's ACL."""
def __init__(self, manager: "ActionManager", max_acl_rules: int, **kwargs) -> None:
@@ -325,8 +326,9 @@ class FirewallACLRemoveRuleAction(AbstractAction):
super().__init__(manager=manager)
self.shape: Dict[str, int] = {"position": max_acl_rules}
@classmethod
def form_request(
self, target_firewall_nodename: str, firewall_port_name: str, firewall_port_direction: str, position: int
cls, target_firewall_nodename: str, firewall_port_name: str, firewall_port_direction: str, position: int
) -> List[str]:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
return [
@@ -338,4 +340,4 @@ class FirewallACLRemoveRuleAction(AbstractAction):
"acl",
"remove_rule",
position,
]
]

View File

@@ -75,6 +75,7 @@ class NodeApplicationInstallAction(NodeApplicationAbstractAction):
# TODO: Either changes to application form_request bits, or add that here.
class NodeApplicationRemoveAction(NodeApplicationAbstractAction):
"""Action which removes/uninstalls an application"""
@@ -84,4 +85,3 @@ class NodeApplicationRemoveAction(NodeApplicationAbstractAction):
verb: str = "uninstall"
# TODO: Either changes to application form_request bits, or add that here.

View File

@@ -1,7 +1,9 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from typing import Dict, Optional
from pydantic import BaseModel, ConfigDict, Field, ValidationInfo, field_validator
from pydantic import BaseModel, ConfigDict, Field, field_validator, ValidationInfo
from primaite.game.agent.actions.manager import AbstractAction
from primaite.interface.request import RequestFormat
@@ -28,6 +30,7 @@ class ConfigureRansomwareScriptAction(AbstractAction):
ConfigureRansomwareScriptAction._Opts.model_validate(config) # check that options adhere to schema
return ["network", "node", node_name, "application", "RansomwareScript", "configure", config]
class ConfigureDoSBotAction(AbstractAction):
"""Action which sets config parameters for a DoS bot on a node."""
@@ -53,7 +56,7 @@ class ConfigureDoSBotAction(AbstractAction):
return ["do_nothing"]
self._Opts.model_validate(config) # check that options adhere to schema
return ["network", "node", node_name, "application", "DoSBot", "configure", config]
class ConfigureC2BeaconAction(AbstractAction):
"""Action which configures a C2 Beacon based on the parameters given."""
@@ -67,7 +70,6 @@ class ConfigureC2BeaconAction(AbstractAction):
masquerade_protocol: str = Field(default="TCP")
masquerade_port: str = Field(default="HTTP")
class _Opts(BaseModel):
"""Schema for options that can be passed to this action."""
@@ -104,4 +106,4 @@ class ConfigureC2BeaconAction(AbstractAction):
ConfigureC2BeaconAction._Opts.model_validate(config) # check that options adhere to schema
return ["network", "node", config.node_name, "application", "C2Beacon", "configure", config.__dict__]
return ["network", "node", config.node_name, "application", "C2Beacon", "configure", config.__dict__]

View File

@@ -23,13 +23,11 @@ class NodeFolderAbstractAction(AbstractAction):
verb: ClassVar[str]
@classmethod
def form_request(cls, node_id: int, folder_id: int) -> RequestFormat:
def form_request(cls, config: ConfigSchema) -> RequestFormat:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
node_name = cls.manager.get_node_name_by_idx(node_id)
folder_name = cls.manager.get_folder_name_by_idx(node_idx=node_id, folder_idx=folder_id)
if node_name is None or folder_name is None:
if config.node_name is None or config.folder_name is None:
return ["do_nothing"]
return ["network", "node", node_name, "file_system", "folder", folder_name, cls.verb]
return ["network", "node", config.node_name, "file_system", "folder", config.folder_name, cls.verb]
class NodeFolderScanAction(NodeFolderAbstractAction, identifier="node_folder_scan"):

View File

@@ -1,10 +1,13 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from typing import Dict, Optional
from pydantic import BaseModel, ConfigDict
from primaite.game.agent.actions.manager import AbstractAction
from primaite.interface.request import RequestFormat
class HostNICAbstractAction(AbstractAction):
"""
Abstract base class for NIC actions.
@@ -15,25 +18,12 @@ class HostNICAbstractAction(AbstractAction):
class ConfigSchema(AbstractAction.ConfigSchema):
"""Base Configuration schema for HostNIC actions."""
num_nodes: str
max_nics_per_node: str
node_name: str
nic_num: str
def __init__(self, manager: "ActionManager", num_nodes: int, max_nics_per_node: int, **kwargs) -> None:
"""Init method for HostNICAbstractAction.
:param manager: Reference to the ActionManager which created this action.
:type manager: ActionManager
:param num_nodes: Number of nodes in the simulation.
:type num_nodes: int
:param max_nics_per_node: Maximum number of NICs per node.
:type max_nics_per_node: int
"""
super().__init__(manager=manager)
self.shape: Dict[str, int] = {"node_id": num_nodes, "nic_id": max_nics_per_node}
self.verb: str # define but don't initialise: defends against children classes not defining this
@classmethod
def form_request(cls, config: ConfigSchema) -> RequestFormat:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
@@ -41,11 +31,13 @@ class HostNICAbstractAction(AbstractAction):
return ["do_nothing"]
return ["network", "node", config.node_name, "network_interface", config.nic_num, cls.verb]
class HostNICEnableAction(HostNICAbstractAction):
"""Action which enables a NIC."""
class ConfigSchema(HostNICAbstractAction.ConfigSchema):
"""Configuration schema for HostNICEnableAction."""
verb: str = "enable"
@@ -54,4 +46,5 @@ class HostNICDisableAction(HostNICAbstractAction):
class ConfigSchema(HostNICAbstractAction.ConfigSchema):
"""Configuration schema for HostNICDisableAction."""
verb: str = "disable"
verb: str = "disable"

View File

@@ -23,6 +23,8 @@ from pydantic import BaseModel, ConfigDict
from primaite.game.game import PrimaiteGame
from primaite.interface.request import RequestFormat
# TODO: Make sure that actions are backwards compatible where the old YAML format is used.
class AbstractAction(BaseModel):
"""Base class for actions."""

View File

@@ -1,10 +1,13 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from typing import Dict, Optional
from pydantic import BaseModel, ConfigDict
from primaite.game.agent.actions.manager import AbstractAction
from primaite.interface.request import RequestFormat
class NetworkPortEnableAction(AbstractAction):
"""Action which enables are port on a router or a firewall."""
@@ -40,4 +43,4 @@ class NetworkPortDisableAction(AbstractAction):
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
if target_nodename is None or port_id is None:
return ["do_nothing"]
return ["network", "node", target_nodename, "network_interface", port_id, "disable"]
return ["network", "node", target_nodename, "network_interface", port_id, "disable"]