Merge branch 'dev' into feature/2735_Implement-UserManager-class-and-integrate-into-Node

This commit is contained in:
Chris McCarthy
2024-07-12 17:14:57 +01:00
78 changed files with 4046 additions and 644 deletions

View File

@@ -741,6 +741,7 @@ agents:
agent_settings:
flatten_obs: true
action_masking: true

View File

@@ -733,6 +733,7 @@ agents:
agent_settings:
flatten_obs: true
action_masking: true
- ref: defender_2
team: BLUE
@@ -1316,6 +1317,7 @@ agents:
agent_settings:
flatten_obs: true
action_masking: true

View File

@@ -44,3 +44,18 @@ def data_manipulation_config_path() -> Path:
_LOGGER.error(msg)
raise FileNotFoundError(msg)
return path
def data_manipulation_marl_config_path() -> Path:
"""
Get the path to the MARL example config.
:return: Path to yaml config file for the MARL scenario.
:rtype: Path
"""
path = _EXAMPLE_CFG / "data_manipulation_marl.yaml"
if not path.exists():
msg = f"Example config does not exist: {path}. Have you run `primaite setup`?"
_LOGGER.error(msg)
raise FileNotFoundError(msg)
return path

View File

@@ -49,7 +49,7 @@ class AbstractAction(ABC):
objects."""
@abstractmethod
def form_request(self) -> List[str]:
def form_request(self) -> RequestFormat:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
return []
@@ -67,7 +67,7 @@ class DoNothingAction(AbstractAction):
# i.e. a choice between one option. To make enumerating this action easier, we are adding a 'dummy' paramter
# with one option. This just aids the Action Manager to enumerate all possibilities.
def form_request(self, **kwargs) -> List[str]:
def form_request(self, **kwargs) -> RequestFormat:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
return ["do_nothing"]
@@ -86,7 +86,7 @@ class NodeServiceAbstractAction(AbstractAction):
self.shape: Dict[str, int] = {"node_id": num_nodes, "service_id": num_services}
self.verb: str # define but don't initialise: defends against children classes not defining this
def form_request(self, node_id: int, service_id: int) -> List[str]:
def form_request(self, node_id: int, service_id: int) -> RequestFormat:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
service_name = self.manager.get_service_name_by_idx(node_id, service_id)
@@ -181,7 +181,7 @@ class NodeApplicationAbstractAction(AbstractAction):
self.shape: Dict[str, int] = {"node_id": num_nodes, "application_id": num_applications}
self.verb: str # define but don't initialise: defends against children classes not defining this
def form_request(self, node_id: int, application_id: int) -> List[str]:
def form_request(self, node_id: int, application_id: int) -> RequestFormat:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
application_name = self.manager.get_application_name_by_idx(node_id, application_id)
@@ -229,7 +229,7 @@ class NodeApplicationInstallAction(AbstractAction):
super().__init__(manager=manager)
self.shape: Dict[str, int] = {"node_id": num_nodes}
def form_request(self, node_id: int, application_name: str) -> List[str]:
def form_request(self, node_id: int, application_name: str) -> RequestFormat:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
if node_name is None:
@@ -324,7 +324,7 @@ class NodeApplicationRemoveAction(AbstractAction):
super().__init__(manager=manager)
self.shape: Dict[str, int] = {"node_id": num_nodes}
def form_request(self, node_id: int, application_name: str) -> List[str]:
def form_request(self, node_id: int, application_name: str) -> RequestFormat:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
if node_name is None:
@@ -346,7 +346,7 @@ class NodeFolderAbstractAction(AbstractAction):
self.shape: Dict[str, int] = {"node_id": num_nodes, "folder_id": num_folders}
self.verb: str # define but don't initialise: defends against children classes not defining this
def form_request(self, node_id: int, folder_id: int) -> List[str]:
def form_request(self, node_id: int, folder_id: int) -> RequestFormat:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
folder_name = self.manager.get_folder_name_by_idx(node_idx=node_id, folder_idx=folder_id)
@@ -394,7 +394,9 @@ class NodeFileCreateAction(AbstractAction):
super().__init__(manager, num_nodes=num_nodes, num_folders=num_folders, **kwargs)
self.verb: str = "create"
def form_request(self, node_id: int, folder_name: str, file_name: str, force: Optional[bool] = False) -> List[str]:
def form_request(
self, node_id: int, folder_name: str, file_name: str, force: Optional[bool] = False
) -> RequestFormat:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
if node_name is None or folder_name is None or file_name is None:
@@ -409,7 +411,7 @@ class NodeFolderCreateAction(AbstractAction):
super().__init__(manager, num_nodes=num_nodes, num_folders=num_folders, **kwargs)
self.verb: str = "create"
def form_request(self, node_id: int, folder_name: str) -> List[str]:
def form_request(self, node_id: int, folder_name: str) -> RequestFormat:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
if node_name is None or folder_name is None:
@@ -430,7 +432,7 @@ class NodeFileAbstractAction(AbstractAction):
self.shape: Dict[str, int] = {"node_id": num_nodes, "folder_id": num_folders, "file_id": num_files}
self.verb: str # define but don't initialise: defends against children classes not defining this
def form_request(self, node_id: int, folder_id: int, file_id: int) -> List[str]:
def form_request(self, node_id: int, folder_id: int, file_id: int) -> RequestFormat:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
folder_name = self.manager.get_folder_name_by_idx(node_idx=node_id, folder_idx=folder_id)
@@ -463,7 +465,7 @@ class NodeFileDeleteAction(NodeFileAbstractAction):
super().__init__(manager, num_nodes=num_nodes, num_folders=num_folders, num_files=num_files, **kwargs)
self.verb: str = "delete"
def form_request(self, node_id: int, folder_id: int, file_id: int) -> List[str]:
def form_request(self, node_id: int, folder_id: int, file_id: int) -> RequestFormat:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
folder_name = self.manager.get_folder_name_by_idx(node_idx=node_id, folder_idx=folder_id)
@@ -504,7 +506,7 @@ class NodeFileAccessAction(AbstractAction):
super().__init__(manager, num_nodes=num_nodes, num_folders=num_folders, **kwargs)
self.verb: str = "access"
def form_request(self, node_id: int, folder_name: str, file_name: str) -> List[str]:
def form_request(self, node_id: int, folder_name: str, file_name: str) -> RequestFormat:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
if node_name is None or folder_name is None or file_name is None:
@@ -525,7 +527,7 @@ class NodeAbstractAction(AbstractAction):
self.shape: Dict[str, int] = {"node_id": num_nodes}
self.verb: str # define but don't initialise: defends against children classes not defining this
def form_request(self, node_id: int) -> List[str]:
def form_request(self, node_id: int) -> RequestFormat:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
return ["network", "node", node_name, self.verb]
@@ -740,7 +742,7 @@ class RouterACLRemoveRuleAction(AbstractAction):
super().__init__(manager=manager)
self.shape: Dict[str, int] = {"position": max_acl_rules}
def form_request(self, target_router: str, position: int) -> List[str]:
def form_request(self, target_router: str, position: int) -> RequestFormat:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
return ["network", "node", target_router, "acl", "remove_rule", position]
@@ -923,7 +925,7 @@ class HostNICAbstractAction(AbstractAction):
self.shape: Dict[str, int] = {"node_id": num_nodes, "nic_id": max_nics_per_node}
self.verb: str # define but don't initialise: defends against children classes not defining this
def form_request(self, node_id: int, nic_id: int) -> List[str]:
def form_request(self, node_id: int, nic_id: int) -> RequestFormat:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
node_name = self.manager.get_node_name_by_idx(node_idx=node_id)
nic_num = self.manager.get_nic_num_by_idx(node_idx=node_id, nic_idx=nic_id)
@@ -960,7 +962,7 @@ class NetworkPortEnableAction(AbstractAction):
super().__init__(manager=manager)
self.shape: Dict[str, int] = {"port_id": max_nics_per_node}
def form_request(self, target_nodename: str, port_id: int) -> List[str]:
def form_request(self, target_nodename: str, port_id: int) -> RequestFormat:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
if target_nodename is None or port_id is None:
return ["do_nothing"]
@@ -979,7 +981,7 @@ class NetworkPortDisableAction(AbstractAction):
super().__init__(manager=manager)
self.shape: Dict[str, int] = {"port_id": max_nics_per_node}
def form_request(self, target_nodename: str, port_id: int) -> List[str]:
def form_request(self, target_nodename: str, port_id: int) -> RequestFormat:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
if target_nodename is None or port_id is None:
return ["do_nothing"]
@@ -1315,7 +1317,7 @@ class ActionManager:
act_identifier, act_options = self.action_map[action]
return act_identifier, act_options
def form_request(self, action_identifier: str, action_options: Dict) -> List[str]:
def form_request(self, action_identifier: str, action_options: Dict) -> RequestFormat:
"""Take action in CAOS format and use the execution definition to change it into PrimAITE request format."""
act_obj = self.actions[action_identifier]
return act_obj.form_request(**action_options)

View File

@@ -0,0 +1,188 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
import logging
from pathlib import Path
from prettytable import MARKDOWN, PrettyTable
from primaite.simulator import LogLevel, SIM_OUTPUT
class _NotJSONFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
"""
Determines if a log message does not start and end with '{' and '}' (i.e., it is not a JSON-like message).
:param record: LogRecord object containing all the information pertinent to the event being logged.
:return: True if log message is not JSON-like, False otherwise.
"""
return not record.getMessage().startswith("{") and not record.getMessage().endswith("}")
class AgentLog:
"""
A Agent Log class is a simple logger dedicated to managing and writing logging updates and information for an agent.
Each log message is written to a file located at: <simulation output directory>/agent_name/agent_name.log
"""
def __init__(self, agent_name: str):
"""
Constructs a Agent Log instance for a given hostname.
:param hostname: The hostname associated with the system logs being recorded.
"""
self.agent_name = agent_name
self.current_episode: int = 1
self.current_timestep: int = 0
self.setup_logger()
@property
def timestep(self) -> int:
"""Returns the current timestep. Used for log indexing.
:return: The current timestep as an Int.
"""
return self.current_timestep
def update_timestep(self, new_timestep: int):
"""
Updates the self.current_timestep attribute with the given parameter.
This method is called within .step() to ensure that all instances of Agent Logs
are in sync with one another.
:param new_timestep: The new timestep.
"""
self.current_timestep = new_timestep
def setup_logger(self):
"""
Configures the logger for this Agent Log instance.
The logger is set to the DEBUG level, and is equipped with a handler that writes to a file and filters out
JSON-like messages.
"""
if not SIM_OUTPUT.save_agent_logs:
return
log_path = self._get_log_path()
file_handler = logging.FileHandler(filename=log_path)
file_handler.setLevel(logging.DEBUG)
log_format = "%(timestep)s::%(levelname)s::%(message)s"
file_handler.setFormatter(logging.Formatter(log_format))
self.logger = logging.getLogger(f"{self.agent_name}_log")
for handler in self.logger.handlers:
self.logger.removeHandler(handler)
self.logger.setLevel(logging.DEBUG)
self.logger.addHandler(file_handler)
def _get_log_path(self) -> Path:
"""
Constructs the path for the log file based on the agent name.
:return: Path object representing the location of the log file.
"""
root = SIM_OUTPUT.agent_behaviour_path / f"episode_{self.current_episode}" / self.agent_name
root.mkdir(exist_ok=True, parents=True)
return root / f"{self.agent_name}.log"
def _write_to_terminal(self, msg: str, level: str, to_terminal: bool = False):
if to_terminal or SIM_OUTPUT.write_agent_log_to_terminal:
print(f"{self.agent_name}: ({ self.timestep}) ({level}) {msg}")
def debug(self, msg: str, to_terminal: bool = False):
"""
Logs a message with the DEBUG level.
:param msg: The message to be logged.
:param to_terminal: If True, prints to the terminal too.
"""
if SIM_OUTPUT.agent_log_level > LogLevel.DEBUG:
return
if SIM_OUTPUT.save_agent_logs:
self.logger.debug(msg, extra={"timestep": self.timestep})
self._write_to_terminal(msg, "DEBUG", to_terminal)
def info(self, msg: str, to_terminal: bool = False):
"""
Logs a message with the INFO level.
:param msg: The message to be logged.
:param timestep: The current timestep.
:param to_terminal: If True, prints to the terminal too.
"""
if SIM_OUTPUT.agent_log_level > LogLevel.INFO:
return
if SIM_OUTPUT.save_agent_logs:
self.logger.info(msg, extra={"timestep": self.timestep})
self._write_to_terminal(msg, "INFO", to_terminal)
def warning(self, msg: str, to_terminal: bool = False):
"""
Logs a message with the WARNING level.
:param msg: The message to be logged.
:param timestep: The current timestep.
:param to_terminal: If True, prints to the terminal too.
"""
if SIM_OUTPUT.agent_log_level > LogLevel.WARNING:
return
if SIM_OUTPUT.save_agent_logs:
self.logger.warning(msg, extra={"timestep": self.timestep})
self._write_to_terminal(msg, "WARNING", to_terminal)
def error(self, msg: str, to_terminal: bool = False):
"""
Logs a message with the ERROR level.
:param msg: The message to be logged.
:param timestep: The current timestep.
:param to_terminal: If True, prints to the terminal too.
"""
if SIM_OUTPUT.agent_log_level > LogLevel.ERROR:
return
if SIM_OUTPUT.save_agent_logs:
self.logger.error(msg, extra={"timestep": self.timestep})
self._write_to_terminal(msg, "ERROR", to_terminal)
def critical(self, msg: str, to_terminal: bool = False):
"""
Logs a message with the CRITICAL level.
:param msg: The message to be logged.
:param timestep: The current timestep.
:param to_terminal: If True, prints to the terminal too.
"""
if LogLevel.CRITICAL < SIM_OUTPUT.agent_log_level:
return
if SIM_OUTPUT.save_agent_logs:
self.logger.critical(msg, extra={"timestep": self.timestep})
self._write_to_terminal(msg, "CRITICAL", to_terminal)
def show(self, last_n: int = 10, markdown: bool = False):
"""
Print an Agents Log as a table.
Generate and print PrettyTable instance that shows the agents behaviour log, with columns Time step,
Level and Message.
:param markdown: Use Markdown style in table output. Defaults to False.
"""
table = PrettyTable(["Time Step", "Level", "Message"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.agent_name} Behaviour Log"
if self._get_log_path().exists():
with open(self._get_log_path()) as file:
lines = file.readlines()
for line in lines[-last_n:]:
table.add_row(line.strip().split("::"))
print(table)

View File

@@ -7,6 +7,7 @@ from gymnasium.core import ActType, ObsType
from pydantic import BaseModel, model_validator
from primaite.game.agent.actions import ActionManager
from primaite.game.agent.agent_log import AgentLog
from primaite.game.agent.observations.observation_manager import ObservationManager
from primaite.game.agent.rewards import RewardFunction
from primaite.interface.request import RequestFormat, RequestResponse
@@ -69,6 +70,8 @@ class AgentSettings(BaseModel):
"Configuration for when an agent begins performing it's actions"
flatten_obs: bool = True
"Whether to flatten the observation space before passing it to the agent. True by default."
action_masking: bool = False
"Whether to return action masks at each step."
@classmethod
def from_config(cls, config: Optional[Dict]) -> "AgentSettings":
@@ -116,6 +119,7 @@ class AbstractAgent(ABC):
self.reward_function: Optional[RewardFunction] = reward_function
self.agent_settings = agent_settings or AgentSettings()
self.history: List[AgentHistoryItem] = []
self.logger = AgentLog(agent_name)
def update_observation(self, state: Dict) -> ObsType:
"""
@@ -205,6 +209,7 @@ class ProxyAgent(AbstractAgent):
)
self.most_recent_action: ActType
self.flatten_obs: bool = agent_settings.flatten_obs if agent_settings else False
self.action_masking: bool = agent_settings.action_masking if agent_settings else False
def get_action(self, obs: ObsType, timestep: int = 0) -> Tuple[str, Dict]:
"""

View File

@@ -38,10 +38,11 @@ class DataManipulationAgent(AbstractScriptedAgent):
:rtype: Tuple[str, Dict]
"""
if timestep < self.next_execution_timestep:
self.logger.debug(msg="Performing do NOTHING")
return "DONOTHING", {}
self._set_next_execution_timestep(timestep + self.agent_settings.start_settings.frequency)
self.logger.info(msg="Performing a data manipulation attack!")
return "NODE_APPLICATION_EXECUTE", {"node_id": self.starting_node_idx, "application_id": 0}
def setup_agent(self) -> None:
@@ -54,3 +55,4 @@ class DataManipulationAgent(AbstractScriptedAgent):
# we are assuming that every node in the node manager has a data manipulation application at idx 0
num_nodes = len(self.action_manager.node_names)
self.starting_node_idx = random.randint(0, num_nodes - 1)
self.logger.debug(msg=f"Select Start Node ID: {self.starting_node_idx}")

