Merge remote-tracking branch 'origin/dev' into UC7-migration

This commit is contained in:
Archer Bowen
2025-02-28 11:00:44 +00:00
committed by Marek Wolan
4 changed files with 1835 additions and 34 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,465 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
from enum import IntEnum
from typing import Dict, List, Literal, Optional, Tuple, Type
from gymnasium.core import ObsType
from pydantic import Field
from primaite.game.agent.actions.acl import RouterACLAddRuleAction
from primaite.game.agent.scripted_agents.abstract_tap import (
AbstractTAP,
KillChainOptions,
KillChainStageOptions,
KillChainStageProgress,
)
class InsiderKillChainOptions(KillChainOptions):
"""Model validation for TAP003's Kill Chain."""
class _PlanningOptions(KillChainStageOptions):
"""Valid options for the `PLANNING` InsiderKillChain stage."""
starting_network_knowledge: Dict # TODO: more specific schema here?
class _AccessOptions(KillChainStageOptions):
"""Valid options for the `ACCESS` InsiderKillChain stage."""
pass
class _ManipulationOptions(KillChainStageOptions):
"""Valid options for the `MANIPULATION` InsiderKillChain stage."""
account_changes: List[Dict] = [] # TODO: More specific schema here?
class _ExploitOptions(KillChainStageOptions):
"""Valid options for the `EXPLOIT` InsiderKillChain stage."""
malicious_acls: List[RouterACLAddRuleAction.ConfigSchema] = []
PLANNING: _PlanningOptions = Field(default_factory=lambda: InsiderKillChainOptions._PlanningOptions())
ACCESS: _AccessOptions = Field(default_factory=lambda: InsiderKillChainOptions._AccessOptions())
MANIPULATION: _ManipulationOptions = Field(default_factory=lambda: InsiderKillChainOptions._ManipulationOptions())
EXPLOIT: _ExploitOptions = Field(default_factory=lambda: InsiderKillChainOptions._ExploitOptions())
class InsiderKillChain(IntEnum):
"""
Enumeration representing different attack stages of the vulnerability and backdoor creation kill chain.
This kill chain is designed around the TAP003 - Malicious Insider Corporal Pearson.
Each stage represents a specific phase in the kill chain.
Please refer to the TAP003 notebook for the current version's implementation of this kill chain.
"""
RECONNAISSANCE = 1
"Represents TAP003 identifying sensitive systems, data and access control mechanisms"
PLANNING = 2
"Represents TAP003 devising a plan to exploit their elevated privileges."
ACCESS = 3
"Represents TAP003's using legitimate credentials to access the access control settings."
MANIPULATION = 4
"Represents TAP003 altering ACLs, User & Group Attributes & other control mechanisms to grant unauthorised access"
EXPLOIT = 5
"Represents TAP003 exploiting their insider knowledge and privilege to implement changes for sabotage."
EMBED = 6
"Represents TAP003's additional changes to ensure continued access"
CONCEAL = 7
"Represents TAP003's efforts in hiding their traces of malicious activities"
EXTRACT = 8
"Represents TAP003 removing sensitive data from the organisation, either for personal gain or to inflict harm."
ERASE = 9
"Represents TAP003 covering their tracks by removing any tools, reverting temporary changes and logging out"
# These Enums must be included in all kill chains.
# Due to limitations in Python and Enums, it is not possible to inherit these Enums from an base class.
NOT_STARTED = 100
"Indicates that the Kill Chain has not started."
SUCCEEDED = 200
"Indicates that the kill chain has succeeded."
FAILED = 300
"Indicates that the attack has failed."
def initial_stage(self) -> "InsiderKillChain":
"""Returns the first stage in the kill chain. Used by Abstract TAP for TAP Agent Setup."""
return self.RECONNAISSANCE
class TAP003(AbstractTAP, discriminator="tap-003"):
"""
TAP003 | Malicious Insider Corporal Pearson.
Currently implements one kill chain: Backdoor & Vulnerability Creation.
This Threat Actor Profile (TAP) aims to introduce subtle cyber attack.
For example, the Backdoor & Vulnerability creation kill chain
creates DENY firewall rules which do not trigger NMNE.
Please see the TAP003-Kill-Chain-E2E.ipynb for more information.
"""
class AgentSettingsSchema(AbstractTAP.AgentSettingsSchema):
"""Agent Settings Schema that enforces TAP003's `kill_chain` config to use the InsiderKillChainOptions."""
kill_chain: InsiderKillChainOptions # = Field(default_factory=lambda: MobileMalwareKillChainOptions())
class ConfigSchema(AbstractTAP.ConfigSchema):
"""Config Schema for the TAP001 agent."""
type: Literal["tap-003"] = "tap-003"
agent_settings: "TAP003.AgentSettingsSchema" = Field(default_factory=lambda: TAP003.AgentSettingsSchema())
config: ConfigSchema
selected_kill_chain: Type[InsiderKillChain] = InsiderKillChain
_current_acl: int = 0
network_knowledge: Dict = {} # TODO: more specific typing
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._change_password_target_host: str = ""
"""If we have just sent a change password request over SSH, this variable keeps track of the hostname."""
self._ssh_target_host: str = ""
"""If we have just send a SSH_LOGIN request, keeps track of the hostname to which we are attempting to SSH."""
self._next_account_change: Optional[Dict] = None
self._num_acls = len(self.config.agent_settings.kill_chain.EXPLOIT.malicious_acls)
self.network_knowledge: dict = {"credentials": {}, "current_session": {}}
"""Keep track of current network state based on responses after sending actions. Populated during PLANNING."""
self.setup_agent()
def _progress_kill_chain(self) -> None:
"""Private Method used to progress the kill chain to the next stage."""
if self.next_kill_chain_stage == self.selected_kill_chain.EXPLOIT: # Covering final stage edge case.
self.current_kill_chain_stage = self.selected_kill_chain(self.current_kill_chain_stage + 1)
self.next_kill_chain_stage = self.selected_kill_chain.SUCCEEDED
else:
# Otherwise, set the current stage to the previous next and increment the next kill chain stage.
self.current_kill_chain_stage = self.next_kill_chain_stage
if self.current_kill_chain_stage == self.selected_kill_chain.SUCCEEDED:
self.next_kill_chain_stage = self.selected_kill_chain.NOT_STARTED
else:
self.next_kill_chain_stage = self.selected_kill_chain(self.current_kill_chain_stage + 1)
self.current_stage_progress = KillChainStageProgress.PENDING
def setup_agent(self) -> None:
"""Responsible for agent setup upon episode reset.
Explicitly this method performs the following:
1. Loads the inherited attribute 'selected_kill_chain' with the InsiderKillChain
2. Selects the starting node from the given user tap settings
3. Selects the target node from the given user tap settings
4. Sets the next execution timestep to the given user tap settings - start step
5. Sets TAP's current host as the selected starting node.
"""
# TAP Boilerplate Setup
self._setup_agent_kill_chain(InsiderKillChain)
# TAP003 Specific Setup
self._select_start_node()
self._set_next_execution_timestep(self.config.agent_settings.start_step)
self.current_host = self.starting_node
def get_action(self, obs: ObsType, timestep: int) -> Tuple[str, Dict]:
"""Follows the TAP003 Backdoor Vulnerability Kill Chain.
Calls the next TAP003 Action Stage. Uses private methods to schedule kill chain stages.
See TAP003-Kill-Chain-E2E.ipynb for further information on the TAP003 agent.
:param obs: Current observation for this agent.
:type obs: ObsType
:param timestep: The current simulation timestep, used for scheduling actions
:type timestep: int
:return: Action formatted in CAOS format
:rtype: Tuple[str, Dict]
"""
self._handle_login_response()
self._handle_change_password_response()
if timestep < self.next_execution_timestep or self.actions_concluded:
return "do-nothing", {} # bypasses self.chosen_action
# self.current_timestep is currently the previous execution timestep
# So it can be used to index action history.
if not self._tap_return_handler(self.current_timestep):
# If the application is already installed, don't keep retrying - this is an acceptable fail
if self.current_kill_chain_stage == InsiderKillChain.PLANNING:
last_action = self.history[self.current_timestep].action
fail_reason = self.history[self.current_timestep].response.data["reason"]
if last_action == "node-application-install" and fail_reason == "already installed":
pass
else:
self.update_current_timestep(new_timestep=timestep)
self._set_next_execution_timestep(timestep + self.config.agent_settings.frequency)
self._tap_outcome_handler(InsiderKillChain)
return self.chosen_action
self.update_current_timestep(new_timestep=timestep)
self._set_next_execution_timestep(timestep + self.config.agent_settings.frequency)
self._tap_outcome_handler(InsiderKillChain) # Handles successes and failures
# The kill chain is called in reverse order
# The kill chain sequence must be called in reverse order to ensure proper execution.
self._exploit()
self._manipulation()
self._access()
self._planning()
self._reconnaissance()
self._tap_start(InsiderKillChain)
return self.chosen_action
def _handle_login_response(self) -> None:
"""If the last request was an SSH login attempt, update the current session in network knowledge."""
if not self.history:
return
last_hist_item = self.history[-1]
if not last_hist_item.action == "node-session-remote-login" or last_hist_item.response.status != "success":
return
self.network_knowledge["current_session"] = {
"hostname": self._ssh_target_host,
"ip_address": last_hist_item.response.data["ip_address"],
"username": last_hist_item.response.data["username"],
}
self.logger.debug(
f"Updating network knowledge. Logged in as {last_hist_item.response.data['username']} on "
f"{self._ssh_target_host}"
)
def _handle_change_password_response(self) -> None:
if not self.history:
return
last_hist_item = self.history[-1]
# when sending remote change password command, this must get populated
if not self._change_password_target_host:
return
if (
last_hist_item.action == "node-send-remote-command"
and last_hist_item.parameters["command"][2] == "change_password"
and last_hist_item.response.status == "success"
):
# changing password logs us out, so our current session needs to be cleared
self.network_knowledge["current_session"] = {}
# update internal knowledge with the new password
ip = last_hist_item.parameters["remote_ip"]
username = last_hist_item.parameters["command"][3]
password = last_hist_item.parameters["command"][5]
hostname = self._change_password_target_host
self.network_knowledge["credentials"][hostname] = {
"ip_address": ip,
"username": username,
"password": password,
}
self.logger.debug(f"Updating network knowledge. Changed {username}'s password to {password} on {hostname}.")
self._change_password_target_host = ""
# local password change
elif last_hist_item.action == "node-accounts-change-password" and last_hist_item.response.status == "success":
self.network_knowledge["current_session"] = {}
username = last_hist_item.request[6]
password = last_hist_item.request[8]
hostname = last_hist_item.request[2]
self.network_knowledge["credentials"][hostname] = {"username": username, "password": password}
self.logger.debug(f"Updating network knowledge. Changed {username}'s password to {password} on {hostname}.")
self._change_password_target_host = ""
def _reconnaissance(self) -> None:
"""Insider Kill Chain | Reconnaissance Stage.
First stage in the Insider kill chain.
Sets the self.chosen attribute to the "do-nothing" CAOS action
and then calls the self._progress_kill_chain() method.
"""
if self.current_kill_chain_stage == self.selected_kill_chain.RECONNAISSANCE:
self.chosen_action = "do-nothing", {}
self._progress_kill_chain()
def _planning(self) -> None:
"""Insider Kill Chain | Planning Stage.
Second stage in the Insider kill chain.
Performs a trial using the given user PLANNING stage probability.
If the trial is successful then the agent populates its knowledge base with information from the config.
Otherwise, the stage is not progressed. Additionally, the agent's kill chain is set
to failure if the repeat_kill_chain_stages parameter is set to FALSE.
"""
if not self.current_kill_chain_stage == self.selected_kill_chain.PLANNING:
return
if not self._agent_trial_handler(self.config.agent_settings.kill_chain.PLANNING.probability):
if self.config.agent_settings.repeat_kill_chain_stages == False:
self.current_kill_chain_stage = self.selected_kill_chain.FAILED
self.chosen_action = "do-nothing", {}
else:
self.network_knowledge[
"credentials"
] = self.config.agent_settings.kill_chain.PLANNING.starting_network_knowledge["credentials"]
self.current_host = self.starting_node
self.logger.info("Resolving starting knowledge.")
self._progress_kill_chain()
if self.current_stage_progress == KillChainStageProgress.PENDING:
self.logger.info(f"TAP003 reached the {self.current_kill_chain_stage.name}")
def _access(self) -> None:
"""Insider Kill Chain | Planning Stage.
Third stage in the Insider kill chain.
Performs a trial using the given user ACCESS stage probability.
This currently does nothing.
"""
if self.current_kill_chain_stage == self.selected_kill_chain.ACCESS:
if self._agent_trial_handler(self.config.agent_settings.kill_chain.ACCESS.probability):
self._progress_kill_chain()
self.chosen_action = "do-nothing", {}
else:
if self.config.agent_settings.repeat_kill_chain_stages == False:
self.current_kill_chain_stage = self.selected_kill_chain.FAILED
self.chosen_action = "do-nothing", {}
def _manipulation(self) -> None:
"""Insider Kill Chain | Manipulation Stage.
Fourth stage in the Insider kill chain.
Performs a trial using the given user MANIPULATION stage probability.
If the trial is successful, the agent will change passwords for accounts that will later be used to execute
malicious commands
Otherwise if the stage is not progressed. Additionally, the agent's kill chain is set
to failure if the repeat_kill_chain_stages parameter is set to FALSE.
"""
if self.current_kill_chain_stage == self.selected_kill_chain.MANIPULATION:
if self._agent_trial_handler(self.config.agent_settings.kill_chain.MANIPULATION.probability):
self.logger.info(f"TAP003 reached the {self.current_kill_chain_stage.name}")
self.current_host = self.starting_node
account_changes = self.config.agent_settings.kill_chain.MANIPULATION.account_changes
if len(account_changes) > 0:
if not self._next_account_change:
self._next_account_change = account_changes.pop(0)
if self._next_account_change["host"] == self.current_host:
# do a local password change
self.chosen_action = "node-accounts-change-password", {
"node_name": self.current_host,
"username": self._next_account_change["username"],
"current_password": self.network_knowledge["credentials"][self.current_host]["password"],
"new_password": self._next_account_change["new_password"],
}
self.logger.info("Changing local password.")
self._next_account_change = account_changes.pop(0)
self._change_password_target_host = self.current_host
else:
# make sure we are logged in via ssh to remote node
hostname = self._next_account_change["host"]
if self.network_knowledge.get("current_session", {}).get("hostname") != hostname:
self._ssh_target_host = hostname
self.chosen_action = "node-session-remote-login", {
"node_name": self.starting_node,
"username": self.network_knowledge["credentials"][hostname]["username"],
"password": self.network_knowledge["credentials"][hostname]["password"],
"remote_ip": self.network_knowledge["credentials"][hostname]["ip_address"],
}
self.logger.info(f"Logging into {hostname} in order to change password.")
# once we know we are logged in, send a command to change password
else:
self.chosen_action = "node-send-remote-command", {
"node_name": self.starting_node,
"remote_ip": self.network_knowledge["credentials"][hostname]["ip_address"],
"command": [
"service",
"user-manager",
"change_password",
self._next_account_change["username"],
self.network_knowledge["credentials"][hostname]["password"],
self._next_account_change["new_password"],
],
}
self.logger.info(f"Changing password on remote node {hostname}")
self._next_account_change = account_changes.pop(0)
self._change_password_target_host = hostname
if len(account_changes) == 0:
self._next_account_change = None
self.logger.info("Finished changing passwords.")
self._progress_kill_chain()
self.current_stage_progress = KillChainStageProgress.PENDING
else:
if self.config.agent_settings.repeat_kill_chain_stages == False:
self.current_kill_chain_stage = self.selected_kill_chain.FAILED
self.chosen_action = "do-nothing", {}
def _exploit(self) -> None:
"""Insider Kill Chain | Exploit Stage.
Fifth stage in the Insider kill chain.
Performs a trial using the given user EXPLOIT stage probability.
If the trial is successful then self.chosen_action attribute is set to the
"node-send-remote-command" CAOS action with the "ROUTER_ACL_ADDRULE" as it's chosen command.
The impact of the ROUTER_ACL_ADDRULE is dependant on user given parameters. At current
the default impact of this stage is to block green agent traffic. An example of TAP003's
manipulation stage in action can be found in the TAP003 notebook.
Otherwise if the stage is not progressed. Additionally, the agent's kill chain is set
to failure if the repeat_kill_chain_stages parameter is set to FALSE.
"""
if self.current_kill_chain_stage == self.selected_kill_chain.EXPLOIT:
if self.current_kill_chain_stage == KillChainStageProgress.PENDING:
# Perform the probability of success once upon entering the stage.
if not self._agent_trial_handler(self.config.agent_settings.kill_chain.EXPLOIT.probability):
if self.config.agent_settings.repeat_kill_chain_stages == False:
self.current_kill_chain_stage = self.selected_kill_chain.FAILED
self.chosen_action = "do-nothing", {}
return
self.current_kill_chain_stage = KillChainStageProgress.IN_PROGRESS
self.config.agent_settings.kill_chain.EXPLOIT.malicious_acls = (
self.config.agent_settings.kill_chain.EXPLOIT.malicious_acls
)
self._num_acls = len(self.config.agent_settings.kill_chain.EXPLOIT.malicious_acls)
malicious_acl = self.config.agent_settings.kill_chain.EXPLOIT.malicious_acls[self._current_acl]
hostname = malicious_acl.target_router
if self.network_knowledge.get("current_session", {}).get("hostname") != hostname:
self._ssh_target_host = hostname
self.chosen_action = "node-session-remote-login", {
"node_name": self.starting_node,
"username": self.network_knowledge["credentials"][hostname]["username"],
"password": self.network_knowledge["credentials"][hostname]["password"],
"remote_ip": self.network_knowledge["credentials"][hostname]["ip_address"],
}
self.logger.info(f"Logging into {hostname} in order to add ACL rules.")
# once we know we are logged in, send a command to change password
else:
self.chosen_action = "node-send-remote-command", {
"node_name": self.starting_node,
"remote_ip": self.network_knowledge["credentials"][hostname]["ip_address"],
"command": [
"acl",
"add_rule",
malicious_acl.permission,
malicious_acl.protocol_name,
str(malicious_acl.src_ip),
str(malicious_acl.src_wildcard),
malicious_acl.src_port,
str(malicious_acl.dst_ip),
str(malicious_acl.dst_wildcard),
malicious_acl.dst_port,
malicious_acl.position,
],
}
self.logger.info(f"Adding ACL rule to {hostname}")
self._current_acl = self._current_acl + 1
if self._current_acl == self._num_acls:
self._current_acl = 0
self.logger.info("Finished adding ACL rules.")
self._progress_kill_chain()

