2023-10-19 01:56:40 +01:00
|
|
|
"""Interface for agents."""
|
2023-11-21 13:41:38 +00:00
|
|
|
import random
|
2023-09-26 12:54:56 +01:00
|
|
|
from abc import ABC, abstractmethod
|
2023-11-17 11:51:19 +00:00
|
|
|
from typing import Dict, List, Optional, Tuple, TYPE_CHECKING, TypeAlias, Union
|
2023-10-06 10:36:29 +01:00
|
|
|
|
2023-10-02 17:21:43 +01:00
|
|
|
import numpy as np
|
2023-11-14 15:10:07 +00:00
|
|
|
from gymnasium.core import ActType, ObsType
|
2023-11-16 13:26:30 +00:00
|
|
|
from pydantic import BaseModel
|
2023-09-26 12:54:56 +01:00
|
|
|
|
2023-10-02 17:21:43 +01:00
|
|
|
from primaite.game.agent.actions import ActionManager
|
2023-11-14 15:10:07 +00:00
|
|
|
from primaite.game.agent.observations import ObservationManager
|
2023-09-26 12:54:56 +01:00
|
|
|
from primaite.game.agent.rewards import RewardFunction
|
2023-11-17 11:51:19 +00:00
|
|
|
|
|
|
|
|
if TYPE_CHECKING:
    # Imported only for type annotations to avoid a runtime import cycle with the simulator.
    from primaite.simulator.system.services.red_services.data_manipulation_bot import DataManipulationBot

# NOTE(review): this alias shadows the ObsType imported from gymnasium.core above —
# confirm the redefinition is intentional before relying on either definition.
ObsType: TypeAlias = Union[Dict, np.ndarray]
|
|
|
|
|
|
2023-09-26 12:54:56 +01:00
|
|
|
|
2023-11-20 10:38:01 +00:00
|
|
|
class AgentStartSettings(BaseModel):
    """Configuration values for when an agent starts performing actions."""

    start_step: int = 5
    "The timestep at which an agent begins performing its actions"

    frequency: int = 5
    "The number of timesteps to wait between performing actions"

    variance: int = 0
    "The maximum random deviation (in timesteps) applied to the frequency"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AgentSettings(BaseModel):
    """Settings for configuring the operation of an agent."""

    start_settings: Optional[AgentStartSettings] = None
    "Configuration for when an agent begins performing its actions"

    @classmethod
    def from_config(cls, config: Optional[Dict]) -> "AgentSettings":
        """Construct agent settings from a config dictionary.

        Falls back to an all-default instance when no config is supplied.

        :param config: A dict of options for the agent settings, or None.
        :type config: Optional[Dict]
        :return: The agent settings.
        :rtype: AgentSettings
        """
        if config is None:
            return cls()
        return cls(**config)