View File

@@ -85,4 +85,5 @@ class ProbabilisticAgent(AbstractScriptedAgent):
:rtype: Tuple[str, Dict]
"""
choice = self.rng.choice(len(self.action_manager.action_map), p=self.probabilities)
self.logger.info(f"Performing Action: {choice}")
return self.action_manager.get_action(choice)

View File

@@ -3,6 +3,7 @@
from ipaddress import IPv4Address
from typing import Dict, List, Optional
import numpy as np
from pydantic import BaseModel, ConfigDict
from primaite import DEFAULT_BANDWIDTH, getLogger
@@ -15,6 +16,8 @@ from primaite.game.agent.scripted_agents.probabilistic_agent import Probabilisti
from primaite.game.agent.scripted_agents.random_agent import PeriodicAgent
from primaite.game.agent.scripted_agents.tap001 import TAP001
from primaite.game.science import graph_has_cycle, topological_sort
from primaite.simulator import SIM_OUTPUT
from primaite.simulator.network.airspace import AirSpaceFrequency
from primaite.simulator.network.hardware.base import NodeOperatingState
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.host_node import NIC
@@ -164,6 +167,8 @@ class PrimaiteGame:
for _, agent in self.agents.items():
obs = agent.observation_manager.current_observation
action_choice, parameters = agent.get_action(obs, timestep=self.step_counter)
if SIM_OUTPUT.save_agent_logs:
agent.logger.debug(f"Chosen Action: {action_choice}")
request = agent.format_request(action_choice, parameters)
response = self.simulation.apply_request(request)
agent.process_action_response(
@@ -182,8 +187,14 @@ class PrimaiteGame:
"""Advance timestep."""
self.step_counter += 1
_LOGGER.debug(f"Advancing timestep to {self.step_counter} ")
self.update_agent_loggers()
self.simulation.apply_timestep(self.step_counter)
def update_agent_loggers(self) -> None:
"""Updates Agent Loggers with new timestep."""
for agent in self.agents.values():
agent.logger.update_timestep(self.step_counter)
def calculate_truncated(self) -> bool:
"""Calculate whether the episode is truncated."""
current_step = self.step_counter
@@ -192,6 +203,23 @@ class PrimaiteGame:
return True
return False
def action_mask(self, agent_name: str) -> np.ndarray:
"""
Return the action mask for the agent.
This is a boolean list corresponding to the agent's action space. A False entry means this action cannot be
performed during this step.
:return: Action mask
:rtype: List[bool]
"""
agent = self.agents[agent_name]
mask = [True] * len(agent.action_manager.action_map)
for i, action in agent.action_manager.action_map.items():
request = agent.action_manager.form_request(action_identifier=action[0], action_options=action[1])
mask[i] = self.simulation._request_manager.check_valid(request, {})
return np.asarray(mask, dtype=np.int8)
def close(self) -> None:
"""Close the game, this will close the simulation."""
return NotImplemented
@@ -227,6 +255,12 @@ class PrimaiteGame:
simulation_config = cfg.get("simulation", {})
network_config = simulation_config.get("network", {})
airspace_cfg = network_config.get("airspace", {})
frequency_max_capacity_mbps_cfg = airspace_cfg.get("frequency_max_capacity_mbps", {})
frequency_max_capacity_mbps_cfg = {AirSpaceFrequency[k]: v for k, v in frequency_max_capacity_mbps_cfg.items()}
net.airspace.frequency_max_capacity_mbps_ = frequency_max_capacity_mbps_cfg
nodes_cfg = network_config.get("nodes", [])
links_cfg = network_config.get("links", [])

View File

@@ -0,0 +1,218 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Action Masking\n",
"\n",
"PrimAITE environments support action masking. The action mask shows which of the agent's actions are applicable with the current environment state. For example, a node can only be turned on if it is currently turned off."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from primaite.session.environment import PrimaiteGymEnv\n",
"from primaite.config.load import data_manipulation_config_path\n",
"from prettytable import PrettyTable\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"env = PrimaiteGymEnv(data_manipulation_config_path())\n",
"env.action_masking = True"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The action mask is a list of booleans that specifies whether each action in the agent's action map is currently possible. Demonstrated here:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"act_table = PrettyTable((\"number\", \"action\", \"parameters\", \"mask\"))\n",
"mask = env.action_masks()\n",
"actions = env.agent.action_manager.action_map\n",
"max_str_len = 70\n",
"for act,mask in zip(actions.items(), mask):\n",
" act_num, act_data = act\n",
" act_type, act_params = act_data\n",
" act_params = s if len(s:=str(act_params))<max_str_len else f\"{s[:max_str_len-3]}...\"\n",
" act_table.add_row((act_num, act_type, act_params, mask))\n",
"print(act_table)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Action masking for Stable Baselines3 agents\n",
"SB3 agents automatically use the action_masks method during the training loop"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from sb3_contrib import MaskablePPO\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"model = MaskablePPO(\"MlpPolicy\", env, gamma=0.4, seed=32)\n",
"model.learn(1024)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Action masking for Ray RLLib agents\n",
"Ray uses a different API to obtain action masks, but this is handled by the PrimaiteRayEnv and PrimaiteRayMarlEnv classes"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from primaite.session.ray_envs import PrimaiteRayEnv\n",
"from ray.rllib.algorithms.ppo import PPOConfig\n",
"import yaml\n",
"from ray import air, tune\n",
"from ray.rllib.examples.rl_modules.classes.action_masking_rlm import ActionMaskingTorchRLModule\n",
"from ray.rllib.core.rl_module.rl_module import SingleAgentRLModuleSpec\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"with open(data_manipulation_config_path(), 'r') as f:\n",
" cfg = yaml.safe_load(f)\n",
"for agent in cfg['agents']:\n",
" if agent[\"ref\"] == \"defender\":\n",
" agent['agent_settings']['flatten_obs'] = True\n",
"env_config = cfg\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"config = (\n",
" PPOConfig()\n",
" .api_stack(enable_rl_module_and_learner=True, enable_env_runner_and_connector_v2=True)\n",
" .environment(env=PrimaiteRayEnv, env_config=cfg, action_mask_key=\"action_mask\")\n",
" .rl_module(rl_module_spec=SingleAgentRLModuleSpec(module_class = ActionMaskingTorchRLModule))\n",
" .env_runners(num_env_runners=0)\n",
" .training(train_batch_size=128)\n",
")\n",
"algo = config.build()\n",
"for i in range(2):\n",
" results = algo.train()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Action masking with MARL in Ray RLLib\n",
"Each agent has their own action mask, this is useful if the agents have different action spaces."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from ray.rllib.core.rl_module.marl_module import MultiAgentRLModuleSpec\n",
"from primaite.session.ray_envs import PrimaiteRayMARLEnv\n",
"from primaite.config.load import data_manipulation_marl_config_path"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"with open(data_manipulation_marl_config_path(), 'r') as f:\n",
" cfg = yaml.safe_load(f)\n",
"env_config = cfg\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"config = (\n",
" PPOConfig()\n",
" .multi_agent(\n",
" policies={'defender_1','defender_2'}, # These names are the same as the agents defined in the example config.\n",
" policy_mapping_fn=lambda agent_id, *args, **kwargs: agent_id,\n",
" )\n",
" .api_stack(enable_rl_module_and_learner=True, enable_env_runner_and_connector_v2=True)\n",
" .environment(env=PrimaiteRayMARLEnv, env_config=cfg, action_mask_key=\"action_mask\")\n",
" .rl_module(rl_module_spec=MultiAgentRLModuleSpec(module_specs={\n",
" \"defender_1\":SingleAgentRLModuleSpec(module_class=ActionMaskingTorchRLModule),\n",
" \"defender_2\":SingleAgentRLModuleSpec(module_class=ActionMaskingTorchRLModule),\n",
" }))\n",
" .env_runners(num_env_runners=0)\n",
" .training(train_batch_size=128)\n",
")\n",
"algo = config.build()\n",
"for i in range(2):\n",
" results = algo.train()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@@ -4,6 +4,7 @@ from os import PathLike
from typing import Any, Dict, Optional, SupportsFloat, Tuple, Union
import gymnasium
import numpy as np
from gymnasium.core import ActType, ObsType
from primaite import getLogger
@@ -41,6 +42,21 @@ class PrimaiteGymEnv(gymnasium.Env):
self.total_reward_per_episode: Dict[int, float] = {}
"""Average rewards of agents per episode."""
def action_masks(self) -> np.ndarray:
"""
Return the action mask for the agent.
This is a boolean list corresponding to the agent's action space. A False entry means this action cannot be
performed during this step.
:return: Action mask
:rtype: List[bool]
"""
if not self.agent.action_masking:
return np.asarray([True] * len(self.agent.action_manager.action_map))
else:
return self.game.action_mask(self._agent_name)
@property
def agent(self) -> ProxyAgent:
"""Grab a fresh reference to the agent object because it will be reinstantiated each episode."""

View File

@@ -35,10 +35,16 @@ class PrimaiteIO:
"""Whether to save PCAP logs."""
save_sys_logs: bool = True
"""Whether to save system logs."""
save_agent_logs: bool = True
"""Whether to save agent logs."""
write_sys_log_to_terminal: bool = False
"""Whether to write the sys log to the terminal."""
write_agent_log_to_terminal: bool = False
"""Whether to write the agent log to the terminal."""
sys_log_level: LogLevel = LogLevel.INFO
"""The level of log that should be included in the logfiles/logged into terminal."""
"""The level of sys logs that should be included in the logfiles/logged into terminal."""
agent_log_level: LogLevel = LogLevel.INFO
"""The level of agent logs that should be included in the logfiles/logged into terminal."""
def __init__(self, settings: Optional[Settings] = None) -> None:
"""
@@ -51,27 +57,31 @@ class PrimaiteIO:
self.session_path: Path = self.generate_session_path()
# set global SIM_OUTPUT path
SIM_OUTPUT.path = self.session_path / "simulation_output"
SIM_OUTPUT.agent_behaviour_path = self.session_path / "agent_behaviour"
SIM_OUTPUT.save_pcap_logs = self.settings.save_pcap_logs
SIM_OUTPUT.save_sys_logs = self.settings.save_sys_logs
SIM_OUTPUT.save_agent_logs = self.settings.save_agent_logs
SIM_OUTPUT.write_agent_log_to_terminal = self.settings.write_agent_log_to_terminal
SIM_OUTPUT.write_sys_log_to_terminal = self.settings.write_sys_log_to_terminal
SIM_OUTPUT.sys_log_level = self.settings.sys_log_level
SIM_OUTPUT.agent_log_level = self.settings.agent_log_level
def generate_session_path(self, timestamp: Optional[datetime] = None) -> Path:
"""Create a folder for the session and return the path to it."""
if timestamp is None:
timestamp = datetime.now()
date_str = timestamp.strftime("%Y-%m-%d")
time_str = timestamp.strftime("%H-%M-%S")
session_path = PRIMAITE_PATHS.user_sessions_path / date_str / time_str
session_path = PRIMAITE_PATHS.user_sessions_path / SIM_OUTPUT.date_str / SIM_OUTPUT.time_str
# check if running in dev mode
if is_dev_mode():
session_path = _PRIMAITE_ROOT.parent.parent / "sessions" / date_str / time_str
session_path = _PRIMAITE_ROOT.parent.parent / "sessions" / SIM_OUTPUT.date_str / SIM_OUTPUT.time_str
# check if there is an output directory set in config
if PRIMAITE_CONFIG["developer_mode"]["output_dir"]:
session_path = Path(PRIMAITE_CONFIG["developer_mode"]["output_dir"]) / "sessions" / date_str / time_str
session_path = (
Path(PRIMAITE_CONFIG["developer_mode"]["output_dir"])
/ "sessions"
/ SIM_OUTPUT.date_str
/ SIM_OUTPUT.time_str
)
session_path.mkdir(exist_ok=True, parents=True)
return session_path
@@ -115,6 +125,9 @@ class PrimaiteIO:
if config.get("sys_log_level"):
config["sys_log_level"] = LogLevel[config["sys_log_level"].upper()] # convert to enum
if config.get("agent_log_level"):
config["agent_log_level"] = LogLevel[config["agent_log_level"].upper()] # convert to enum
new = cls(settings=cls.Settings(**config))
return new