View File

@@ -1,6 +1,21 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
from primaite.game.agent import interface
from primaite.game.agent.scripted_agents import abstract_tap, data_manipulation_bot, probabilistic_agent, random_agent
from primaite.game.agent.scripted_agents import (
abstract_tap,
data_manipulation_bot,
probabilistic_agent,
random_agent,
TAP001,
TAP003,
)
__all__ = ("abstract_tap", "data_manipulation_bot", "interface", "probabilistic_agent", "random_agent")
__all__ = (
"abstract_tap",
"data_manipulation_bot",
"interface",
"probabilistic_agent",
"random_agent",
"TAP001",
"TAP003",
)

View File

@@ -1,49 +1,202 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
from __future__ import annotations
import random
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple
from abc import abstractmethod
from enum import Enum, IntEnum
from typing import Dict, List, Optional, Tuple, Type
from gymnasium.core import ObsType
from pydantic import Field
from pydantic import BaseModel, ConfigDict, Field
from primaite.game.agent.scripted_agents.random_agent import PeriodicAgent
__all__ = "AbstractTAPAgent"
from primaite.game.agent.interface import AbstractScriptedAgent
from primaite.game.science import simulate_trial
class AbstractTAPAgent(PeriodicAgent, ABC):
"""Base class for TAP agents to inherit from."""
# This class is required for abstract tap. The IntEnums in this class are repeated in other kill chains.
class BaseKillChain(IntEnum):
"""A generic kill chain for abstract tap initialisation.
config: "AbstractTAPAgent.ConfigSchema" = Field(default_factory=lambda: AbstractTAPAgent.ConfigSchema())
next_execution_timestep: int = 0
The IntEnums in this class are repeated in other kill chains
As IntEnums cannot be directly extended by a inheritance.
"""
class AgentSettingsSchema(PeriodicAgent.AgentSettingsSchema, ABC):
"""Schema for the `agent_settings` part of the agent config."""
NOT_STARTED = 100
"Indicates that the Kill Chain has not started."
SUCCEEDED = 200
"Indicates that the kill chain has succeeded."
FAILED = 300
"Indicates that the attack has failed."
possible_starting_nodes: List[str] = Field(default_factory=list)
# The original approach is to extend the base class during runtime via class methods.
# However, this approach drastically impacted the readability and complexity of the code
# So the decision was made to ignore the DRY Principle for kill chains.
class ConfigSchema(PeriodicAgent.ConfigSchema, ABC):
"""Configuration schema for Abstract TAP agents."""
@abstractmethod
def initial_stage(self) -> "BaseKillChain":
"""Returns the first stage in the kill chain. Used for Abstract TAP Setup."""
return self.NOT_STARTED
type: str = "abstract-tap"
agent_settings: AbstractTAPAgent.AgentSettingsSchema = Field(
default_factory=lambda: AbstractTAPAgent.AgentSettingsSchema()
class KillChainStageProgress(Enum):
"""Generic Progress Enums. Used by TAP Agents to keep track of kill chain stages that required multiple actions."""
PENDING = 0
"""Indicates that the current kill chain stage is yet to start."""
IN_PROGRESS = 1
"""Indicates that the current kill chain stage is not yet completed."""
FINISHED = 2
"""Indicates that the current kill chain stage stage has been completed."""
class KillChainOptions(BaseModel):
"""Base Class for Kill Chain Options. Inherited by all TAP Type Agents."""
model_config = ConfigDict(extra="forbid")
class KillChainStageOptions(BaseModel):
"""Shared options for generic Kill Chain Stages."""
model_config = ConfigDict(extra="forbid")
probability: float = 1
class AbstractTAP(AbstractScriptedAgent):
"""Abstract class for Threat Actor Persona (TAP) Type Agents must inherit from.
This abstract base class provides TAP agents an interface which provides
TAP type agents the necessary methods to execute kill chain(s) with
configurable parameters.
TAP Actions are returned to the Request Manager as a Tuple
in CAOS format via the get_action method in line with other agents.
Abstract TAP Class intends to provide each TAP the following:
1. Kill Chain Progression
Kill Chains are IntEnums which define the different stages within a kill chain.
These stages are intended to be used across multiple ARCD environments.
2. Abstract Methods For Kill Chain Control Flow
Abstract methods _progress_kill_chain & _setup_kill_chain
are intended to provide TAP type agent additional control
over execution flow in comparison to AbstractScriptedAgent.
Usually these methods handle kill chain progression & success criteria.
For more information about Abstract TAPs please refer
to the methods & attributes documentation directly.
Additionally, Refer to a specific TAP for a more specific example.
"""
class AgentSettingsSchema(AbstractScriptedAgent.AgentSettingsSchema):
"""Agent Settings Schema. Default settings applied for all threat actor profiles."""
start_step: int = 5
frequency: int = 5
variance: int = 0
repeat_kill_chain: bool = False
repeat_kill_chain_stages: bool = True
starting_nodes: Optional[List[str]] = []
default_starting_node: str
kill_chain: KillChainOptions
class ConfigSchema(AbstractScriptedAgent.ConfigSchema):
"""Configuration schema applicable to all TAP agents."""
agent_settings: "AbstractTAP.AgentSettingsSchema" = Field(
default_factory=lambda: AbstractTAP.AgentSettingsSchema()
)
starting_node: Optional[str] = None
config: ConfigSchema = Field(default_factory=lambda: AbstractTAP.ConfigSchema())
selected_kill_chain: Type[BaseKillChain]
"""A combination of TAP's base & default kill chain. Loaded dynamically during agent setup."""
next_execution_timestep: int = 0
"""The next timestep in which the agent will attempt to progress the kill chain."""
starting_node: str = ""
"""The name (string) of TAP agent's starting node. This attribute is initialised via _self_select_starting_node."""
actions_concluded: bool = False
"""Boolean value which indicates if a TAP Agent has completed it's attack for the episode."""
next_kill_chain_stage: BaseKillChain = BaseKillChain.NOT_STARTED
"""The IntEnum of the next kill chain stage to be executed.
This attribute is initialised via _tap_start.
Afterwards, this attribute is loaded dynamically via _progress_kill_chain.
"""
current_kill_chain_stage: BaseKillChain = BaseKillChain.NOT_STARTED
"""The TAP agent's current kill chain.
This attribute is used as a state to indicate the current progress in a kill chain.
"""
current_stage_progress: KillChainStageProgress = KillChainStageProgress.PENDING
"""The TAP agent's current progress in a stage within a kill chain.
This attribute is used as a state to indicate the current progress in a individual kill chain stage.
Some TAP's require multiple actions to take place before moving onto the next stage in a kill chain.
This attribute is used to keep track of the current progress within an individual stage.
"""
chosen_action: Tuple[str, Dict] = "do-nothing", {}
"""The next agent's chosen action. Returned in CAOS format at the end of each timestep."""
current_host: str = ""
"""The name (str) of a TAP agent's currently selected host.
This attribute is set dynamically during tap execution via _set_current_host.
"""
current_timestep: int = 0
"""The current timestep (int) of the game.
This attribute is set to the "timestep" argument passed to get_action.
Mainly used to by kill chain stages for complex execution flow that is dependant on the simulation.
Specifically, this attribute is used for indexing previous actions in the self.history inherited attribute.
For more information please refer to AbstractAgent's "self.history" attribute
And for action responses see 'request.py' and the .data attribute.
Lastly, a demonstration of the above capability can be found in the PROPAGATE step in the tap001-e2e notebook.
"""
def update_current_timestep(self, new_timestep: int):
"""Updates the current time_step attribute to the given timestep argument."""
self.current_timestep = new_timestep
@abstractmethod
def get_action(self, obs: ObsType, timestep: int = 0) -> Tuple[str, Dict]:
"""Return an action to be taken in the environment."""
return super().get_action(obs=obs, timestep=timestep)
def _progress_kill_chain(self):
"""Private Abstract method which defines the default kill chain progression.
@abstractmethod
def setup_agent(self) -> None:
"""Set up agent."""
This abstract method intend to allow TAPs to control the logic flow of their kill chain.
In a majority of cases this method handles the success criteria and incrementing the current kill chain intenum.
This method is abstract so TAPs can configure this behaviour for tap specific implementations.
"""
pass
def _select_start_node(self) -> None:
"""
Handles setting the starting node behaviour of TAP type agents.
If the user given tap_settings provides a starting_node list then the starting node
is set to a random node given in the starting_node list.
Otherwise, the starting node is set to the 'default_starting_node' option.
"""
# Catches empty starting nodes.
if not self.config.agent_settings.starting_nodes:
self.starting_node = self.config.agent_settings.default_starting_node
else:
self.starting_node = random.choice(self.config.agent_settings.starting_nodes)
def _setup_agent_kill_chain(self, given_kill_chain: BaseKillChain) -> None:
"""Sets the 'next_kill_chain_stage' TAP attribute via the public kill chain method 'initial_stage'."""
self.selected_kill_chain = given_kill_chain
self.next_kill_chain_stage = self.selected_kill_chain.initial_stage(given_kill_chain)
def _set_next_execution_timestep(self, timestep: int) -> None:
"""Set the next execution timestep with a configured random variance.
@@ -54,8 +207,96 @@ class AbstractTAPAgent(PeriodicAgent, ABC):
)
self.next_execution_timestep = timestep + random_timestep_increment
def _select_start_node(self) -> None:
"""Set the starting starting node of the agent to be a random node from this agent's action manager."""
# we are assuming that every node in the node manager has a data manipulation application at idx 0
self.starting_node = random.choice(self.config.agent_settings.possible_starting_nodes)
self.logger.debug(f"Selected starting node: {self.starting_node}")
def _agent_trial_handler(self, agent_probability_of_success: int) -> bool:
"""Acts as a wrapper around simulate trial - Sets kill chain stage to failed if the relevant setting is set.
:param agent_probability_of_success: The probability of the action success to be passed to simulate_trial.
:type agent_probability_of_success: int.
:rtype: Bool.
"""
if simulate_trial(agent_probability_of_success):
return True
else:
self.logger.info(
f"failed to reach kill chain stage {self.next_kill_chain_stage.name} due to probability of failure."
)
if self.config.agent_settings.repeat_kill_chain_stages == False:
self.logger.info(f"Thus {self.config.ref} has failed the kill chain")
self.current_kill_chain_stage = self.selected_kill_chain.FAILED
return False
else:
self.logger.info(f"Retrying from stage {self.current_kill_chain_stage.name}.")
return False
def _tap_outcome_handler(self, selected_kill_chain_class: BaseKillChain) -> None:
"""
Default TAP behaviour for base kill chain stages.
Upon Success and failure:
TAPs will either repeat or re-attack dependant on the user given settings.
:param tap_kill_chain: The TAP agent's currently selected kill chain.
:type tap_kill_chain: BaseKillChain
"""
if (
self.current_kill_chain_stage == self.selected_kill_chain.SUCCEEDED
or self.current_kill_chain_stage == self.selected_kill_chain.FAILED
):
if self.actions_concluded == True: # Prevents Further logging via a guard clause boolean
self.chosen_action = "do-nothing", {}
return
if self.current_kill_chain_stage == self.selected_kill_chain.SUCCEEDED:
self.logger.info(f"{self.config.ref} has successfully carried out the kill chain.")
if self.current_kill_chain_stage == self.selected_kill_chain.FAILED:
self.logger.info(f"{self.config.ref} has failed the Kill Chain.")
if self.config.agent_settings.repeat_kill_chain == True:
self.logger.info(f"{self.config.ref} has opted to re-attack!")
self.current_kill_chain_stage = BaseKillChain.NOT_STARTED
self.next_kill_chain_stage = selected_kill_chain_class.initial_stage(selected_kill_chain_class)
else:
self.logger.info(f"{self.config.ref} has opted to forgo any further attacks.")
self.actions_concluded = True # Guard Clause Bool
self.chosen_action = "do-nothing", {}
def _tap_return_handler(self, timestep: int) -> bool:
# Intelligence | Use the request_response system to enable different behaviour
"""
Handles the request_manager's response query. Sets Kill Chain to false if failed.
If the previous action failed due to the simulation state,
the kill chain is considered to have failed.
Returns True if the previous action was successful.
Returns False if the previous action was any other state.
(Including Pending and Failure)
:param timestep: The current primAITE game layer timestep.
:type timestep: int
:rtype bool
"""
if self.history[timestep].response.status != "success":
self.logger.info(
f"{self.config.ref} has failed to successfully carry out {self.current_kill_chain_stage.name}"
)
self.logger.info(f"due to the simulation state: {self.history[timestep].response.data}")
if self.config.agent_settings.repeat_kill_chain_stages == False:
self.logger.info(
f"Thus {self.config.ref} has failed this kill chain attempt on {self.current_kill_chain_stage.name}"
)
self.current_kill_chain_stage = self.selected_kill_chain.FAILED
else:
self.logger.info(f"Retrying from stage {self.current_kill_chain_stage.name}!")
return False
return True
def _tap_start(self, tap_kill_chain: Type[BaseKillChain]) -> None:
"""
Sets the TAP Agent's beginning current/next kill chain stages.
:param IntEnum tap_kill_chain: A currently selected kill chain.
"""
if self.current_kill_chain_stage == self.selected_kill_chain.NOT_STARTED:
self.current_kill_chain_stage = tap_kill_chain.initial_stage(tap_kill_chain)
self.next_kill_chain_stage = self.selected_kill_chain(self.current_kill_chain_stage + 1)
self.logger.info(f"{self.config.ref} has begun it's attack!")
self.chosen_action = "do-nothing", {}