|
|
|
|
|
|
2023-11-20 10:38:01 +00:00
|
|
|
|
2023-09-26 12:54:56 +01:00
|
|
|
class AbstractAgent(ABC):
    """Base class for scripted and RL agents."""

    def __init__(
        self,
        agent_name: Optional[str],
        action_space: Optional[ActionManager],
        observation_space: Optional[ObservationManager],
        reward_function: Optional[RewardFunction],
        agent_settings: Optional[AgentSettings],
    ) -> None:
        """
        Initialize an agent.

        :param agent_name: Unique string identifier for the agent, for reporting and multi-agent purposes.
            Falls back to "unnamed_agent" when None.
        :type agent_name: Optional[str]
        :param action_space: Action space for the agent.
        :type action_space: Optional[ActionManager]
        :param observation_space: Observation space for the agent.
        :type observation_space: Optional[ObservationManager]
        :param reward_function: Reward function for the agent.
        :type reward_function: Optional[RewardFunction]
        :param agent_settings: Operational settings for the agent. Falls back to default
            AgentSettings when None.
        :type agent_settings: Optional[AgentSettings]
        """
        self.agent_name: str = agent_name or "unnamed_agent"
        self.action_manager: Optional[ActionManager] = action_space
        self.observation_manager: Optional[ObservationManager] = observation_space
        self.reward_function: Optional[RewardFunction] = reward_function
        self.agent_settings = agent_settings or AgentSettings()

    def update_observation(self, state: Dict) -> ObsType:
        """
        Convert a state from the simulator into an observation for the agent using the observation space.

        :param state: State directly from simulation.describe_state.
        :type state: Dict
        :return: Observation of the state according to CAOS.
        :rtype: ObsType
        """
        return self.observation_manager.update(state)

    def update_reward(self, state: Dict) -> float:
        """
        Use the reward function to calculate a reward from the state.

        :param state: State of the environment.
        :type state: Dict
        :return: Reward from the state.
        :rtype: float
        """
        return self.reward_function.update(state)

    @abstractmethod
    def get_action(self, obs: ObsType, reward: float = 0.0) -> Tuple[str, Dict]:
        """
        Return an action to be taken in the environment.

        Subclasses should implement agent logic here. It should use the observation as input to decide best next action.

        :param obs: Observation of the environment.
        :type obs: ObsType
        :param reward: Reward from the previous action, defaults to 0.0.
            TODO: should this parameter even be accepted?
        :type reward: float, optional
        :return: Action to be taken in the environment.
        :rtype: Tuple[str, Dict]
        """
        # in RL agent, this method will send CAOS observation to RL agent, then receive a int 0-39,
        # then use a bespoke conversion to take 1-40 int back into CAOS action
        return ("DO_NOTHING", {})

    def format_request(self, action: Tuple[str, Dict], options: Dict[str, int]) -> List[str]:
        """Format action into format expected by the simulator, and apply execution definition if applicable."""
        # this will take something like APPLICATION.EXECUTE and add things like target_ip_address in simulator.
        # therefore the execution definition needs to be a mapping from CAOS into SIMULATOR
        return self.action_manager.form_request(action_identifier=action, action_options=options)
|
2023-10-02 17:21:43 +01:00
|
|
|
|
2023-09-26 12:54:56 +01:00
|
|
|
|
|
|
|
|
class AbstractScriptedAgent(AbstractAgent):
    """Base class for actors which generate their own behaviour."""

    pass
|
|
|
|
|
|
2023-10-06 10:36:29 +01:00
|
|
|
|
2023-10-02 17:21:43 +01:00
|
|
|
class RandomAgent(AbstractScriptedAgent):
    """Agent that ignores its observation and acts completely at random."""

    def get_action(self, obs: ObsType, reward: float = 0.0) -> Tuple[str, Dict]:
        """Randomly sample an action from the action space.

        :param obs: Observation of the environment. Ignored by this agent.
        :type obs: ObsType
        :param reward: Reward from the previous action. Ignored by this agent, defaults to 0.0.
        :type reward: float, optional
        :return: Randomly sampled action in CAOS format.
        :rtype: Tuple[str, Dict]
        """
        return self.action_manager.get_action(self.action_manager.space.sample())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ProxyAgent(AbstractAgent):
    """Agent that sends observations to an RL model and receives actions from that model."""

    def __init__(
        self,
        agent_name: Optional[str],
        action_space: Optional[ActionManager],
        observation_space: Optional[ObservationManager],
        reward_function: Optional[RewardFunction],
        agent_settings: Optional[AgentSettings] = None,
    ) -> None:
        """
        Initialize a proxy agent.

        :param agent_name: Unique string identifier for the agent.
        :type agent_name: Optional[str]
        :param action_space: Action space for the agent.
        :type action_space: Optional[ActionManager]
        :param observation_space: Observation space for the agent.
        :type observation_space: Optional[ObservationManager]
        :param reward_function: Reward function for the agent.
        :type reward_function: Optional[RewardFunction]
        :param agent_settings: Operational settings for the agent, defaults to None.
        :type agent_settings: Optional[AgentSettings], optional
        """
        # BUG FIX: AbstractAgent.__init__ requires an agent_settings argument; it was
        # previously omitted here, so constructing a ProxyAgent raised TypeError.
        super().__init__(
            agent_name=agent_name,
            action_space=action_space,
            observation_space=observation_space,
            reward_function=reward_function,
            agent_settings=agent_settings,
        )
        # Declared (not assigned) here; the environment must call store_action() before
        # get_action() is first invoked, otherwise get_action raises AttributeError.
        self.most_recent_action: ActType

    def get_action(self, obs: ObsType, reward: float = 0.0) -> Tuple[str, Dict]:
        """
        Return the agent's most recent action, formatted in CAOS format.

        :param obs: Observation for the agent. Not used by ProxyAgents, but required by the interface.
        :type obs: ObsType
        :param reward: Reward value for the agent. Not used by ProxyAgents, defaults to 0.0.
        :type reward: float, optional
        :return: Action to be taken in CAOS format.
        :rtype: Tuple[str, Dict]
        """
        return self.action_manager.get_action(self.most_recent_action)

    def store_action(self, action: ActType):
        """
        Store the most recent action taken by the agent.

        The environment is responsible for calling this method when it receives an action from the agent policy.

        :param action: The raw action chosen by the policy.
        """
        self.most_recent_action = action