View File

@@ -3,6 +3,7 @@ import json
from typing import Dict, SupportsFloat, Tuple
import gymnasium
from gymnasium import spaces
from gymnasium.core import ActType, ObsType
from ray.rllib.env.multi_agent_env import MultiAgentEnv
@@ -38,15 +39,19 @@ class PrimaiteRayMARLEnv(MultiAgentEnv):
self.terminateds = set()
self.truncateds = set()
self.observation_space = gymnasium.spaces.Dict(
{
name: gymnasium.spaces.flatten_space(agent.observation_manager.space)
for name, agent in self.agents.items()
}
)
self.action_space = gymnasium.spaces.Dict(
{name: agent.action_manager.space for name, agent in self.agents.items()}
self.observation_space = spaces.Dict(
{name: spaces.flatten_space(agent.observation_manager.space) for name, agent in self.agents.items()}
)
for agent_name in self._agent_ids:
agent = self.game.rl_agents[agent_name]
if agent.action_masking:
self.observation_space[agent_name] = spaces.Dict(
{
"action_mask": spaces.MultiBinary(agent.action_manager.space.n),
"observations": self.observation_space[agent_name],
}
)
self.action_space = spaces.Dict({name: agent.action_manager.space for name, agent in self.agents.items()})
self._obs_space_in_preferred_format = True
self._action_space_in_preferred_format = True
super().__init__()
@@ -131,13 +136,17 @@ class PrimaiteRayMARLEnv(MultiAgentEnv):
def _get_obs(self) -> Dict[str, ObsType]:
"""Return the current observation."""
obs = {}
all_obs = {}
for agent_name in self._agent_ids:
agent = self.game.rl_agents[agent_name]
unflat_space = agent.observation_manager.space
unflat_obs = agent.observation_manager.current_observation
obs[agent_name] = gymnasium.spaces.flatten(unflat_space, unflat_obs)
return obs
obs = gymnasium.spaces.flatten(unflat_space, unflat_obs)
if agent.action_masking:
all_obs[agent_name] = {"action_mask": self.game.action_mask(agent_name), "observations": obs}
else:
all_obs[agent_name] = obs
return all_obs
def close(self):
"""Close the simulation."""
@@ -158,15 +167,30 @@ class PrimaiteRayEnv(gymnasium.Env):
self.env = PrimaiteGymEnv(env_config=env_config)
# self.env.episode_counter -= 1
self.action_space = self.env.action_space
self.observation_space = self.env.observation_space
if self.env.agent.action_masking:
self.observation_space = spaces.Dict(
{"action_mask": spaces.MultiBinary(self.env.action_space.n), "observations": self.env.observation_space}
)
else:
self.observation_space = self.env.observation_space
def reset(self, *, seed: int = None, options: dict = None) -> Tuple[ObsType, Dict]:
"""Reset the environment."""
if self.env.agent.action_masking:
obs, *_ = self.env.reset(seed=seed)
new_obs = {"action_mask": self.env.action_masks(), "observations": obs}
return new_obs, *_
return self.env.reset(seed=seed)
def step(self, action: ActType) -> Tuple[ObsType, SupportsFloat, bool, bool, Dict]:
"""Perform a step in the environment."""
return self.env.step(action)
# if action masking is enabled, intercept the step method and add action mask to observation
if self.env.agent.action_masking:
obs, *_ = self.env.step(action)
new_obs = {"action_mask": self.game.action_mask(self.env._agent_name), "observations": obs}
return new_obs, *_
else:
return self.env.step(action)
def close(self):
"""Close the simulation."""

View File

@@ -3,6 +3,8 @@
developer_mode:
enabled: False # not enabled by default
sys_log_level: DEBUG # level of output for system logs, DEBUG by default
agent_log_level: DEBUG # level of output for agent logs, DEBUG by default
output_agent_logs: False # level of output for system logs, DEBUG by default
output_sys_logs: False # system logs not output by default
output_pcap_logs: False # pcap logs not output by default
output_to_terminal: False # do not output to terminal by default

View File

@@ -34,10 +34,14 @@ class _SimOutput:
path = PRIMAITE_PATHS.user_sessions_path / self.date_str / self.time_str
self._path = path
self._agent_behaviour_path = path
self._save_pcap_logs: bool = False
self._save_sys_logs: bool = False
self._save_agent_logs: bool = False
self._write_sys_log_to_terminal: bool = False
self._write_agent_log_to_terminal: bool = False
self._sys_log_level: LogLevel = LogLevel.WARNING # default log level is at WARNING
self._agent_log_level: LogLevel = LogLevel.WARNING
@property
def path(self) -> Path:
@@ -61,6 +65,28 @@ class _SimOutput:
self._path = new_path
self._path.mkdir(exist_ok=True, parents=True)
@property
def agent_behaviour_path(self) -> Path:
if is_dev_mode():
# if dev mode is enabled, if output dir is not set, print to primaite repo root
path: Path = _PRIMAITE_ROOT.parent.parent / "sessions" / self.date_str / self.time_str / "agent_behaviour"
# otherwise print to output dir
if PRIMAITE_CONFIG["developer_mode"]["output_dir"]:
path: Path = (
Path(PRIMAITE_CONFIG["developer_mode"]["output_dir"])
/ "sessions"
/ self.date_str
/ self.time_str
/ "agent_behaviour"
)
self._agent_behaviour_path = path
return self._agent_behaviour_path
@agent_behaviour_path.setter
def agent_behaviour_path(self, new_path: Path) -> None:
self._agent_behaviour_path = new_path
self._agent_behaviour_path.mkdir(exist_ok=True, parents=True)
@property
def save_pcap_logs(self) -> bool:
if is_dev_mode():
@@ -81,6 +107,16 @@ class _SimOutput:
def save_sys_logs(self, save_sys_logs: bool) -> None:
self._save_sys_logs = save_sys_logs
@property
def save_agent_logs(self) -> bool:
if is_dev_mode():
return PRIMAITE_CONFIG.get("developer_mode").get("output_agent_logs")
return self._save_agent_logs
@save_agent_logs.setter
def save_agent_logs(self, save_agent_logs: bool) -> None:
self._save_agent_logs = save_agent_logs
@property
def write_sys_log_to_terminal(self) -> bool:
if is_dev_mode():
@@ -91,6 +127,17 @@ class _SimOutput:
def write_sys_log_to_terminal(self, write_sys_log_to_terminal: bool) -> None:
self._write_sys_log_to_terminal = write_sys_log_to_terminal
# Should this be separate from sys_log?
@property
def write_agent_log_to_terminal(self) -> bool:
if is_dev_mode():
return PRIMAITE_CONFIG.get("developer_mode").get("output_to_terminal")
return self._write_agent_log_to_terminal
@write_agent_log_to_terminal.setter
def write_agent_log_to_terminal(self, write_agent_log_to_terminal: bool) -> None:
self._write_agent_log_to_terminal = write_agent_log_to_terminal
@property
def sys_log_level(self) -> LogLevel:
if is_dev_mode():
@@ -101,5 +148,15 @@ class _SimOutput:
def sys_log_level(self, sys_log_level: LogLevel) -> None:
self._sys_log_level = sys_log_level
@property
def agent_log_level(self) -> LogLevel:
if is_dev_mode():
return LogLevel[PRIMAITE_CONFIG.get("developer_mode").get("agent_log_level")]
return self._agent_log_level
@agent_log_level.setter
def agent_log_level(self, agent_log_level: LogLevel) -> None:
self._agent_log_level = agent_log_level
SIM_OUTPUT = _SimOutput()

View File

@@ -3,9 +3,10 @@
"""Core of the PrimAITE Simulator."""
import warnings
from abc import abstractmethod
from typing import Callable, Dict, List, Literal, Optional, Union
from typing import Callable, Dict, Iterable, List, Literal, Optional, Tuple, Union
from uuid import uuid4
from prettytable import PrettyTable
from pydantic import BaseModel, ConfigDict, Field, validate_call
from primaite import getLogger
@@ -34,6 +35,20 @@ class RequestPermissionValidator(BaseModel):
"""Message that is reported when a request is rejected by this validator."""
return "request rejected"
def __add__(self, other: "RequestPermissionValidator") -> "_CombinedValidator":
return _CombinedValidator(validators=[self, other])
class _CombinedValidator(RequestPermissionValidator):
validators: List[RequestPermissionValidator] = []
def __call__(self, request, context) -> bool:
return all(x(request, context) for x in self.validators)
@property
def fail_message(self):
return f"One of the following conditions are not met: {[v.fail_message for v in self.validators]}"
class AllowAllValidator(RequestPermissionValidator):
"""Always allows the request."""
@@ -150,8 +165,17 @@ class RequestManager(BaseModel):
self.request_types.pop(name)
def get_request_types_recursively(self) -> List[List[str]]:
"""Recursively generate request tree for this component."""
def get_request_types_recursively(self) -> List[RequestFormat]:
"""
Recursively generate request tree for this component.
:param parent_valid: Whether this sub-request's parent request was valid. This value should not be specified by
users, it is used by the recursive call.
:type parent_valid: bool
:returns: A list of tuples where the first tuple element is the request string and the second is whether that
request is currently possible to execute.
:rtype: List[Tuple[RequestFormat, bool]]
"""
requests = []
for req_name, req in self.request_types.items():
if isinstance(req.func, RequestManager):
@@ -162,6 +186,30 @@ class RequestManager(BaseModel):
requests.append([req_name])
return requests
def show(self) -> None:
"""Display all currently available requests."""
table = PrettyTable(["requests"])
table.align = "l"
table.add_rows([[x] for x in self.get_request_types_recursively()])
print(table)
def check_valid(self, request: RequestFormat, context: Dict) -> bool:
"""Check if this request would be valid in the current state of the simulation without invoking it."""
request_key = request[0]
request_options = request[1:]
if request_key not in self.request_types:
return False
request_type = self.request_types[request_key]
# recurse if we are not at a leaf node
if isinstance(request_type.func, RequestManager):
return request_type.func.check_valid(request_options, context)
return request_type.validator(request_options, context)
class SimComponent(BaseModel):
"""Extension of pydantic BaseModel with additional methods that must be defined by all classes in the simulator."""

View File

@@ -52,6 +52,8 @@ class GroupMembershipValidator(RequestPermissionValidator):
def __call__(self, request: List[str], context: Dict) -> bool:
"""Permit the action if the request comes from an account which belongs to the right group."""
# if context request source is part of any groups mentioned in self.allow_groups, return true, otherwise false
if not context:
return False
requestor_groups: List[str] = context["request_source"]["groups"]
for allowed_group in self.allowed_groups:
if allowed_group.name in requestor_groups:

View File

@@ -6,8 +6,8 @@ from typing import Any, Dict, List, Optional
from prettytable import MARKDOWN, PrettyTable
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager, RequestType, SimComponent
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestPermissionValidator, RequestType, SimComponent
from primaite.simulator.file_system.file import File
from primaite.simulator.file_system.file_type import FileType
from primaite.simulator.file_system.folder import Folder
@@ -42,6 +42,10 @@ class FileSystem(SimComponent):
More information in user guide and docstring for SimComponent._init_request_manager.
"""
self._folder_exists = FileSystem._FolderExistsValidator(file_system=self)
self._folder_not_deleted = FileSystem._FolderNotDeletedValidator(file_system=self)
self._file_exists = FileSystem._FileExistsValidator(file_system=self)
rm = super()._init_request_manager()
self._delete_manager = RequestManager()
@@ -50,13 +54,15 @@ class FileSystem(SimComponent):
request_type=RequestType(
func=lambda request, context: RequestResponse.from_bool(
self.delete_file(folder_name=request[0], file_name=request[1])
)
),
validator=self._file_exists,
),
)
self._delete_manager.add_request(
name="folder",
request_type=RequestType(
func=lambda request, context: RequestResponse.from_bool(self.delete_folder(folder_name=request[0]))
func=lambda request, context: RequestResponse.from_bool(self.delete_folder(folder_name=request[0])),
validator=self._folder_exists,
),
)
rm.add_request(
@@ -144,10 +150,13 @@ class FileSystem(SimComponent):
)
self._folder_request_manager = RequestManager()
rm.add_request("folder", RequestType(func=self._folder_request_manager))
rm.add_request(
"folder",
RequestType(func=self._folder_request_manager, validator=self._folder_exists + self._folder_not_deleted),
)
self._file_request_manager = RequestManager()
rm.add_request("file", RequestType(func=self._file_request_manager))
rm.add_request("file", RequestType(func=self._file_request_manager, validator=self._file_exists))
return rm
@@ -626,3 +635,62 @@ class FileSystem(SimComponent):
self.sys_log.error(f"Unable to access file that does not exist. (file name: {file_name})")
return False
class _FolderExistsValidator(RequestPermissionValidator):
"""
When requests come in, this validator will only let them through if the Folder exists.
Actions cannot be performed on a non-existent folder.
"""
file_system: FileSystem
"""Save a reference to the FileSystem instance."""
def __call__(self, request: RequestFormat, context: Dict) -> bool:
"""Returns True if folder exists."""
return self.file_system.get_folder(folder_name=request[0]) is not None
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return "Cannot perform request on folder because it does not exist."
class _FolderNotDeletedValidator(RequestPermissionValidator):
"""
When requests come in, this validator will only let them through if the Folder has not been deleted.
Actions cannot be performed on a deleted folder.
"""
file_system: FileSystem
"""Save a reference to the FileSystem instance."""
def __call__(self, request: RequestFormat, context: Dict) -> bool:
"""Returns True if folder exists and is not deleted."""
# get folder
folder = self.file_system.get_folder(folder_name=request[0], include_deleted=True)
return folder is not None and not folder.deleted
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return "Cannot perform request on folder because it is deleted."
class _FileExistsValidator(RequestPermissionValidator):
"""
When requests come in, this validator will only let them through if the File exists.
Actions cannot be performed on a non-existent file.
"""
file_system: FileSystem
"""Save a reference to the FileSystem instance."""
def __call__(self, request: RequestFormat, context: Dict) -> bool:
"""Returns True if file exists."""
return self.file_system.get_file(folder_name=request[0], file_name=request[1]) is not None
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return "Cannot perform request on a file that does not exist."

View File

@@ -185,5 +185,5 @@ file_type_sizes_bytes = {
FileType.ZIP: 1024000,
FileType.TAR: 1024000,
FileType.GZ: 819200,
FileType.DB: 15360000,
FileType.DB: 5_000_000,
}

View File

@@ -6,8 +6,8 @@ from typing import Dict, Optional
from prettytable import MARKDOWN, PrettyTable
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestPermissionValidator, RequestType
from primaite.simulator.file_system.file import File
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemABC, FileSystemItemHealthStatus
@@ -55,6 +55,9 @@ class Folder(FileSystemItemABC):
More information in user guide and docstring for SimComponent._init_request_manager.
"""
self._file_exists = Folder._FileExistsValidator(folder=self)
self._file_not_deleted = Folder._FileNotDeletedValidator(folder=self)
rm = super()._init_request_manager()
rm.add_request(
name="delete",
@@ -65,7 +68,9 @@ class Folder(FileSystemItemABC):
self._file_request_manager = RequestManager()
rm.add_request(
name="file",
request_type=RequestType(func=self._file_request_manager),
request_type=RequestType(
func=self._file_request_manager, validator=self._file_exists + self._file_not_deleted
),
)
return rm
@@ -469,3 +474,42 @@ class Folder(FileSystemItemABC):
self.deleted = True
return True
class _FileExistsValidator(RequestPermissionValidator):
"""
When requests come in, this validator will only let them through if the File exists.
Actions cannot be performed on a non-existent file.
"""
folder: Folder
"""Save a reference to the Folder instance."""
def __call__(self, request: RequestFormat, context: Dict) -> bool:
"""Returns True if file exists."""
return self.folder.get_file(file_name=request[0]) is not None
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return "Cannot perform request on a file that does not exist."
class _FileNotDeletedValidator(RequestPermissionValidator):
"""
When requests come in, this validator will only let them through if the File is not deleted.
Actions cannot be performed on a deleted file.
"""
folder: Folder
"""Save a reference to the Folder instance."""
def __call__(self, request: RequestFormat, context: Dict) -> bool:
"""Returns True if file exists and is not deleted."""
file = self.folder.get_file(file_name=request[0])
return file is not None and not file.deleted
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return "Cannot perform request on a file that is deleted."

View File

@@ -3,9 +3,10 @@ from __future__ import annotations
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List
from prettytable import PrettyTable
from prettytable import MARKDOWN, PrettyTable
from pydantic import BaseModel, Field
from primaite import getLogger
from primaite.simulator.network.hardware.base import Layer3Interface, NetworkInterface, WiredNetworkInterface
@@ -15,90 +16,29 @@ from primaite.simulator.system.core.packet_capture import PacketCapture
_LOGGER = getLogger(__name__)
__all__ = ["AirSpaceFrequency", "WirelessNetworkInterface", "IPWirelessNetworkInterface"]
def format_hertz(hertz: float, format_terahertz: bool = False, decimals: int = 3) -> str:
"""
Convert a frequency in Hertz to a formatted string using the most appropriate unit.
class AirSpace:
"""Represents a wireless airspace, managing wireless network interfaces and handling wireless transmission."""
Optionally includes formatting for Terahertz.
def __init__(self):
self._wireless_interfaces: Dict[str, WirelessNetworkInterface] = {}
self._wireless_interfaces_by_frequency: Dict[AirSpaceFrequency, List[WirelessNetworkInterface]] = {}
def show(self, frequency: Optional[AirSpaceFrequency] = None):
"""
Displays a summary of wireless interfaces in the airspace, optionally filtered by a specific frequency.
:param frequency: The frequency band to filter devices by. If None, devices for all frequencies are shown.
"""
table = PrettyTable()
table.field_names = ["Connected Node", "MAC Address", "IP Address", "Subnet Mask", "Frequency", "Status"]
# If a specific frequency is provided, filter by it; otherwise, use all frequencies.
frequencies_to_show = [frequency] if frequency else self._wireless_interfaces_by_frequency.keys()
for freq in frequencies_to_show:
interfaces = self._wireless_interfaces_by_frequency.get(freq, [])
for interface in interfaces:
status = "Enabled" if interface.enabled else "Disabled"
table.add_row(
[
interface._connected_node.hostname, # noqa
interface.mac_address,
interface.ip_address if hasattr(interface, "ip_address") else None,
interface.subnet_mask if hasattr(interface, "subnet_mask") else None,
str(freq),
status,
]
)
print(table)
def add_wireless_interface(self, wireless_interface: WirelessNetworkInterface):
"""
Adds a wireless network interface to the airspace if it's not already present.
:param wireless_interface: The wireless network interface to be added.
"""
if wireless_interface.mac_address not in self._wireless_interfaces:
self._wireless_interfaces[wireless_interface.mac_address] = wireless_interface
if wireless_interface.frequency not in self._wireless_interfaces_by_frequency:
self._wireless_interfaces_by_frequency[wireless_interface.frequency] = []
self._wireless_interfaces_by_frequency[wireless_interface.frequency].append(wireless_interface)
def remove_wireless_interface(self, wireless_interface: WirelessNetworkInterface):
"""
Removes a wireless network interface from the airspace if it's present.
:param wireless_interface: The wireless network interface to be removed.
"""
if wireless_interface.mac_address in self._wireless_interfaces:
self._wireless_interfaces.pop(wireless_interface.mac_address)
self._wireless_interfaces_by_frequency[wireless_interface.frequency].remove(wireless_interface)
def clear(self):
"""
Clears all wireless network interfaces and their frequency associations from the airspace.
After calling this method, the airspace will contain no wireless network interfaces, and transmissions cannot
occur until new interfaces are added again.
"""
self._wireless_interfaces.clear()
self._wireless_interfaces_by_frequency.clear()
def transmit(self, frame: Frame, sender_network_interface: WirelessNetworkInterface):
"""
Transmits a frame to all enabled wireless network interfaces on a specific frequency within the airspace.
This ensures that a wireless interface does not receive its own transmission.
:param frame: The frame to be transmitted.
:param sender_network_interface: The wireless network interface sending the frame. This interface will be
excluded from the list of receivers to prevent it from receiving its own transmission.
"""
for wireless_interface in self._wireless_interfaces_by_frequency.get(sender_network_interface.frequency, []):
if wireless_interface != sender_network_interface and wireless_interface.enabled:
wireless_interface.receive_frame(frame)
:param hertz: Frequency in Hertz.
:param format_terahertz: Whether to format frequency in Terahertz, default is False.
:param decimals: Number of decimal places to round to, default is 3.
:returns: Formatted string with the frequency in the most suitable unit.
"""
format_str = f"{{:.{decimals}f}}"
if format_terahertz and hertz >= 1e12: # Terahertz
return format_str.format(hertz / 1e12) + " THz"
elif hertz >= 1e9: # Gigahertz
return format_str.format(hertz / 1e9) + " GHz"
elif hertz >= 1e6: # Megahertz
return format_str.format(hertz / 1e6) + " MHz"
elif hertz >= 1e3: # Kilohertz
return format_str.format(hertz / 1e3) + " kHz"
else: # Hertz
return format_str.format(hertz) + " Hz"
class AirSpaceFrequency(Enum):
@@ -110,12 +50,231 @@ class AirSpaceFrequency(Enum):
"""WiFi 5 GHz. Known for its higher data transmission speeds and reduced interference from other devices."""
def __str__(self) -> str:
hertz_str = format_hertz(hertz=self.value)
if self == AirSpaceFrequency.WIFI_2_4:
return "WiFi 2.4 GHz"
elif self == AirSpaceFrequency.WIFI_5:
return "WiFi 5 GHz"
else:
return "Unknown Frequency"
return f"WiFi {hertz_str}"
if self == AirSpaceFrequency.WIFI_5:
return f"WiFi {hertz_str}"
return "Unknown Frequency"
@property
def maximum_data_rate_bps(self) -> float:
"""
Retrieves the maximum data transmission rate in bits per second (bps) for the frequency.
The maximum rates are predefined for known frequencies:
- For WIFI_2_4, it returns 100,000,000 bps (100 Mbps).
- For WIFI_5, it returns 500,000,000 bps (500 Mbps).
:return: The maximum data rate in bits per second. If the frequency is not recognized, returns 0.0.
"""
if self == AirSpaceFrequency.WIFI_2_4:
return 100_000_000.0 # 100 Megabits per second
if self == AirSpaceFrequency.WIFI_5:
return 500_000_000.0 # 500 Megabits per second
return 0.0
@property
def maximum_data_rate_mbps(self) -> float:
"""
Retrieves the maximum data transmission rate in megabits per second (Mbps).
This is derived by converting the maximum data rate from bits per second, as defined
in `maximum_data_rate_bps`, to megabits per second.
:return: The maximum data rate in megabits per second.
"""
return self.maximum_data_rate_bps / 1_000_000.0
class AirSpace(BaseModel):
"""
Represents a wireless airspace, managing wireless network interfaces and handling wireless transmission.
This class provides functionalities to manage a collection of wireless network interfaces, each associated with
specific frequencies. It includes methods to add and remove wireless interfaces, and handle data transmission
across these interfaces.
"""
wireless_interfaces: Dict[str, WirelessNetworkInterface] = Field(default_factory=lambda: {})
wireless_interfaces_by_frequency: Dict[AirSpaceFrequency, List[WirelessNetworkInterface]] = Field(
default_factory=lambda: {}
)
bandwidth_load: Dict[AirSpaceFrequency, float] = Field(default_factory=lambda: {})
frequency_max_capacity_mbps_: Dict[AirSpaceFrequency, float] = Field(default_factory=lambda: {})
def get_frequency_max_capacity_mbps(self, frequency: AirSpaceFrequency) -> float:
"""
Retrieves the maximum data transmission capacity for a specified frequency.
This method checks a dictionary holding custom maximum capacities. If the frequency is found, it returns the
custom set maximum capacity. If the frequency is not found in the dictionary, it defaults to the standard
maximum data rate associated with that frequency.
:param frequency: The frequency for which the maximum capacity is queried.
:return: The maximum capacity in Mbps for the specified frequency.
"""
if frequency in self.frequency_max_capacity_mbps_:
return self.frequency_max_capacity_mbps_[frequency]
return frequency.maximum_data_rate_mbps
def set_frequency_max_capacity_mbps(self, cfg: Dict[AirSpaceFrequency, float]):
"""
Sets custom maximum data transmission capacities for multiple frequencies.
:param cfg: A dictionary mapping frequencies to their new maximum capacities in Mbps.
"""
self.frequency_max_capacity_mbps_ = cfg
for freq, mbps in cfg.items():
print(f"Overriding {freq} max capacity as {mbps:.3f} mbps")
def show_bandwidth_load(self, markdown: bool = False):
"""
Prints a table of the current bandwidth load for each frequency on the airspace.
This method prints a tabulated view showing the utilisation of available bandwidth capacities for all
frequencies. The table includes the current capacity usage as a percentage of the maximum capacity, alongside
the absolute maximum capacity values in Mbps.
:param markdown: Flag indicating if output should be in markdown format.
"""
headers = ["Frequency", "Current Capacity (%)", "Maximum Capacity (Mbit)"]
table = PrettyTable(headers)
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = "Airspace Frequency Channel Loads"
for frequency, load in self.bandwidth_load.items():
maximum_capacity = self.get_frequency_max_capacity_mbps(frequency)
load_percent = load / maximum_capacity if maximum_capacity > 0 else 0.0
if load_percent > 1.0:
load_percent = 1.0
table.add_row([format_hertz(frequency.value), f"{load_percent:.0%}", f"{maximum_capacity:.3f}"])
print(table)
def show_wireless_interfaces(self, markdown: bool = False):
"""
Prints a table of wireless interfaces in the airspace.
:param markdown: Flag indicating if output should be in markdown format.
"""
headers = [
"Connected Node",
"MAC Address",
"IP Address",
"Subnet Mask",
"Frequency",
"Speed (Mbps)",
"Status",
]
table = PrettyTable(headers)
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = "Devices on Air Space"
for interface in self.wireless_interfaces.values():
status = "Enabled" if interface.enabled else "Disabled"
table.add_row(
[
interface._connected_node.hostname, # noqa
interface.mac_address,
interface.ip_address if hasattr(interface, "ip_address") else None,
interface.subnet_mask if hasattr(interface, "subnet_mask") else None,
format_hertz(interface.frequency.value),
f"{interface.speed:.3f}",
status,
]
)
print(table.get_string(sortby="Frequency"))
def show(self, markdown: bool = False):
"""
Prints a summary of the current state of the airspace, including both wireless interfaces and bandwidth loads.
This method is a convenient wrapper that calls two separate methods to display detailed tables: one for
wireless interfaces and another for bandwidth load across all frequencies managed within the airspace. It
provides a holistic view of the operational status and performance metrics of the airspace.
:param markdown: Flag indicating if output should be in markdown format.
"""
self.show_wireless_interfaces(markdown)
self.show_bandwidth_load(markdown)
def add_wireless_interface(self, wireless_interface: WirelessNetworkInterface):
"""
Adds a wireless network interface to the airspace if it's not already present.
:param wireless_interface: The wireless network interface to be added.
"""
if wireless_interface.mac_address not in self.wireless_interfaces:
self.wireless_interfaces[wireless_interface.mac_address] = wireless_interface
if wireless_interface.frequency not in self.wireless_interfaces_by_frequency:
self.wireless_interfaces_by_frequency[wireless_interface.frequency] = []
self.wireless_interfaces_by_frequency[wireless_interface.frequency].append(wireless_interface)
def remove_wireless_interface(self, wireless_interface: WirelessNetworkInterface):
"""
Removes a wireless network interface from the airspace if it's present.
:param wireless_interface: The wireless network interface to be removed.
"""
if wireless_interface.mac_address in self.wireless_interfaces:
self.wireless_interfaces.pop(wireless_interface.mac_address)
self.wireless_interfaces_by_frequency[wireless_interface.frequency].remove(wireless_interface)
def clear(self):
"""
Clears all wireless network interfaces and their frequency associations from the airspace.
After calling this method, the airspace will contain no wireless network interfaces, and transmissions cannot
occur until new interfaces are added again.
"""
self.wireless_interfaces.clear()
self.wireless_interfaces_by_frequency.clear()
def reset_bandwidth_load(self):
"""
Resets the bandwidth load tracking for all frequencies in the airspace.
This method clears the current load metrics for all operating frequencies, effectively setting the load to zero.
"""
self.bandwidth_load = {}
def can_transmit_frame(self, frame: Frame, sender_network_interface: WirelessNetworkInterface) -> bool:
"""
Determines if a frame can be transmitted by the sender network interface based on the current bandwidth load.
This method checks if adding the size of the frame to the current bandwidth load of the frequency used by the
sender network interface would exceed the maximum allowed bandwidth for that frequency. It returns True if the
frame can be transmitted without exceeding the limit, and False otherwise.
:param frame: The frame to be transmitted, used to check its size against the frequency's bandwidth limit.
:param sender_network_interface: The network interface attempting to transmit the frame, used to determine the
relevant frequency and its current bandwidth load.
:return: True if the frame can be transmitted within the bandwidth limit, False if it would exceed the limit.
"""
if sender_network_interface.frequency not in self.bandwidth_load:
self.bandwidth_load[sender_network_interface.frequency] = 0.0
return self.bandwidth_load[
sender_network_interface.frequency
] + frame.size_Mbits <= self.get_frequency_max_capacity_mbps(sender_network_interface.frequency)
def transmit(self, frame: Frame, sender_network_interface: WirelessNetworkInterface):
"""
Transmits a frame to all enabled wireless network interfaces on a specific frequency within the airspace.
This ensures that a wireless interface does not receive its own transmission.
:param frame: The frame to be transmitted.
:param sender_network_interface: The wireless network interface sending the frame. This interface will be
excluded from the list of receivers to prevent it from receiving its own transmission.
"""
self.bandwidth_load[sender_network_interface.frequency] += frame.size_Mbits
for wireless_interface in self.wireless_interfaces_by_frequency.get(sender_network_interface.frequency, []):
if wireless_interface != sender_network_interface and wireless_interface.enabled:
wireless_interface.receive_frame(frame)
class WirelessNetworkInterface(NetworkInterface, ABC):
@@ -185,13 +344,18 @@ class WirelessNetworkInterface(NetworkInterface, ABC):
:param frame: The network frame to be sent.
:return: True if the frame is sent successfully, False if the network interface is disabled.
"""
if self.enabled:
frame.set_sent_timestamp()
self.pcap.capture_outbound(frame)
self.airspace.transmit(frame, self)
return True
# Cannot send Frame as the network interface is not enabled
return False
if not self.enabled:
return False
if not self.airspace.can_transmit_frame(frame, self):
# Drop frame for now. Queuing will happen here (probably) if it's done in the future.
self._connected_node.sys_log.info(f"{self}: Frame dropped as Link is at capacity")
return False
super().send_frame(frame)
frame.set_sent_timestamp()
self.pcap.capture_outbound(frame)
self.airspace.transmit(frame, self)
return True
def receive_frame(self, frame: Frame) -> bool:
"""

View File

@@ -96,6 +96,8 @@ class Network(SimComponent):
"""Apply pre-timestep logic."""
super().pre_timestep(timestep)
self.airspace.reset_bandwidth_load()
for node in self.nodes.values():
node.pre_timestep(timestep)

View File

@@ -89,7 +89,7 @@ class NetworkInterface(SimComponent, ABC):
mac_address: str = Field(default_factory=generate_mac_address)
"The MAC address of the interface."
speed: int = 100
speed: float = 100.0
"The speed of the interface in Mbps. Default is 100 Mbps."
mtu: int = 1500
@@ -132,10 +132,25 @@ class NetworkInterface(SimComponent, ABC):
More information in user guide and docstring for SimComponent._init_request_manager.
"""
_is_network_interface_enabled = NetworkInterface._EnabledValidator(network_interface=self)
_is_network_interface_disabled = NetworkInterface._DisabledValidator(network_interface=self)
rm = super()._init_request_manager()
rm.add_request("enable", RequestType(func=lambda request, context: RequestResponse.from_bool(self.enable())))
rm.add_request("disable", RequestType(func=lambda request, context: RequestResponse.from_bool(self.disable())))
rm.add_request(
"enable",
RequestType(
func=lambda request, context: RequestResponse.from_bool(self.enable()),
validator=_is_network_interface_disabled,
),
)
rm.add_request(
"disable",
RequestType(
func=lambda request, context: RequestResponse.from_bool(self.disable()),
validator=_is_network_interface_enabled,
),
)
return rm
@@ -334,6 +349,50 @@ class NetworkInterface(SimComponent, ABC):
super().pre_timestep(timestep)
self.traffic = {}
class _EnabledValidator(RequestPermissionValidator):
"""
When requests come in, this validator will only let them through if the NetworkInterface is enabled.
This is useful because most actions should be being resolved if the NetworkInterface is disabled.
"""
network_interface: NetworkInterface
"""Save a reference to the node instance."""
def __call__(self, request: RequestFormat, context: Dict) -> bool:
"""Return whether the NetworkInterface is enabled or not."""
return self.network_interface.enabled
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return (
f"Cannot perform request on NetworkInterface "
f"'{self.network_interface.mac_address}' because it is not enabled."
)
class _DisabledValidator(RequestPermissionValidator):
"""
When requests come in, this validator will only let them through if the NetworkInterface is disabled.
This is useful because some actions should be being resolved if the NetworkInterface is disabled.
"""
network_interface: NetworkInterface
"""Save a reference to the node instance."""
def __call__(self, request: RequestFormat, context: Dict) -> bool:
"""Return whether the NetworkInterface is disabled or not."""
return not self.network_interface.enabled
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return (
f"Cannot perform request on NetworkInterface "
f"'{self.network_interface.mac_address}' because it is not disabled."
)
class WiredNetworkInterface(NetworkInterface, ABC):
"""
@@ -442,14 +501,17 @@ class WiredNetworkInterface(NetworkInterface, ABC):
:param frame: The network frame to be sent.
:return: True if the frame is sent, False if the Network Interface is disabled or not connected to a link.
"""
if not self.enabled:
return False
if not self._connected_link.can_transmit_frame(frame):
# Drop frame for now. Queuing will happen here (probably) if it's done in the future.
self._connected_node.sys_log.info(f"{self}: Frame dropped as Link is at capacity")
return False
super().send_frame(frame)
if self.enabled:
frame.set_sent_timestamp()
self.pcap.capture_outbound(frame)
self._connected_link.transmit_frame(sender_nic=self, frame=frame)
return True
# Cannot send Frame as the NIC is not enabled
return False
frame.set_sent_timestamp()
self.pcap.capture_outbound(frame)
self._connected_link.transmit_frame(sender_nic=self, frame=frame)
return True
@abstractmethod
def receive_frame(self, frame: Frame) -> bool:
@@ -680,12 +742,21 @@ class Link(SimComponent):
"""
return self.endpoint_a.enabled and self.endpoint_b.enabled
def _can_transmit(self, frame: Frame) -> bool:
def can_transmit_frame(self, frame: Frame) -> bool:
"""
Determines whether a frame can be transmitted considering the current Link load and the Link's bandwidth.
This method assesses if the transmission of a given frame is possible without exceeding the Link's total
bandwidth capacity. It checks if the current load of the Link plus the size of the frame (expressed in Mbps)
would remain within the defined bandwidth limits. The transmission is only feasible if the Link is active
('up') and the total load including the new frame does not surpass the bandwidth limit.
:param frame: The frame intended for transmission, which contains its size in Mbps.
:return: True if the frame can be transmitted without exceeding the bandwidth limit, False otherwise.
"""
if self.is_up:
frame_size_Mbits = frame.size_Mbits # noqa - Leaving it as Mbits as this is how they're expressed
# return self.current_load + frame_size_Mbits <= self.bandwidth
# TODO: re add this check once packet size limiting and MTU checks are implemented
return True
return self.current_load + frame.size_Mbits <= self.bandwidth
return False
def transmit_frame(self, sender_nic: WiredNetworkInterface, frame: Frame) -> bool:
@@ -696,11 +767,6 @@ class Link(SimComponent):
:param frame: The network frame to be sent.
:return: True if the Frame can be sent, otherwise False.
"""
can_transmit = self._can_transmit(frame)
if not can_transmit:
_LOGGER.debug(f"Cannot transmit frame as {self} is at capacity")
return False
receiver = self.endpoint_a
if receiver == sender_nic:
receiver = self.endpoint_b
@@ -889,6 +955,25 @@ class Node(SimComponent):
"""Message that is reported when a request is rejected by this validator."""
return f"Cannot perform request on node '{self.node.hostname}' because it is not powered on."
class _NodeIsOffValidator(RequestPermissionValidator):
"""
When requests come in, this validator will only let them through if the node is off.
This is useful because some actions require the node to be in an off state.
"""
node: Node
"""Save a reference to the node instance."""
def __call__(self, request: RequestFormat, context: Dict) -> bool:
"""Return whether the node is on or off."""
return self.node.operating_state == NodeOperatingState.OFF
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return f"Cannot perform request on node '{self.node.hostname}' because it is not turned off."
def _init_request_manager(self) -> RequestManager:
"""
Initialise the request manager.
@@ -951,6 +1036,7 @@ class Node(SimComponent):
return RequestResponse.from_bool(False)
_node_is_on = Node._NodeIsOnValidator(node=self)
_node_is_off = Node._NodeIsOffValidator(node=self)
rm = super()._init_request_manager()
# since there are potentially many services, create an request manager that can map service name
@@ -980,7 +1066,12 @@ class Node(SimComponent):
func=lambda request, context: RequestResponse.from_bool(self.power_off()), validator=_node_is_on
),
)
rm.add_request("startup", RequestType(func=lambda request, context: RequestResponse.from_bool(self.power_on())))
rm.add_request(
"startup",
RequestType(
func=lambda request, context: RequestResponse.from_bool(self.power_on()), validator=_node_is_off
),
)
rm.add_request(
"reset",
RequestType(func=lambda request, context: RequestResponse.from_bool(self.reset()), validator=_node_is_on),

View File

@@ -58,12 +58,16 @@ class SwitchPort(WiredNetworkInterface):
:param frame: The network frame to be sent.
:return: A boolean indicating whether the frame was successfully sent.
"""
if self.enabled:
self.pcap.capture_outbound(frame)
self._connected_link.transmit_frame(sender_nic=self, frame=frame)
return True
# Cannot send Frame as the SwitchPort is not enabled
return False
if not self.enabled:
return False
if not self._connected_link.can_transmit_frame(frame):
# Drop frame for now. Queuing will happen here (probably) if it's done in the future.
self._connected_node.sys_log.info(f"{self}: Frame dropped as Link is at capacity")
return False
self.pcap.capture_outbound(frame)
self._connected_link.transmit_frame(sender_nic=self, frame=frame)
return True
def receive_frame(self, frame: Frame) -> bool:
"""

View File

@@ -1,6 +1,6 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from ipaddress import IPv4Address
from typing import Any, Dict, Union
from typing import Any, Dict, Optional, Union
from pydantic import validate_call
@@ -153,7 +153,7 @@ class WirelessRouter(Router):
self,
ip_address: IPV4Address,
subnet_mask: IPV4Address,
frequency: AirSpaceFrequency = AirSpaceFrequency.WIFI_2_4,
frequency: Optional[AirSpaceFrequency] = AirSpaceFrequency.WIFI_2_4,
):
"""
Configures a wireless access point (WAP).
@@ -170,13 +170,20 @@ class WirelessRouter(Router):
enum. This determines the frequency band (e.g., 2.4 GHz or 5 GHz) the access point will use for wireless
communication. Default is AirSpaceFrequency.WIFI_2_4.
"""
if not frequency:
frequency = AirSpaceFrequency.WIFI_2_4
self.sys_log.info("Configuring wireless access point")
self.wireless_access_point.disable() # Temporarily disable the WAP for reconfiguration
network_interface = self.network_interface[1]
network_interface.ip_address = ip_address
network_interface.subnet_mask = subnet_mask
self.sys_log.info(f"Configured WAP {network_interface}")
self.wireless_access_point.frequency = frequency # Set operating frequency
self.wireless_access_point.enable() # Re-enable the WAP with new settings
self.sys_log.info(f"Configured WAP {network_interface}")
@property
def router_interface(self) -> RouterInterface:

View File

@@ -133,10 +133,11 @@ class Frame(BaseModel):
def size(self) -> float: # noqa - Keep it as MBits as this is how they're expressed
"""The size of the Frame in Bytes."""
# get the payload size if it is a data packet
payload_size = 0.0
if isinstance(self.payload, DataPacket):
return self.payload.get_packet_size()
payload_size = self.payload.get_packet_size()
return float(len(self.model_dump_json().encode("utf-8")))
return float(len(self.model_dump_json().encode("utf-8"))) + payload_size
@property
def size_Mbits(self) -> float: # noqa - Keep it as MBits as this is how they're expressed

View File

@@ -1,10 +1,12 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from __future__ import annotations
from abc import abstractmethod
from enum import Enum
from typing import Any, ClassVar, Dict, Optional, Set, Type
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestPermissionValidator, RequestType
from primaite.simulator.system.software import IOSoftware, SoftwareHealthState
@@ -64,9 +66,27 @@ class Application(IOSoftware):
More information in user guide and docstring for SimComponent._init_request_manager.
"""
rm = super()._init_request_manager()
_is_application_running = Application._StateValidator(application=self, state=ApplicationOperatingState.RUNNING)
rm.add_request("close", RequestType(func=lambda request, context: RequestResponse.from_bool(self.close())))
rm = super()._init_request_manager()
rm.add_request(
"scan",
RequestType(
func=lambda request, context: RequestResponse.from_bool(self.scan()), validator=_is_application_running
),
)
rm.add_request(
"close",
RequestType(
func=lambda request, context: RequestResponse.from_bool(self.close()), validator=_is_application_running
),
)
rm.add_request(
"fix",
RequestType(
func=lambda request, context: RequestResponse.from_bool(self.fix()), validator=_is_application_running
),
)
return rm
@abstractmethod
@@ -169,3 +189,28 @@ class Application(IOSoftware):
:return: True if successful, False otherwise.
"""
return super().receive(payload=payload, session_id=session_id, **kwargs)
class _StateValidator(RequestPermissionValidator):
"""
When requests come in, this validator will only let them through if the application is in the correct state.
This is useful because most actions require the application to be in a specific state.
"""
application: Application
"""Save a reference to the application instance."""
state: ApplicationOperatingState
"""The state of the application to validate."""
def __call__(self, request: RequestFormat, context: Dict) -> bool:
"""Return whether the application is in the state we are validating for."""
return self.application.operating_state == self.state
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return (
f"Cannot perform request on application '{self.application.name}' because it is not in the "
f"{self.state.name} state."
)