|
2023-10-02 17:21:43 +01:00
|
|
|
|
2023-11-17 11:51:19 +00:00
|
|
|
|
2023-11-13 15:55:14 +00:00
|
|
|
class DataManipulationAgent(AbstractScriptedAgent):
    """Agent that uses a DataManipulationBot to perform an SQL injection attack."""

    data_manipulation_bots: List["DataManipulationBot"] = []
    "Bots under this agent's control; re-assigned per instance in __init__."

    next_execution_timestep: int = 0
    "The timestep at which the agent will next perform its attack action."

    def __init__(self, *args, **kwargs):
        """Initialise the agent and schedule its first attack timestep."""
        super().__init__(*args, **kwargs)
        # BUG FIX: give each instance its own list — the class-level default is a
        # mutable attribute that would otherwise be shared across all instances.
        self.data_manipulation_bots = []
        # ROBUSTNESS: AgentSettings defaults start_settings to None, which previously
        # caused an AttributeError here; fall back to default start settings instead.
        if self.agent_settings.start_settings is None:
            self.agent_settings.start_settings = AgentStartSettings()
        self._set_next_execution_timestep(self.agent_settings.start_settings.start_step)

    def _set_next_execution_timestep(self, timestep: int) -> None:
        """Set the next execution timestep with a configured random variance.

        :param timestep: The timestep to add variance to.
        """
        variance = self.agent_settings.start_settings.variance
        # random.randint bounds are inclusive, so the increment lies in [-variance, variance].
        random_timestep_increment = random.randint(-variance, variance)
        self.next_execution_timestep = timestep + random_timestep_increment

    def get_action(self, obs: ObsType, reward: float = 0.0) -> Tuple[str, Dict]:
        """Do nothing until the scheduled timestep, then execute the data manipulation application.

        :param obs: Observation of the environment. Ignored by this agent.
        :type obs: ObsType
        :param reward: Reward from the previous action. Ignored by this agent, defaults to 0.0.
        :type reward: float, optional
        :return: Action to be taken in CAOS format.
        :rtype: Tuple[str, Dict]
        """
        # BUG FIX: AbstractAgent stores the manager as self.action_manager; the previous
        # reference to self.action_space raised AttributeError.
        current_timestep = self.action_manager.session.step_counter

        if current_timestep < self.next_execution_timestep:
            return "DONOTHING", {"dummy": 0}

        # Attack is due: schedule the following one, then execute this one.
        self._set_next_execution_timestep(current_timestep + self.agent_settings.start_settings.frequency)
        return "NODE_APPLICATION_EXECUTE", {"node_id": 0, "application_id": 0}
|
2023-09-26 12:54:56 +01:00
|
|
|
|
2023-11-17 11:51:19 +00:00
|
|
|
|
2023-09-26 12:54:56 +01:00
|
|
|
class AbstractGATEAgent(AbstractAgent):
    """Base class for actors controlled via external messages, such as RL policies."""

    pass
|