View File

@@ -1,11 +1,13 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from __future__ import annotations
from abc import abstractmethod
from enum import Enum
from typing import Any, Dict, Optional
from primaite import getLogger
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestPermissionValidator, RequestType
from primaite.simulator.system.software import IOSoftware, SoftwareHealthState
_LOGGER = getLogger(__name__)
@@ -40,6 +42,7 @@ class Service(IOSoftware):
restart_duration: int = 5
"How many timesteps does it take to restart this service."
restart_countdown: Optional[int] = None
"If currently restarting, how many timesteps remain until the restart is finished."
@@ -86,15 +89,61 @@ class Service(IOSoftware):
More information in user guide and docstring for SimComponent._init_request_manager.
"""
_is_service_running = Service._StateValidator(service=self, state=ServiceOperatingState.RUNNING)
_is_service_stopped = Service._StateValidator(service=self, state=ServiceOperatingState.STOPPED)
_is_service_paused = Service._StateValidator(service=self, state=ServiceOperatingState.PAUSED)
_is_service_disabled = Service._StateValidator(service=self, state=ServiceOperatingState.DISABLED)
rm = super()._init_request_manager()
rm.add_request("scan", RequestType(func=lambda request, context: RequestResponse.from_bool(self.scan())))
rm.add_request("stop", RequestType(func=lambda request, context: RequestResponse.from_bool(self.stop())))
rm.add_request("start", RequestType(func=lambda request, context: RequestResponse.from_bool(self.start())))
rm.add_request("pause", RequestType(func=lambda request, context: RequestResponse.from_bool(self.pause())))
rm.add_request("resume", RequestType(func=lambda request, context: RequestResponse.from_bool(self.resume())))
rm.add_request("restart", RequestType(func=lambda request, context: RequestResponse.from_bool(self.restart())))
rm.add_request(
"scan",
RequestType(
func=lambda request, context: RequestResponse.from_bool(self.scan()), validator=_is_service_running
),
)
rm.add_request(
"stop",
RequestType(
func=lambda request, context: RequestResponse.from_bool(self.stop()), validator=_is_service_running
),
)
rm.add_request(
"start",
RequestType(
func=lambda request, context: RequestResponse.from_bool(self.start()), validator=_is_service_stopped
),
)
rm.add_request(
"pause",
RequestType(
func=lambda request, context: RequestResponse.from_bool(self.pause()), validator=_is_service_running
),
)
rm.add_request(
"resume",
RequestType(
func=lambda request, context: RequestResponse.from_bool(self.resume()), validator=_is_service_paused
),
)
rm.add_request(
"restart",
RequestType(
func=lambda request, context: RequestResponse.from_bool(self.restart()), validator=_is_service_running
),
)
rm.add_request("disable", RequestType(func=lambda request, context: RequestResponse.from_bool(self.disable())))
rm.add_request("enable", RequestType(func=lambda request, context: RequestResponse.from_bool(self.enable())))
rm.add_request(
"enable",
RequestType(
func=lambda request, context: RequestResponse.from_bool(self.enable()), validator=_is_service_disabled
),
)
rm.add_request(
"fix",
RequestType(
func=lambda request, context: RequestResponse.from_bool(self.fix()), validator=_is_service_running
),
)
return rm
@abstractmethod
@@ -191,3 +240,28 @@ class Service(IOSoftware):
self.sys_log.debug(f"Restarting finished for service {self.name}")
self.operating_state = ServiceOperatingState.RUNNING
self.restart_countdown -= 1
class _StateValidator(RequestPermissionValidator):
"""
When requests come in, this validator will only let them through if the service is in the correct state.
This is useful because most actions require the service to be in a specific state.
"""
service: Service
"""Save a reference to the service instance."""
state: ServiceOperatingState
"""The state of the service to validate."""
def __call__(self, request: RequestFormat, context: Dict) -> bool:
"""Return whether the service is in the state we are validating for."""
return self.service.operating_state == self.state
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return (
f"Cannot perform request on service '{self.service.name}' because it is not in the "
f"{self.state.name} state."
)

View File

@@ -82,12 +82,31 @@ def config_callback(
show_default=False,
),
] = None,
agent_log_level: Annotated[
LogLevel,
typer.Option(
"--agent-log-level",
"-level",
click_type=click.Choice(LogLevel._member_names_, case_sensitive=False),
help="The level of agent behaviour logs to output.",
show_default=False,
),
] = None,
output_sys_logs: Annotated[
bool,
typer.Option(
"--output-sys-logs/--no-sys-logs", "-sys/-nsys", help="Output system logs to file.", show_default=False
),
] = None,
output_agent_logs: Annotated[
bool,
typer.Option(
"--output-agent-logs/--no-agent-logs",
"-agent/-nagent",
help="Output agent logs to file.",
show_default=False,
),
] = None,
output_pcap_logs: Annotated[
bool,
typer.Option(
@@ -109,10 +128,18 @@ def config_callback(
PRIMAITE_CONFIG["developer_mode"]["sys_log_level"] = ctx.params.get("sys_log_level")
print(f"PrimAITE dev-mode config updated sys_log_level={ctx.params.get('sys_log_level')}")
if ctx.params.get("agent_log_level") is not None:
PRIMAITE_CONFIG["developer_mode"]["agent_log_level"] = ctx.params.get("agent_log_level")
print(f"PrimAITE dev-mode config updated agent_log_level={ctx.params.get('agent_log_level')}")
if output_sys_logs is not None:
PRIMAITE_CONFIG["developer_mode"]["output_sys_logs"] = output_sys_logs
print(f"PrimAITE dev-mode config updated {output_sys_logs=}")
if output_agent_logs is not None:
PRIMAITE_CONFIG["developer_mode"]["output_agent_logs"] = output_agent_logs
print(f"PrimAITE dev-mode config updated {output_agent_logs=}")
if output_pcap_logs is not None:
PRIMAITE_CONFIG["developer_mode"]["output_pcap_logs"] = output_pcap_logs
print(f"PrimAITE dev-mode config updated {output_pcap_logs=}")