Merge remote-tracking branch 'origin/dev' into feature/2623-action-masking

This commit is contained in:
Marek Wolan
2024-07-11 15:50:42 +01:00
57 changed files with 2860 additions and 263 deletions

View File

@@ -11,74 +11,78 @@ schedules:
branches:
include:
- 'refs/heads/dev'
pool:
vmImage: ubuntu-latest
variables:
VERSION: ''
MAJOR_VERSION: ''
steps:
- checkout: self
persistCredentials: true
jobs:
- job: PrimAITE_Benchmark
timeoutInMinutes: 360 # 6-hour maximum
pool:
vmImage: ubuntu-latest
workspace:
clean: all
steps:
- checkout: self
persistCredentials: true
- script: |
VERSION=$(cat src/primaite/VERSION | tr -d '\n')
if [[ "$(Build.SourceBranch)" == "refs/heads/dev" ]]; then
DATE=$(date +%Y%m%d)
echo "${VERSION}+dev.${DATE}" > src/primaite/VERSION
fi
displayName: 'Update VERSION file for Dev Benchmark'
- script: |
VERSION=$(cat src/primaite/VERSION | tr -d '\n')
if [[ "$(Build.SourceBranch)" == "refs/heads/dev" ]]; then
DATE=$(date +%Y%m%d)
echo "${VERSION}+dev.${DATE}" > src/primaite/VERSION
fi
displayName: 'Update VERSION file for Dev Benchmark'
- script: |
VERSION=$(cat src/primaite/VERSION | tr -d '\n')
MAJOR_VERSION=$(echo $VERSION | cut -d. -f1)
echo "##vso[task.setvariable variable=VERSION]$VERSION"
echo "##vso[task.setvariable variable=MAJOR_VERSION]$MAJOR_VERSION"
displayName: 'Set Version Variables'
- script: |
VERSION=$(cat src/primaite/VERSION | tr -d '\n')
MAJOR_VERSION=$(echo $VERSION | cut -d. -f1)
echo "##vso[task.setvariable variable=VERSION]$VERSION"
echo "##vso[task.setvariable variable=MAJOR_VERSION]$MAJOR_VERSION"
displayName: 'Set Version Variables'
- task: UsePythonVersion@0
inputs:
versionSpec: '3.11'
addToPath: true
- task: UsePythonVersion@0
inputs:
versionSpec: '3.11'
addToPath: true
- script: |
python -m pip install --upgrade pip
pip install -e .[dev,rl]
primaite setup
displayName: 'Install Dependencies'
- script: |
python -m pip install --upgrade pip
pip install -e .[dev,rl]
primaite setup
displayName: 'Install Dependencies'
- script: |
cd benchmark
python3 primaite_benchmark.py
cd ..
displayName: 'Run Benchmarking Script'
- script: |
set -e
cd benchmark
python3 primaite_benchmark.py
cd ..
displayName: 'Run Benchmarking Script'
- script: |
git config --global user.email "oss@dstl.gov.uk"
git config --global user.name "Defence Science and Technology Laboratory UK"
workingDirectory: $(System.DefaultWorkingDirectory)
displayName: 'Configure Git'
condition: and(succeeded(), eq(variables['Build.Reason'], 'Manual'), startsWith(variables['Build.SourceBranch'], 'refs/heads/release'))
- script: |
git config --global user.email "oss@dstl.gov.uk"
git config --global user.name "Defence Science and Technology Laboratory UK"
workingDirectory: $(System.DefaultWorkingDirectory)
displayName: 'Configure Git'
condition: and(succeeded(), eq(variables['Build.Reason'], 'Manual'), startsWith(variables['Build.SourceBranch'], 'refs/heads/release'))
- script: |
git add benchmark/results/v$(MAJOR_VERSION)/v$(VERSION)/*
git commit -m "Automated benchmark output commit for version $(VERSION)"
git push origin HEAD:refs/heads/$(Build.SourceBranchName)
displayName: 'Commit and Push Benchmark Results'
workingDirectory: $(System.DefaultWorkingDirectory)
env:
GIT_CREDENTIALS: $(System.AccessToken)
condition: and(succeeded(), startsWith(variables['Build.SourceBranch'], 'refs/heads/release'))
- script: |
git add benchmark/results/v$(MAJOR_VERSION)/v$(VERSION)/*
git commit -m "Automated benchmark output commit for version $(VERSION)"
git push origin HEAD:refs/heads/$(Build.SourceBranchName)
displayName: 'Commit and Push Benchmark Results'
workingDirectory: $(System.DefaultWorkingDirectory)
env:
GIT_CREDENTIALS: $(System.AccessToken)
condition: and(succeeded(), startsWith(variables['Build.SourceBranch'], 'refs/heads/release'))
- script: |
tar czf primaite_v$(VERSION)_benchmark.tar.gz benchmark/results/v$(MAJOR_VERSION)/v$(VERSION)
displayName: 'Prepare Artifacts for Publishing'
- script: |
tar czf primaite_v$(VERSION)_benchmark.tar.gz benchmark/results/v$(MAJOR_VERSION)/v$(VERSION)
displayName: 'Prepare Artifacts for Publishing'
- task: PublishPipelineArtifact@1
inputs:
targetPath: primaite_v$(VERSION)_benchmark.tar.gz
artifactName: 'benchmark-output'
publishLocation: 'pipeline'
displayName: 'Publish Benchmark Output as Artifact'
- task: PublishPipelineArtifact@1
inputs:
targetPath: primaite_v$(VERSION)_benchmark.tar.gz
artifactName: 'benchmark-output'
publishLocation: 'pipeline'
displayName: 'Publish Benchmark Output as Artifact'

View File

@@ -2,9 +2,43 @@
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
- **AirSpaceEnvironmentType Enum Class**: Introduced in `airspace.py` to define different environmental settings affecting wireless network behavior.
- **ChannelWidth Enum Class**: Added in `airspace.py` to specify channel width options for wireless network interfaces.
- **Channel Width Attribute**: Incorporated into the `WirelessNetworkInterface` class to allow dynamic setting based on `AirSpaceFrequency` and `AirSpaceEnvironmentType`.
- **SNR and Capacity Calculation Functions**: Functions `estimate_snr` and `calculate_total_channel_capacity` added to `airspace.py` for computing signal-to-noise ratio and capacity based on frequency and channel width.
- **Dynamic Speed Setting**: WirelessInterface speed attribute now dynamically adjusts based on the operational environment, frequency, and channel width.
- **airspace_key Attribute**: Added to `WirelessNetworkInterface` as a tuple of frequency and channel width, serving as a key for bandwidth/channel management.
- **airspace_environment_type Attribute**: Determines the environmental type for the airspace, influencing data rate calculations and capacity sharing.
- **show_bandwidth_load Function**: Displays current bandwidth load for each frequency and channel width in the airspace.
- **Configuration Schema Update**: The `simulation.network` config file now includes settings for the `airspace_environment_type`.
- **Bandwidth Tracking**: Tracks data transmission across each frequency/channel width pairing.
- **Configuration Support for Wireless Routers**: `channel_width` can now be configured in the config file under `wireless_access_point`.
- **New Tests**: Added to validate the respect of bandwidth capacities and the correct parsing of airspace configurations from YAML files.
- **New Logging**: Added a new agent behaviour log which are more human friendly than agent history. These Logs are found in session log directory and can be enabled in the I/O settings in a yaml configuration file.
### Changed
- **NetworkInterface Speed Type**: The `speed` attribute of `NetworkInterface` has been changed from `int` to `float`.
- **Transmission Feasibility Check**: Updated `_can_transmit` function in `Link` to account for current load and total bandwidth capacity, ensuring transmissions do not exceed limits.
- **Frame Size Details**: Frame `size` attribute now includes both core size and payload size in bytes.
- **WirelessRouter Configuration Function**: `configure_wireless_access_point` function now accepts `channel_width` as a parameter.
- **Interface Grouping**: `WirelessNetworkInterfaces` are now grouped by both `AirSpaceFrequency` and `ChannelWidth`.
- **Interface Frequency/Channel Width Adjustment**: Changing an interface's settings now involves removal from the airspace, recalculation of its data rate, and re-addition under new settings.
- **Transmission Blocking**: Enhanced `AirSpace` logic to block transmissions that would exceed the available capacity.
### Fixed
- **Transmission Permission Logic**: Corrected the logic in `can_transmit_frame` to accurately prevent overloads by checking if the transmission of a frame stays within allowable bandwidth limits after considering current load.
[//]: # (This file needs tidying up between 2.0.0 and this line as it hasn't been segmented into 3.0.0 and 3.1.0 and isn't compliant with https://keepachangelog.com/en/1.1.0/)
## 3.0.0b9
- Removed deprecated `PrimaiteSession` class.
- Added ability to set log levels via configuration.
@@ -26,8 +60,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Refactored all air-space usage to that a new instance of AirSpace is created for each instance of Network. This 1:1 relationship between network and airspace will allow parallelization.
- Added notebook to demonstrate use of SubprocVecEnv from SB3 to vectorise environments to speed up training.
## [Unreleased]
- Made requests fail to reach their target if the node is off
- Added responses to requests

View File

@@ -26,7 +26,7 @@
"av_s_per_session": 3205.6340542,
"av_s_per_step": 0.10017606419375,
"av_s_per_100_steps_10_nodes": 10.017606419375,
"combined_av_reward_per_episode": {
"combined_total_reward_per_episode": {
"1": -53.42999999999999,
"2": -25.18000000000001,
"3": -42.00000000000002,

View File

@@ -18,8 +18,11 @@ This section configures how PrimAITE saves data during simulation and training.
save_step_metadata: False
save_pcap_logs: False
save_sys_logs: False
save_agent_logs: False
write_sys_log_to_terminal: False
write_agent_log_to_terminal: False
sys_log_level: WARNING
agent_log_level: INFO
``save_logs``
@@ -57,6 +60,12 @@ Optional. Default value is ``False``.
If ``True``, then the log files which contain all node actions during the simulation will be saved.
``save_agent_logs``
-----------------
Optional. Default value is ``False``.
If ``True``, then the log files which contain all human readable agent behaviour during the simulation will be saved.
``write_sys_log_to_terminal``
-----------------------------
@@ -65,16 +74,25 @@ Optional. Default value is ``False``.
If ``True``, PrimAITE will print sys log to the terminal.
``write_agent_log_to_terminal``
-----------------------------
``sys_log_level``
-------------
Optional. Default value is ``False``.
If ``True``, PrimAITE will print all human readable agent behaviour logs to the terminal.
``sys_log_level & agent_log_level``
---------------------------------
Optional. Default value is ``WARNING``.
The level of logging that should be visible in the sys logs or the logs output to the terminal.
The level of logging that should be visible in the syslog, agent logs or the logs output to the terminal.
``save_sys_logs`` or ``write_sys_log_to_terminal`` has to be set to ``True`` for this setting to be used.
This is also true for agent behaviour logging.
Available options are:
- ``DEBUG``: Debug level items and the items below

View File

@@ -7,7 +7,7 @@
==============
In this section the network layout is defined. This part of the config follows a hierarchical structure. Almost every component defines a ``ref`` field which acts as a human-readable unique identifier, used by other parts of the config, such as agents.
At the top level of the network are ``nodes`` and ``links``.
At the top level of the network are ``nodes``, ``links`` and ``airspace``.
e.g.
@@ -19,6 +19,9 @@ e.g.
...
links:
...
airspace:
...
``nodes``
---------
@@ -101,3 +104,35 @@ This accepts an integer value e.g. if port 1 is to be connected, the configurati
``bandwidth``
This is an integer value specifying the allowed bandwidth across the connection. Units are in Mbps.
``airspace``
------------
This section configures settings specific to the wireless network's virtual airspace. It defines how wireless interfaces within the simulation will interact and perform under various environmental conditions.
``airspace_environment_type``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
This setting specifies the environmental conditions of the airspace which affect the propagation and interference characteristics of wireless signals. Changing this environment type impacts how signal noise and interference are calculated, thus affecting the overall network performance, including data transmission rates and signal quality.
**Configurable Options**
- **rural**: A rural environment offers clear channel conditions due to low population density and minimal electronic device presence.
- **outdoor**: Outdoor environments like parks or fields have minimal electronic interference.
- **suburban**: Suburban environments strike a balance with fewer electronic interferences than urban but more than rural.
- **office**: Office environments have moderate interference from numerous electronic devices and overlapping networks.
- **urban**: Urban environments are characterized by tall buildings and a high density of electronic devices, leading to significant interference.
- **industrial**: Industrial areas face high interference from heavy machinery and numerous electronic devices.
- **transport**: Environments such as subways and buses where metal structures and high mobility create complex interference patterns.
- **dense_urban**: Dense urban areas like city centers have the highest level of signal interference due to the very high density of buildings and devices.
- **jamming_zone**: A jamming zone environment where signals are actively interfered with, typically through the use of signal jammers or scrambling devices. This represents the environment with the highest level of interference.
- **blocked**: A jamming zone environment with total levels of interference. Airspace is completely blocked.

View File

@@ -27,6 +27,7 @@ Contents
simulation_components/network/nodes/firewall
simulation_components/network/switch
simulation_components/network/network
simulation_components/network/airspace
simulation_components/system/internal_frame_processing
simulation_components/system/sys_log
simulation_components/system/pcap

View File

@@ -0,0 +1,100 @@
.. only:: comment
© Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
.. _airspace:
AirSpace
========
1. Introduction
---------------
The AirSpace class is the central component for wireless networks in PrimAITE and is designed to model and manage the behavior and interactions of wireless network interfaces within a simulated wireless network environment. This documentation provides a detailed overview of the AirSpace class, its components, and how they interact to create a realistic simulation of wireless network dynamics.
2. Overview of the AirSpace System
----------------------------------
The AirSpace is a virtual representation of a physical wireless environment, managing multiple wireless network interfaces that simulate devices connected to the wireless network. These interfaces communicate over radio frequencies, with their interactions influenced by various factors modeled within the AirSpace.
2.1 Key Components
^^^^^^^^^^^^^^^^^^
- **Wireless Network Interfaces**: Representations of network interfaces connected physical devices like routers, computers, or IoT devices that can send and receive data wirelessly.
- **Environmental Settings**: Different types of environments (e.g., urban, rural) that affect signal propagation and interference.
- **Channel Management**: Handles channels and their widths (e.g., 20 MHz, 40 MHz) to determine data transmission over different frequencies.
- **Bandwidth Management**: Tracks data transmission over channels to prevent overloading and simulate real-world network congestion.
3. AirSpace Environment Types
-----------------------------
The AirspaceEnvironmentType is a critical component that simulates different physical environments:
- Urban, Suburban, Rural, etc.
- Each type simulates different levels of electromagnetic interference and signal propagation characteristics.
- Changing the AirspaceEnvironmentType impacts data rates by affecting the signal-to-noise ratio (SNR).
4. Simulation of Environment Changes
------------------------------------
When an AirspaceEnvironmentType is set or changed, the AirSpace:
1. Recalculates the maximum data transmission capacities for all managed frequencies and channel widths.
2. Updates all wireless interfaces to reflect new capacities.
5. Managing Wireless Network Interfaces
---------------------------------------
- Interfaces can be dynamically added or removed.
- Configurations can be changed in real-time.
- The AirSpace handles data transmissions, ensuring data sent by an interface is received by all other interfaces on the same frequency and channel.
6. Signal-to-Noise Ratio (SNR) Calculation
------------------------------------------
SNR is crucial in determining the quality of a wireless communication channel:
.. math::
SNR = \frac{\text{Signal Power}}{\text{Noise Power}}
- Impacted by environment type, frequency, and channel width
- Higher SNR indicates a clearer signal, leading to higher data transmission rates
7. Total Channel Capacity Calculation
-------------------------------------
Channel capacity is calculated using the Shannon-Hartley theorem:
.. math::
C = B \cdot \log_2(1 + SNR)
Where:
- C: channel capacity in bits per second (bps)
- B: bandwidth of the channel in hertz (Hz)
- SNR: signal-to-noise ratio
Implementation in AirSpace:
1. Convert channel width from MHz to Hz.
2. Recalculate SNR based on new environment or interface settings.
3. Apply Shannon-Hartley theorem to determine new maximum channel capacity in Mbps.
8. Shared Maximum Capacity Across Devices
-----------------------------------------
While individual devices have theoretical maximum data rates, the actual achievable rate is often less due to:
- Shared wireless medium among all devices on the same frequency and channel width
- Interference and congestion from multiple devices transmitting simultaneously
9. AirSpace Inspection
----------------------
The AirSpace class provides methods for visualizing network behavior:
- ``show_wireless_interfaces()``: Displays current state of all interfaces
- ``show_bandwidth_load()``: Shows channel loads and bandwidth utilization

View File

@@ -37,7 +37,7 @@ additional steps to configure wireless settings:
.. code-block:: python
from primaite.simulator.network.hardware.nodes.network.wireless_router import WirelessRouter
from primaite.simulator.network.airspace import AirSpaceFrequency
from primaite.simulator.network.airspace import AirSpaceFrequency, ChannelWidth
# Instantiate the WirelessRouter
wireless_router = WirelessRouter(hostname="MyWirelessRouter")
@@ -49,7 +49,8 @@ additional steps to configure wireless settings:
wireless_router.configure_wireless_access_point(
port=1, ip_address="192.168.2.1",
subnet_mask="255.255.255.0",
frequency=AirSpaceFrequency.WIFI_2_4
frequency=AirSpaceFrequency.WIFI_2_4,
channel_width=ChannelWidth.ChannelWidth.WIDTH_40_MHZ
)
@@ -71,7 +72,7 @@ ICMP traffic, ensuring basic network connectivity and ping functionality.
.. code-block:: python
from primaite.simulator.network.airspace import AIR_SPACE, AirSpaceFrequency
from primaite.simulator.network.airspace import AirSpaceFrequency, ChannelWidth
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.network.router import ACLAction
@@ -130,13 +131,15 @@ ICMP traffic, ensuring basic network connectivity and ping functionality.
port=1,
ip_address="192.168.1.1",
subnet_mask="255.255.255.0",
frequency=AirSpaceFrequency.WIFI_2_4
frequency=AirSpaceFrequency.WIFI_2_4,
channel_width=ChannelWidth.ChannelWidth.WIDTH_40_MHZ
)
router_2.configure_wireless_access_point(
port=1,
ip_address="192.168.1.2",
subnet_mask="255.255.255.0",
frequency=AirSpaceFrequency.WIFI_2_4
frequency=AirSpaceFrequency.WIFI_2_4,
channel_width=ChannelWidth.ChannelWidth.WIDTH_40_MHZ
)
# Configure routes for inter-router communication

View File

@@ -0,0 +1,188 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
import logging
from pathlib import Path
from prettytable import MARKDOWN, PrettyTable
from primaite.simulator import LogLevel, SIM_OUTPUT
class _NotJSONFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
"""
Determines if a log message does not start and end with '{' and '}' (i.e., it is not a JSON-like message).
:param record: LogRecord object containing all the information pertinent to the event being logged.
:return: True if log message is not JSON-like, False otherwise.
"""
return not record.getMessage().startswith("{") and not record.getMessage().endswith("}")
class AgentLog:
"""
A Agent Log class is a simple logger dedicated to managing and writing logging updates and information for an agent.
Each log message is written to a file located at: <simulation output directory>/agent_name/agent_name.log
"""
def __init__(self, agent_name: str):
"""
Constructs a Agent Log instance for a given hostname.
:param hostname: The hostname associated with the system logs being recorded.
"""
self.agent_name = agent_name
self.current_episode: int = 1
self.current_timestep: int = 0
self.setup_logger()
@property
def timestep(self) -> int:
"""Returns the current timestep. Used for log indexing.
:return: The current timestep as an Int.
"""
return self.current_timestep
def update_timestep(self, new_timestep: int):
"""
Updates the self.current_timestep attribute with the given parameter.
This method is called within .step() to ensure that all instances of Agent Logs
are in sync with one another.
:param new_timestep: The new timestep.
"""
self.current_timestep = new_timestep
def setup_logger(self):
"""
Configures the logger for this Agent Log instance.
The logger is set to the DEBUG level, and is equipped with a handler that writes to a file and filters out
JSON-like messages.
"""
if not SIM_OUTPUT.save_agent_logs:
return
log_path = self._get_log_path()
file_handler = logging.FileHandler(filename=log_path)
file_handler.setLevel(logging.DEBUG)
log_format = "%(timestep)s::%(levelname)s::%(message)s"
file_handler.setFormatter(logging.Formatter(log_format))
self.logger = logging.getLogger(f"{self.agent_name}_log")
for handler in self.logger.handlers:
self.logger.removeHandler(handler)
self.logger.setLevel(logging.DEBUG)
self.logger.addHandler(file_handler)
def _get_log_path(self) -> Path:
"""
Constructs the path for the log file based on the agent name.
:return: Path object representing the location of the log file.
"""
root = SIM_OUTPUT.agent_behaviour_path / f"episode_{self.current_episode}" / self.agent_name
root.mkdir(exist_ok=True, parents=True)
return root / f"{self.agent_name}.log"
def _write_to_terminal(self, msg: str, level: str, to_terminal: bool = False):
if to_terminal or SIM_OUTPUT.write_agent_log_to_terminal:
print(f"{self.agent_name}: ({ self.timestep}) ({level}) {msg}")
def debug(self, msg: str, to_terminal: bool = False):
"""
Logs a message with the DEBUG level.
:param msg: The message to be logged.
:param to_terminal: If True, prints to the terminal too.
"""
if SIM_OUTPUT.agent_log_level > LogLevel.DEBUG:
return
if SIM_OUTPUT.save_agent_logs:
self.logger.debug(msg, extra={"timestep": self.timestep})
self._write_to_terminal(msg, "DEBUG", to_terminal)
def info(self, msg: str, to_terminal: bool = False):
"""
Logs a message with the INFO level.
:param msg: The message to be logged.
:param timestep: The current timestep.
:param to_terminal: If True, prints to the terminal too.
"""
if SIM_OUTPUT.agent_log_level > LogLevel.INFO:
return
if SIM_OUTPUT.save_agent_logs:
self.logger.info(msg, extra={"timestep": self.timestep})
self._write_to_terminal(msg, "INFO", to_terminal)
def warning(self, msg: str, to_terminal: bool = False):
"""
Logs a message with the WARNING level.
:param msg: The message to be logged.
:param timestep: The current timestep.
:param to_terminal: If True, prints to the terminal too.
"""
if SIM_OUTPUT.agent_log_level > LogLevel.WARNING:
return
if SIM_OUTPUT.save_agent_logs:
self.logger.warning(msg, extra={"timestep": self.timestep})
self._write_to_terminal(msg, "WARNING", to_terminal)
def error(self, msg: str, to_terminal: bool = False):
"""
Logs a message with the ERROR level.
:param msg: The message to be logged.
:param timestep: The current timestep.
:param to_terminal: If True, prints to the terminal too.
"""
if SIM_OUTPUT.agent_log_level > LogLevel.ERROR:
return
if SIM_OUTPUT.save_agent_logs:
self.logger.error(msg, extra={"timestep": self.timestep})
self._write_to_terminal(msg, "ERROR", to_terminal)
def critical(self, msg: str, to_terminal: bool = False):
"""
Logs a message with the CRITICAL level.
:param msg: The message to be logged.
:param timestep: The current timestep.
:param to_terminal: If True, prints to the terminal too.
"""
if LogLevel.CRITICAL < SIM_OUTPUT.agent_log_level:
return
if SIM_OUTPUT.save_agent_logs:
self.logger.critical(msg, extra={"timestep": self.timestep})
self._write_to_terminal(msg, "CRITICAL", to_terminal)
def show(self, last_n: int = 10, markdown: bool = False):
"""
Print an Agents Log as a table.
Generate and print PrettyTable instance that shows the agents behaviour log, with columns Time step,
Level and Message.
:param markdown: Use Markdown style in table output. Defaults to False.
"""
table = PrettyTable(["Time Step", "Level", "Message"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.agent_name} Behaviour Log"
if self._get_log_path().exists():
with open(self._get_log_path()) as file:
lines = file.readlines()
for line in lines[-last_n:]:
table.add_row(line.strip().split("::"))
print(table)

View File

@@ -7,6 +7,7 @@ from gymnasium.core import ActType, ObsType
from pydantic import BaseModel, model_validator
from primaite.game.agent.actions import ActionManager
from primaite.game.agent.agent_log import AgentLog
from primaite.game.agent.observations.observation_manager import ObservationManager
from primaite.game.agent.rewards import RewardFunction
from primaite.interface.request import RequestFormat, RequestResponse
@@ -118,6 +119,7 @@ class AbstractAgent(ABC):
self.reward_function: Optional[RewardFunction] = reward_function
self.agent_settings = agent_settings or AgentSettings()
self.history: List[AgentHistoryItem] = []
self.logger = AgentLog(agent_name)
def update_observation(self, state: Dict) -> ObsType:
"""

View File

@@ -38,10 +38,11 @@ class DataManipulationAgent(AbstractScriptedAgent):
:rtype: Tuple[str, Dict]
"""
if timestep < self.next_execution_timestep:
self.logger.debug(msg="Performing do NOTHING")
return "DONOTHING", {}
self._set_next_execution_timestep(timestep + self.agent_settings.start_settings.frequency)
self.logger.info(msg="Performing a data manipulation attack!")
return "NODE_APPLICATION_EXECUTE", {"node_id": self.starting_node_idx, "application_id": 0}
def setup_agent(self) -> None:
@@ -54,3 +55,4 @@ class DataManipulationAgent(AbstractScriptedAgent):
# we are assuming that every node in the node manager has a data manipulation application at idx 0
num_nodes = len(self.action_manager.node_names)
self.starting_node_idx = random.randint(0, num_nodes - 1)
self.logger.debug(msg=f"Select Start Node ID: {self.starting_node_idx}")

View File

@@ -85,4 +85,5 @@ class ProbabilisticAgent(AbstractScriptedAgent):
:rtype: Tuple[str, Dict]
"""
choice = self.rng.choice(len(self.action_manager.action_map), p=self.probabilities)
self.logger.info(f"Performing Action: {choice}")
return self.action_manager.get_action(choice)

View File

@@ -16,6 +16,8 @@ from primaite.game.agent.scripted_agents.probabilistic_agent import Probabilisti
from primaite.game.agent.scripted_agents.random_agent import PeriodicAgent
from primaite.game.agent.scripted_agents.tap001 import TAP001
from primaite.game.science import graph_has_cycle, topological_sort
from primaite.simulator import SIM_OUTPUT
from primaite.simulator.network.airspace import AirspaceEnvironmentType
from primaite.simulator.network.hardware.base import NodeOperatingState
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.host_node import NIC
@@ -165,6 +167,8 @@ class PrimaiteGame:
for _, agent in self.agents.items():
obs = agent.observation_manager.current_observation
action_choice, parameters = agent.get_action(obs, timestep=self.step_counter)
if SIM_OUTPUT.save_agent_logs:
agent.logger.debug(f"Chosen Action: {action_choice}")
request = agent.format_request(action_choice, parameters)
response = self.simulation.apply_request(request)
agent.process_action_response(
@@ -183,8 +187,14 @@ class PrimaiteGame:
"""Advance timestep."""
self.step_counter += 1
_LOGGER.debug(f"Advancing timestep to {self.step_counter} ")
self.update_agent_loggers()
self.simulation.apply_timestep(self.step_counter)
def update_agent_loggers(self) -> None:
"""Updates Agent Loggers with new timestep."""
for agent in self.agents.values():
agent.logger.update_timestep(self.step_counter)
def calculate_truncated(self) -> bool:
"""Calculate whether the episode is truncated."""
current_step = self.step_counter
@@ -245,6 +255,11 @@ class PrimaiteGame:
simulation_config = cfg.get("simulation", {})
network_config = simulation_config.get("network", {})
airspace_cfg = network_config.get("airspace", {})
airspace_environment_type_str = airspace_cfg.get("airspace_environment_type", "urban")
airspace_environment_type: AirspaceEnvironmentType = AirspaceEnvironmentType(airspace_environment_type_str)
net.airspace.airspace_environment_type = airspace_environment_type
nodes_cfg = network_config.get("nodes", [])
links_cfg = network_config.get("links", [])

View File

@@ -35,10 +35,16 @@ class PrimaiteIO:
"""Whether to save PCAP logs."""
save_sys_logs: bool = True
"""Whether to save system logs."""
save_agent_logs: bool = True
"""Whether to save agent logs."""
write_sys_log_to_terminal: bool = False
"""Whether to write the sys log to the terminal."""
write_agent_log_to_terminal: bool = False
"""Whether to write the agent log to the terminal."""
sys_log_level: LogLevel = LogLevel.INFO
"""The level of log that should be included in the logfiles/logged into terminal."""
"""The level of sys logs that should be included in the logfiles/logged into terminal."""
agent_log_level: LogLevel = LogLevel.INFO
"""The level of agent logs that should be included in the logfiles/logged into terminal."""
def __init__(self, settings: Optional[Settings] = None) -> None:
"""
@@ -51,27 +57,31 @@ class PrimaiteIO:
self.session_path: Path = self.generate_session_path()
# set global SIM_OUTPUT path
SIM_OUTPUT.path = self.session_path / "simulation_output"
SIM_OUTPUT.agent_behaviour_path = self.session_path / "agent_behaviour"
SIM_OUTPUT.save_pcap_logs = self.settings.save_pcap_logs
SIM_OUTPUT.save_sys_logs = self.settings.save_sys_logs
SIM_OUTPUT.save_agent_logs = self.settings.save_agent_logs
SIM_OUTPUT.write_agent_log_to_terminal = self.settings.write_agent_log_to_terminal
SIM_OUTPUT.write_sys_log_to_terminal = self.settings.write_sys_log_to_terminal
SIM_OUTPUT.sys_log_level = self.settings.sys_log_level
SIM_OUTPUT.agent_log_level = self.settings.agent_log_level
def generate_session_path(self, timestamp: Optional[datetime] = None) -> Path:
"""Create a folder for the session and return the path to it."""
if timestamp is None:
timestamp = datetime.now()
date_str = timestamp.strftime("%Y-%m-%d")
time_str = timestamp.strftime("%H-%M-%S")
session_path = PRIMAITE_PATHS.user_sessions_path / date_str / time_str
session_path = PRIMAITE_PATHS.user_sessions_path / SIM_OUTPUT.date_str / SIM_OUTPUT.time_str
# check if running in dev mode
if is_dev_mode():
session_path = _PRIMAITE_ROOT.parent.parent / "sessions" / date_str / time_str
session_path = _PRIMAITE_ROOT.parent.parent / "sessions" / SIM_OUTPUT.date_str / SIM_OUTPUT.time_str
# check if there is an output directory set in config
if PRIMAITE_CONFIG["developer_mode"]["output_dir"]:
session_path = Path(PRIMAITE_CONFIG["developer_mode"]["output_dir"]) / "sessions" / date_str / time_str
session_path = (
Path(PRIMAITE_CONFIG["developer_mode"]["output_dir"])
/ "sessions"
/ SIM_OUTPUT.date_str
/ SIM_OUTPUT.time_str
)
session_path.mkdir(exist_ok=True, parents=True)
return session_path
@@ -115,6 +125,9 @@ class PrimaiteIO:
if config.get("sys_log_level"):
config["sys_log_level"] = LogLevel[config["sys_log_level"].upper()] # convert to enum
if config.get("agent_log_level"):
config["agent_log_level"] = LogLevel[config["agent_log_level"].upper()] # convert to enum
new = cls(settings=cls.Settings(**config))
return new

View File

@@ -3,6 +3,8 @@
developer_mode:
enabled: False # not enabled by default
sys_log_level: DEBUG # level of output for system logs, DEBUG by default
agent_log_level: DEBUG # level of output for agent logs, DEBUG by default
output_agent_logs: False # level of output for system logs, DEBUG by default
output_sys_logs: False # system logs not output by default
output_pcap_logs: False # pcap logs not output by default
output_to_terminal: False # do not output to terminal by default

View File

@@ -34,10 +34,14 @@ class _SimOutput:
path = PRIMAITE_PATHS.user_sessions_path / self.date_str / self.time_str
self._path = path
self._agent_behaviour_path = path
self._save_pcap_logs: bool = False
self._save_sys_logs: bool = False
self._save_agent_logs: bool = False
self._write_sys_log_to_terminal: bool = False
self._write_agent_log_to_terminal: bool = False
self._sys_log_level: LogLevel = LogLevel.WARNING # default log level is at WARNING
self._agent_log_level: LogLevel = LogLevel.WARNING
@property
def path(self) -> Path:
@@ -61,6 +65,28 @@ class _SimOutput:
self._path = new_path
self._path.mkdir(exist_ok=True, parents=True)
@property
def agent_behaviour_path(self) -> Path:
if is_dev_mode():
# if dev mode is enabled, if output dir is not set, print to primaite repo root
path: Path = _PRIMAITE_ROOT.parent.parent / "sessions" / self.date_str / self.time_str / "agent_behaviour"
# otherwise print to output dir
if PRIMAITE_CONFIG["developer_mode"]["output_dir"]:
path: Path = (
Path(PRIMAITE_CONFIG["developer_mode"]["output_dir"])
/ "sessions"
/ self.date_str
/ self.time_str
/ "agent_behaviour"
)
self._agent_behaviour_path = path
return self._agent_behaviour_path
@agent_behaviour_path.setter
def agent_behaviour_path(self, new_path: Path) -> None:
self._agent_behaviour_path = new_path
self._agent_behaviour_path.mkdir(exist_ok=True, parents=True)
@property
def save_pcap_logs(self) -> bool:
if is_dev_mode():
@@ -81,6 +107,16 @@ class _SimOutput:
def save_sys_logs(self, save_sys_logs: bool) -> None:
self._save_sys_logs = save_sys_logs
@property
def save_agent_logs(self) -> bool:
if is_dev_mode():
return PRIMAITE_CONFIG.get("developer_mode").get("output_agent_logs")
return self._save_agent_logs
@save_agent_logs.setter
def save_agent_logs(self, save_agent_logs: bool) -> None:
self._save_agent_logs = save_agent_logs
@property
def write_sys_log_to_terminal(self) -> bool:
if is_dev_mode():
@@ -91,6 +127,17 @@ class _SimOutput:
def write_sys_log_to_terminal(self, write_sys_log_to_terminal: bool) -> None:
self._write_sys_log_to_terminal = write_sys_log_to_terminal
# Should this be separate from sys_log?
@property
def write_agent_log_to_terminal(self) -> bool:
if is_dev_mode():
return PRIMAITE_CONFIG.get("developer_mode").get("output_to_terminal")
return self._write_agent_log_to_terminal
@write_agent_log_to_terminal.setter
def write_agent_log_to_terminal(self, write_agent_log_to_terminal: bool) -> None:
self._write_agent_log_to_terminal = write_agent_log_to_terminal
@property
def sys_log_level(self) -> LogLevel:
if is_dev_mode():
@@ -101,5 +148,15 @@ class _SimOutput:
def sys_log_level(self, sys_log_level: LogLevel) -> None:
self._sys_log_level = sys_log_level
@property
def agent_log_level(self) -> LogLevel:
if is_dev_mode():
return LogLevel[PRIMAITE_CONFIG.get("developer_mode").get("agent_log_level")]
return self._agent_log_level
@agent_log_level.setter
def agent_log_level(self, agent_log_level: LogLevel) -> None:
self._agent_log_level = agent_log_level
SIM_OUTPUT = _SimOutput()

View File

@@ -35,6 +35,20 @@ class RequestPermissionValidator(BaseModel):
"""Message that is reported when a request is rejected by this validator."""
return "request rejected"
def __add__(self, other: "RequestPermissionValidator") -> "_CombinedValidator":
return _CombinedValidator(validators=[self, other])
class _CombinedValidator(RequestPermissionValidator):
validators: List[RequestPermissionValidator] = []
def __call__(self, request, context) -> bool:
return all(x(request, context) for x in self.validators)
@property
def fail_message(self):
return f"One of the following conditions are not met: {[v.fail_message for v in self.validators]}"
class AllowAllValidator(RequestPermissionValidator):
"""Always allows the request."""

View File

@@ -6,8 +6,8 @@ from typing import Any, Dict, List, Optional
from prettytable import MARKDOWN, PrettyTable
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager, RequestType, SimComponent
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestPermissionValidator, RequestType, SimComponent
from primaite.simulator.file_system.file import File
from primaite.simulator.file_system.file_type import FileType
from primaite.simulator.file_system.folder import Folder
@@ -42,6 +42,10 @@ class FileSystem(SimComponent):
More information in user guide and docstring for SimComponent._init_request_manager.
"""
self._folder_exists = FileSystem._FolderExistsValidator(file_system=self)
self._folder_not_deleted = FileSystem._FolderNotDeletedValidator(file_system=self)
self._file_exists = FileSystem._FileExistsValidator(file_system=self)
rm = super()._init_request_manager()
self._delete_manager = RequestManager()
@@ -50,13 +54,15 @@ class FileSystem(SimComponent):
request_type=RequestType(
func=lambda request, context: RequestResponse.from_bool(
self.delete_file(folder_name=request[0], file_name=request[1])
)
),
validator=self._file_exists,
),
)
self._delete_manager.add_request(
name="folder",
request_type=RequestType(
func=lambda request, context: RequestResponse.from_bool(self.delete_folder(folder_name=request[0]))
func=lambda request, context: RequestResponse.from_bool(self.delete_folder(folder_name=request[0])),
validator=self._folder_exists,
),
)
rm.add_request(
@@ -144,10 +150,13 @@ class FileSystem(SimComponent):
)
self._folder_request_manager = RequestManager()
rm.add_request("folder", RequestType(func=self._folder_request_manager))
rm.add_request(
"folder",
RequestType(func=self._folder_request_manager, validator=self._folder_exists + self._folder_not_deleted),
)
self._file_request_manager = RequestManager()
rm.add_request("file", RequestType(func=self._file_request_manager))
rm.add_request("file", RequestType(func=self._file_request_manager, validator=self._file_exists))
return rm
@@ -626,3 +635,62 @@ class FileSystem(SimComponent):
self.sys_log.error(f"Unable to access file that does not exist. (file name: {file_name})")
return False
class _FolderExistsValidator(RequestPermissionValidator):
"""
When requests come in, this validator will only let them through if the Folder exists.
Actions cannot be performed on a non-existent folder.
"""
file_system: FileSystem
"""Save a reference to the FileSystem instance."""
def __call__(self, request: RequestFormat, context: Dict) -> bool:
"""Returns True if folder exists."""
return self.file_system.get_folder(folder_name=request[0]) is not None
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return "Cannot perform request on folder because it does not exist."
class _FolderNotDeletedValidator(RequestPermissionValidator):
"""
When requests come in, this validator will only let them through if the Folder has not been deleted.
Actions cannot be performed on a deleted folder.
"""
file_system: FileSystem
"""Save a reference to the FileSystem instance."""
def __call__(self, request: RequestFormat, context: Dict) -> bool:
"""Returns True if folder exists and is not deleted."""
# get folder
folder = self.file_system.get_folder(folder_name=request[0], include_deleted=True)
return folder is not None and not folder.deleted
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return "Cannot perform request on folder because it is deleted."
class _FileExistsValidator(RequestPermissionValidator):
"""
When requests come in, this validator will only let them through if the File exists.
Actions cannot be performed on a non-existent file.
"""
file_system: FileSystem
"""Save a reference to the FileSystem instance."""
def __call__(self, request: RequestFormat, context: Dict) -> bool:
"""Returns True if file exists."""
return self.file_system.get_file(folder_name=request[0], file_name=request[1]) is not None
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return "Cannot perform request on a file that does not exist."

View File

@@ -185,5 +185,5 @@ file_type_sizes_bytes = {
FileType.ZIP: 1024000,
FileType.TAR: 1024000,
FileType.GZ: 819200,
FileType.DB: 15360000,
FileType.DB: 5_000_000,
}

View File

@@ -6,8 +6,8 @@ from typing import Dict, Optional
from prettytable import MARKDOWN, PrettyTable
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestPermissionValidator, RequestType
from primaite.simulator.file_system.file import File
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemABC, FileSystemItemHealthStatus
@@ -55,6 +55,9 @@ class Folder(FileSystemItemABC):
More information in user guide and docstring for SimComponent._init_request_manager.
"""
self._file_exists = Folder._FileExistsValidator(folder=self)
self._file_not_deleted = Folder._FileNotDeletedValidator(folder=self)
rm = super()._init_request_manager()
rm.add_request(
name="delete",
@@ -65,7 +68,9 @@ class Folder(FileSystemItemABC):
self._file_request_manager = RequestManager()
rm.add_request(
name="file",
request_type=RequestType(func=self._file_request_manager),
request_type=RequestType(
func=self._file_request_manager, validator=self._file_exists + self._file_not_deleted
),
)
return rm
@@ -469,3 +474,42 @@ class Folder(FileSystemItemABC):
self.deleted = True
return True
class _FileExistsValidator(RequestPermissionValidator):
"""
When requests come in, this validator will only let them through if the File exists.
Actions cannot be performed on a non-existent file.
"""
folder: Folder
"""Save a reference to the Folder instance."""
def __call__(self, request: RequestFormat, context: Dict) -> bool:
"""Returns True if file exists."""
return self.folder.get_file(file_name=request[0]) is not None
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return "Cannot perform request on a file that does not exist."
class _FileNotDeletedValidator(RequestPermissionValidator):
"""
When requests come in, this validator will only let them through if the File is not deleted.
Actions cannot be performed on a deleted file.
"""
folder: Folder
"""Save a reference to the Folder instance."""
def __call__(self, request: RequestFormat, context: Dict) -> bool:
"""Returns True if file exists and is not deleted."""
file = self.folder.get_file(file_name=request[0])
return file is not None and not file.deleted
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return "Cannot perform request on a file that is deleted."

View File

@@ -3,9 +3,11 @@ from __future__ import annotations
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Tuple
from prettytable import PrettyTable
import numpy as np
from prettytable import MARKDOWN, PrettyTable
from pydantic import BaseModel, computed_field, Field, model_validator
from primaite import getLogger
from primaite.simulator.network.hardware.base import Layer3Interface, NetworkInterface, WiredNetworkInterface
@@ -15,90 +17,29 @@ from primaite.simulator.system.core.packet_capture import PacketCapture
_LOGGER = getLogger(__name__)
__all__ = ["AirSpaceFrequency", "WirelessNetworkInterface", "IPWirelessNetworkInterface"]
def format_hertz(hertz: float, format_terahertz: bool = False, decimals: int = 3) -> str:
"""
Convert a frequency in Hertz to a formatted string using the most appropriate unit.
class AirSpace:
"""Represents a wireless airspace, managing wireless network interfaces and handling wireless transmission."""
Optionally includes formatting for Terahertz.
def __init__(self):
self._wireless_interfaces: Dict[str, WirelessNetworkInterface] = {}
self._wireless_interfaces_by_frequency: Dict[AirSpaceFrequency, List[WirelessNetworkInterface]] = {}
def show(self, frequency: Optional[AirSpaceFrequency] = None):
"""
Displays a summary of wireless interfaces in the airspace, optionally filtered by a specific frequency.
:param frequency: The frequency band to filter devices by. If None, devices for all frequencies are shown.
"""
table = PrettyTable()
table.field_names = ["Connected Node", "MAC Address", "IP Address", "Subnet Mask", "Frequency", "Status"]
# If a specific frequency is provided, filter by it; otherwise, use all frequencies.
frequencies_to_show = [frequency] if frequency else self._wireless_interfaces_by_frequency.keys()
for freq in frequencies_to_show:
interfaces = self._wireless_interfaces_by_frequency.get(freq, [])
for interface in interfaces:
status = "Enabled" if interface.enabled else "Disabled"
table.add_row(
[
interface._connected_node.hostname, # noqa
interface.mac_address,
interface.ip_address if hasattr(interface, "ip_address") else None,
interface.subnet_mask if hasattr(interface, "subnet_mask") else None,
str(freq),
status,
]
)
print(table)
def add_wireless_interface(self, wireless_interface: WirelessNetworkInterface):
"""
Adds a wireless network interface to the airspace if it's not already present.
:param wireless_interface: The wireless network interface to be added.
"""
if wireless_interface.mac_address not in self._wireless_interfaces:
self._wireless_interfaces[wireless_interface.mac_address] = wireless_interface
if wireless_interface.frequency not in self._wireless_interfaces_by_frequency:
self._wireless_interfaces_by_frequency[wireless_interface.frequency] = []
self._wireless_interfaces_by_frequency[wireless_interface.frequency].append(wireless_interface)
def remove_wireless_interface(self, wireless_interface: WirelessNetworkInterface):
"""
Removes a wireless network interface from the airspace if it's present.
:param wireless_interface: The wireless network interface to be removed.
"""
if wireless_interface.mac_address in self._wireless_interfaces:
self._wireless_interfaces.pop(wireless_interface.mac_address)
self._wireless_interfaces_by_frequency[wireless_interface.frequency].remove(wireless_interface)
def clear(self):
"""
Clears all wireless network interfaces and their frequency associations from the airspace.
After calling this method, the airspace will contain no wireless network interfaces, and transmissions cannot
occur until new interfaces are added again.
"""
self._wireless_interfaces.clear()
self._wireless_interfaces_by_frequency.clear()
def transmit(self, frame: Frame, sender_network_interface: WirelessNetworkInterface):
"""
Transmits a frame to all enabled wireless network interfaces on a specific frequency within the airspace.
This ensures that a wireless interface does not receive its own transmission.
:param frame: The frame to be transmitted.
:param sender_network_interface: The wireless network interface sending the frame. This interface will be
excluded from the list of receivers to prevent it from receiving its own transmission.
"""
for wireless_interface in self._wireless_interfaces_by_frequency.get(sender_network_interface.frequency, []):
if wireless_interface != sender_network_interface and wireless_interface.enabled:
wireless_interface.receive_frame(frame)
:param hertz: Frequency in Hertz.
:param format_terahertz: Whether to format frequency in Terahertz, default is False.
:param decimals: Number of decimal places to round to, default is 3.
:returns: Formatted string with the frequency in the most suitable unit.
"""
format_str = f"{{:.{decimals}f}}"
if format_terahertz and hertz >= 1e12: # Terahertz
return format_str.format(hertz / 1e12) + " THz"
elif hertz >= 1e9: # Gigahertz
return format_str.format(hertz / 1e9) + " GHz"
elif hertz >= 1e6: # Megahertz
return format_str.format(hertz / 1e6) + " MHz"
elif hertz >= 1e3: # Kilohertz
return format_str.format(hertz / 1e3) + " kHz"
else: # Hertz
return format_str.format(hertz) + " Hz"
class AirSpaceFrequency(Enum):
@@ -110,12 +51,478 @@ class AirSpaceFrequency(Enum):
"""WiFi 5 GHz. Known for its higher data transmission speeds and reduced interference from other devices."""
def __str__(self) -> str:
hertz_str = format_hertz(hertz=self.value)
if self == AirSpaceFrequency.WIFI_2_4:
return "WiFi 2.4 GHz"
elif self == AirSpaceFrequency.WIFI_5:
return "WiFi 5 GHz"
else:
return "Unknown Frequency"
return f"WiFi {hertz_str}"
if self == AirSpaceFrequency.WIFI_5:
return f"WiFi {hertz_str}"
return "Unknown Frequency"
class ChannelWidth(Enum):
"""
Enumeration representing the available channel widths in MHz for wireless communications.
This enum facilitates standardising and validating channel width configurations.
Attributes:
WIDTH_20_MHZ (int): Represents a channel width of 20 MHz, commonly used for basic
Wi-Fi connectivity with standard range and interference resistance.
WIDTH_40_MHZ (int): Represents a channel width of 40 MHz, offering higher data
throughput at the expense of potentially increased interference.
WIDTH_80_MHZ (int): Represents a channel width of 80 MHz, typically used in modern
Wi-Fi setups for high data rate applications but with higher susceptibility to interference.
WIDTH_160_MHZ (int): Represents a channel width of 160 MHz, used for ultra-high-speed
network applications, providing maximum data throughput with significant
requirements on the spectral environment to minimize interference.
"""
WIDTH_20_MHZ = 20
"""
Represents a channel width of 20 MHz, commonly used for basic Wi-Fi connectivity with standard range and
interference resistance
"""
WIDTH_40_MHZ = 40
"""
Represents a channel width of 40 MHz, offering higher data throughput at the expense of potentially increased
interference.
"""
WIDTH_80_MHZ = 80
"""
Represents a channel width of 80 MHz, typically used in modern Wi-Fi setups for high data rate applications but
with higher susceptibility to interference.
"""
WIDTH_160_MHZ = 160
"""
Represents a channel width of 160 MHz, used for ultra-high-speed network applications, providing maximum data
throughput with significant requirements on the spectral environment to minimize interference.
"""
def __str__(self) -> str:
"""
Returns a string representation of the channel width.
:return: String in the format of "<value> MHz" indicating the channel width.
"""
return f"{self.value} MHz"
AirSpaceKeyType = Tuple[AirSpaceFrequency, ChannelWidth]
class AirspaceEnvironmentType(Enum):
"""Enum representing different types of airspace environments which affect wireless communication signals."""
RURAL = "rural"
"""
A rural environment offers clear channel conditions due to low population density and minimal electronic device
presence.
"""
OUTDOOR = "outdoor"
"""
Outdoor environments like parks or fields have minimal electronic interference.
"""
SUBURBAN = "suburban"
"""
Suburban environments strike a balance with fewer electronic interferences than urban but more than rural.
"""
OFFICE = "office"
"""
Office environments have moderate interference from numerous electronic devices and overlapping networks.
"""
URBAN = "urban"
"""
Urban environments are characterized by tall buildings and a high density of electronic devices, leading to
significant interference.
"""
INDUSTRIAL = "industrial"
"""
Industrial areas face high interference from heavy machinery and numerous electronic devices.
"""
TRANSPORT = "transport"
"""
Environments such as subways and buses where metal structures and high mobility create complex interference
patterns.
"""
DENSE_URBAN = "dense_urban"
"""
Dense urban areas like city centers have the highest level of signal interference due to the very high density of
buildings and devices.
"""
JAMMING_ZONE = "jamming_zone"
"""
A jamming zone environment where signals are actively interfered with, typically through the use of signal jammers
or scrambling devices. This represents the environment with the highest level of interference.
"""
BLOCKED = "blocked"
"""
A jamming zone environment with total levels of interference. Airspace is completely blocked.
"""
@property
def snr_impact(self) -> int:
"""
Returns the SNR impact associated with the environment.
:return: SNR impact in dB.
"""
impacts = {
AirspaceEnvironmentType.RURAL: 0,
AirspaceEnvironmentType.OUTDOOR: 1,
AirspaceEnvironmentType.SUBURBAN: -5,
AirspaceEnvironmentType.OFFICE: -7,
AirspaceEnvironmentType.URBAN: -10,
AirspaceEnvironmentType.INDUSTRIAL: -15,
AirspaceEnvironmentType.TRANSPORT: -12,
AirspaceEnvironmentType.DENSE_URBAN: -20,
AirspaceEnvironmentType.JAMMING_ZONE: -40,
AirspaceEnvironmentType.BLOCKED: -100,
}
return impacts[self]
def __str__(self) -> str:
return f"{self.value.title()} Environment (SNR Impact: {self.snr_impact})"
def estimate_snr(
frequency: AirSpaceFrequency, environment_type: AirspaceEnvironmentType, channel_width: ChannelWidth
) -> float:
"""
Estimate the Signal-to-Noise Ratio (SNR) based on the communication frequency, environment, and channel width.
This function considers both the base SNR value dependent on the frequency and the impact of environmental
factors and channel width on the SNR.
The SNR is adjusted by reducing it for wider channels, reflecting the increased noise floor from a broader
frequency range.
:param frequency: The operating frequency as defined by AirSpaceFrequency enum, influencing the base SNR. Higher
frequencies like 5 GHz generally start with a higher base SNR due to less noise.
:param environment_type: The type of environment from AirspaceEnvironmentType enum, which adjusts the SNR based on
expected environmental noise and interference levels.
:param channel_width: The channel width from ChannelWidth enum, where wider channels (80 MHz and 160 MHz) decrease
the SNR slightly due to an increased noise floor.
:return: Estimated SNR in dB, calculated as the base SNR modified by environmental and channel width impacts.
"""
base_snr = 40 if frequency == AirSpaceFrequency.WIFI_5 else 30
snr_impact = environment_type.snr_impact
# Adjust SNR impact based on channel width
if channel_width == ChannelWidth.WIDTH_80_MHZ or channel_width == ChannelWidth.WIDTH_160_MHZ:
snr_impact -= 3 # Assume wider channels have slightly lower SNR due to increased noise floor
return base_snr + snr_impact
def calculate_total_channel_capacity(
channel_width: ChannelWidth, frequency: AirSpaceFrequency, environment_type: AirspaceEnvironmentType
) -> float:
"""
Calculate the total theoretical data rate for the channel using the Shannon-Hartley theorem.
This function determines the channel's capacity by considering the bandwidth (derived from channel width),
and the signal-to-noise ratio (SNR) adjusted by frequency and environmental conditions.
The Shannon-Hartley theorem states that channel capacity C (in bits per second) can be calculated as:
``C = B * log2(1 + SNR)`` where B is the bandwidth in Hertz and SNR is the signal-to-noise ratio.
:param channel_width: The width of the channel as defined by ChannelWidth enum, converted to Hz for calculation.
:param frequency: The operating frequency as defined by AirSpaceFrequency enum, influencing the base SNR and part
of the SNR estimation.
:param environment_type: The type of environment as defined by AirspaceEnvironmentType enum, used in SNR estimation.
:return: Theoretical total data rate in Mbps for the entire channel.
"""
bandwidth_hz = channel_width.value * 1_000_000 # Convert MHz to Hz
snr_db = estimate_snr(frequency, environment_type, channel_width)
snr_linear = 10 ** (snr_db / 10)
total_capacity_bps = bandwidth_hz * np.log2(1 + snr_linear)
total_capacity_mbps = total_capacity_bps / 1_000_000
return total_capacity_mbps
def calculate_individual_device_rate(
channel_width: ChannelWidth,
frequency: AirSpaceFrequency,
environment_type: AirspaceEnvironmentType,
device_count: int,
) -> float:
"""
Calculate the theoretical data rate available to each individual device on the channel.
This function first calculates the total channel capacity and then divides this capacity by the number
of active devices to estimate each device's share of the bandwidth. This reflects the practical limitation
that multiple devices must share the same channel resources.
:param channel_width: The channel width as defined by ChannelWidth enum, used in total capacity calculation.
:param frequency: The operating frequency as defined by AirSpaceFrequency enum, used in total capacity calculation.
:param environment_type: The environment type as defined by AirspaceEnvironmentType enum, impacting SNR and
capacity.
:param device_count: The number of devices sharing the channel. If zero, returns zero to avoid division by zero.
:return: Theoretical data rate in Mbps available per device, based on shared channel capacity.
"""
total_capacity_mbps = calculate_total_channel_capacity(channel_width, frequency, environment_type)
if device_count == 0:
return 0 # Avoid division by zero
individual_device_rate_mbps = total_capacity_mbps / device_count
return individual_device_rate_mbps
class AirSpace(BaseModel):
"""
Represents a wireless airspace, managing wireless network interfaces and handling wireless transmission.
This class provides functionalities to manage a collection of wireless network interfaces, each associated with
specific frequencies and channel widths. It includes methods to calculate and manage bandwidth loads, add and
remove wireless interfaces, and handle data transmission across these interfaces.
"""
airspace_environment_type_: AirspaceEnvironmentType = AirspaceEnvironmentType.URBAN
wireless_interfaces: Dict[str, WirelessNetworkInterface] = Field(default_factory=lambda: {})
wireless_interfaces_by_frequency_channel_width: Dict[AirSpaceKeyType, List[WirelessNetworkInterface]] = Field(
default_factory=lambda: {}
)
bandwidth_load: Dict[AirSpaceKeyType, float] = Field(default_factory=lambda: {})
frequency_channel_width_max_capacity_mbps: Dict[AirSpaceKeyType, float] = Field(default_factory=lambda: {})
def model_post_init(self, __context: Any) -> None:
"""
Initialize the airspace metadata after instantiation.
This method is called to set up initial configurations like the maximum capacity of each channel width and
frequency based on the current environment setting.
:param __context: Contextual data or settings, typically used for further initializations beyond
the basic constructor.
"""
self._set_frequency_channel_width_max_capacity_mbps()
def _set_frequency_channel_width_max_capacity_mbps(self):
"""
Private method to compute and set the maximum channel capacity in Mbps for each frequency and channel width.
Based on the airspace environment type, this method calculates the maximum possible data transmission
capacity for each combination of frequency and channel width available and stores these values.
These capacities are critical for managing and limiting bandwidth load during operations.
"""
print(
f"Rebuilding the frequency channel width maximum capacity dictionary based on "
f"airspace environment type {self.airspace_environment_type_}"
)
for frequency in AirSpaceFrequency:
for channel_width in ChannelWidth:
max_capacity = calculate_total_channel_capacity(
frequency=frequency, channel_width=channel_width, environment_type=self.airspace_environment_type
)
self.frequency_channel_width_max_capacity_mbps[frequency, channel_width] = max_capacity
@computed_field
@property
def airspace_environment_type(self) -> AirspaceEnvironmentType:
"""
Gets the current environment type of the airspace.
:return: The AirspaceEnvironmentType representing the current environment type.
"""
return self.airspace_environment_type_
@airspace_environment_type.setter
def airspace_environment_type(self, value: AirspaceEnvironmentType) -> None:
"""
Sets a new environment type for the airspace and updates related configurations.
Changing the environment type triggers a re-calculation of the maximum channel capacities and
adjustments to the current setup of wireless interfaces to ensure they are aligned with the
new environment settings.
:param value: The new environment type as an AirspaceEnvironmentType.
"""
if value != self.airspace_environment_type_:
print(f"Setting airspace_environment_type to {value}")
self.airspace_environment_type_ = value
self._set_frequency_channel_width_max_capacity_mbps()
wireless_interface_keys = list(self.wireless_interfaces.keys())
for wireless_interface_key in wireless_interface_keys:
wireless_interface = self.wireless_interfaces[wireless_interface_key]
self.remove_wireless_interface(wireless_interface)
self.add_wireless_interface(wireless_interface)
def show_bandwidth_load(self, markdown: bool = False):
"""
Prints a table of the current bandwidth load for each frequency and channel width combination on the airspace.
This method prints a tabulated view showing the utilisation of available bandwidth capacities for all configured
frequency and channel width pairings. The table includes the current capacity usage as a percentage of the
maximum capacity, alongside the absolute maximum capacity values in Mbps.
:param markdown: Flag indicating if output should be in markdown format.
"""
headers = ["Frequency", "Channel Width", "Current Capacity (%)", "Maximum Capacity (Mbit)"]
table = PrettyTable(headers)
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = "Airspace Frequency Channel Loads"
for key, load in self.bandwidth_load.items():
frequency, channel_width = key
maximum_capacity = self.frequency_channel_width_max_capacity_mbps[key]
load_percent = load / maximum_capacity
if load_percent > 1.0:
load_percent = 1.0
table.add_row(
[format_hertz(frequency.value), str(channel_width), f"{load_percent:.0%}", f"{maximum_capacity:.3f}"]
)
print(table)
def show_wireless_interfaces(self, markdown: bool = False):
"""
Prints a table of wireless interfaces in the airspace.
:param markdown: Flag indicating if output should be in markdown format.
"""
headers = [
"Connected Node",
"MAC Address",
"IP Address",
"Subnet Mask",
"Frequency",
"Channel Width",
"Speed (Mbps)",
"Status",
]
table = PrettyTable(headers)
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"Devices on Air Space - {self.airspace_environment_type}"
for interface in self.wireless_interfaces.values():
status = "Enabled" if interface.enabled else "Disabled"
table.add_row(
[
interface._connected_node.hostname, # noqa
interface.mac_address,
interface.ip_address if hasattr(interface, "ip_address") else None,
interface.subnet_mask if hasattr(interface, "subnet_mask") else None,
format_hertz(interface.frequency.value),
str(interface.channel_width),
f"{interface.speed:.3f}",
status,
]
)
print(table.get_string(sortby="Frequency"))
def show(self, markdown: bool = False):
"""
Prints a summary of the current state of the airspace, including both wireless interfaces and bandwidth loads.
This method is a convenient wrapper that calls two separate methods to display detailed tables: one for
wireless interfaces and another for bandwidth load across all frequencies and channel widths managed within the
airspace. It provides a holistic view of the operational status and performance metrics of the airspace.
:param markdown: Flag indicating if output should be in markdown format.
"""
self.show_wireless_interfaces(markdown)
self.show_bandwidth_load(markdown)
def add_wireless_interface(self, wireless_interface: WirelessNetworkInterface):
"""
Adds a wireless network interface to the airspace if it's not already present.
:param wireless_interface: The wireless network interface to be added.
"""
if wireless_interface.mac_address not in self.wireless_interfaces:
self.wireless_interfaces[wireless_interface.mac_address] = wireless_interface
if wireless_interface.airspace_key not in self.wireless_interfaces_by_frequency_channel_width:
self.wireless_interfaces_by_frequency_channel_width[wireless_interface.airspace_key] = []
self.wireless_interfaces_by_frequency_channel_width[wireless_interface.airspace_key].append(
wireless_interface
)
speed = calculate_total_channel_capacity(
wireless_interface.channel_width, wireless_interface.frequency, self.airspace_environment_type
)
wireless_interface.set_speed(speed)
def remove_wireless_interface(self, wireless_interface: WirelessNetworkInterface):
"""
Removes a wireless network interface from the airspace if it's present.
:param wireless_interface: The wireless network interface to be removed.
"""
if wireless_interface.mac_address in self.wireless_interfaces:
self.wireless_interfaces.pop(wireless_interface.mac_address)
self.wireless_interfaces_by_frequency_channel_width[wireless_interface.airspace_key].remove(
wireless_interface
)
def clear(self):
"""
Clears all wireless network interfaces and their frequency associations from the airspace.
After calling this method, the airspace will contain no wireless network interfaces, and transmissions cannot
occur until new interfaces are added again.
"""
self.wireless_interfaces.clear()
self.wireless_interfaces_by_frequency_channel_width.clear()
def reset_bandwidth_load(self):
"""
Resets the bandwidth load tracking for all frequencies in the airspace.
This method clears the current load metrics for all operating frequencies, effectively setting the load to zero.
"""
self.bandwidth_load = {}
def can_transmit_frame(self, frame: Frame, sender_network_interface: WirelessNetworkInterface) -> bool:
"""
Determines if a frame can be transmitted by the sender network interface based on the current bandwidth load.
This method checks if adding the size of the frame to the current bandwidth load of the frequency used by the
sender network interface would exceed the maximum allowed bandwidth for that frequency. It returns True if the
frame can be transmitted without exceeding the limit, and False otherwise.
:param frame: The frame to be transmitted, used to check its size against the frequency's bandwidth limit.
:param sender_network_interface: The network interface attempting to transmit the frame, used to determine the
relevant frequency and its current bandwidth load.
:return: True if the frame can be transmitted within the bandwidth limit, False if it would exceed the limit.
"""
if sender_network_interface.airspace_key not in self.bandwidth_load:
self.bandwidth_load[sender_network_interface.airspace_key] = 0.0
return (
self.bandwidth_load[sender_network_interface.airspace_key] + frame.size_Mbits
<= self.frequency_channel_width_max_capacity_mbps[sender_network_interface.airspace_key]
)
def transmit(self, frame: Frame, sender_network_interface: WirelessNetworkInterface):
"""
Transmits a frame to all enabled wireless network interfaces on a specific frequency within the airspace.
This ensures that a wireless interface does not receive its own transmission.
:param frame: The frame to be transmitted.
:param sender_network_interface: The wireless network interface sending the frame. This interface will be
excluded from the list of receivers to prevent it from receiving its own transmission.
"""
self.bandwidth_load[sender_network_interface.airspace_key] += frame.size_Mbits
for wireless_interface in self.wireless_interfaces_by_frequency_channel_width.get(
sender_network_interface.airspace_key, []
):
if wireless_interface != sender_network_interface and wireless_interface.enabled:
wireless_interface.receive_frame(frame)
class WirelessNetworkInterface(NetworkInterface, ABC):
@@ -139,7 +546,135 @@ class WirelessNetworkInterface(NetworkInterface, ABC):
"""
airspace: AirSpace
frequency: AirSpaceFrequency = AirSpaceFrequency.WIFI_2_4
frequency_: AirSpaceFrequency = AirSpaceFrequency.WIFI_2_4
channel_width_: ChannelWidth = ChannelWidth.WIDTH_40_MHZ
@model_validator(mode="after") # noqa
def validate_channel_width_for_2_4_ghz(self) -> "WirelessNetworkInterface":
"""
Validate the wireless interface's channel width settings after model changes.
This method serves as a model validator to ensure that the channel width settings for the 2.4 GHz frequency
comply with accepted standards (either 20 MHz or 40 MHz). It's triggered after model instantiation.
Ensures that the channel width is appropriate for the current frequency setting, particularly checking
and adjusting the settings for the 2.4 GHz frequency band to not exceed 40 MHz. This is crucial for
avoiding interference and ensuring optimal performance in densely populated wireless environments.
"""
self._check_wifi_24_channel_width()
return self
def model_post_init(self, __context: Any) -> None:
"""Initialise the model after its creation, setting the speed based on the calculated channel capacity."""
speed = calculate_total_channel_capacity(
channel_width=self.channel_width,
frequency=self.frequency,
environment_type=self.airspace.airspace_environment_type,
)
self.set_speed(speed)
def _check_wifi_24_channel_width(self) -> None:
"""
Ensures that the channel width for 2.4 GHz frequency does not exceed 40 MHz.
This method checks the current frequency and channel width settings and adjusts the channel width
to 40 MHz if the frequency is set to 2.4 GHz and the channel width exceeds 40 MHz. This is done to
comply with typical Wi-Fi standards for 2.4 GHz frequencies, which commonly support up to 40 MHz.
Logs a SysLog warning if the channel width had to be adjusted, logging this change either to the connected
node's system log or the global logger, depending on whether the interface is connected to a node.
"""
if self.frequency_ == AirSpaceFrequency.WIFI_2_4 and self.channel_width_.value > 40:
self.channel_width_ = ChannelWidth.WIDTH_40_MHZ
msg = (
f"Channel width must be either 20 Mhz or 40 Mhz when using {AirSpaceFrequency.WIFI_2_4}. "
f"Overriding value to use {ChannelWidth.WIDTH_40_MHZ}."
)
if self._connected_node:
self._connected_node.sys_log.warning(f"Wireless Interface {self.port_num}: {msg}")
else:
_LOGGER.warning(msg)
@computed_field
@property
def frequency(self) -> AirSpaceFrequency:
"""
Get the current operating frequency of the wireless interface.
:return: The current frequency as an AirSpaceFrequency enum value.
"""
return self.frequency_
@frequency.setter
def frequency(self, value: AirSpaceFrequency) -> None:
"""
Set the operating frequency of the wireless interface and update the network configuration.
This setter updates the frequency of the wireless interface if the new value differs from the current setting.
It handles the update by first removing the interface from the current airspace management to avoid conflicts,
setting the new frequency, ensuring the channel width remains compliant, and then re-adding the interface
to the airspace with the new settings.
:param value: The new frequency to set, as an AirSpaceFrequency enum value.
"""
if value != self.frequency_:
self.airspace.remove_wireless_interface(self)
self.frequency_ = value
self._check_wifi_24_channel_width()
self.airspace.add_wireless_interface(self)
@computed_field
@property
def channel_width(self) -> ChannelWidth:
"""
Get the current channel width setting of the wireless interface.
:return: The current channel width as a ChannelWidth enum value.
"""
return self.channel_width_
@channel_width.setter
def channel_width(self, value: ChannelWidth) -> None:
"""
Set the channel width of the wireless interface and manage configuration compliance.
Updates the channel width of the wireless interface. If the new channel width is different from the existing
one, it first removes the interface from the airspace to prevent configuration conflicts, sets the new channel
width, checks and adjusts it if necessary (especially for 2.4 GHz frequency to comply with typical standards),
and then re-registers the interface in the airspace with updated settings.
:param value: The new channel width to set, as a ChannelWidth enum value.
"""
if value != self.channel_width_:
self.airspace.remove_wireless_interface(self)
self.channel_width_ = value
self._check_wifi_24_channel_width()
self.airspace.add_wireless_interface(self)
@property
def airspace_key(self) -> tuple:
"""
The airspace bandwidth/channel identifier for the wireless interface based on its frequency and channel width.
:return: A tuple containing the frequency and channel width, serving as a bandwidth/channel key.
"""
return self.frequency_, self.channel_width_
def set_speed(self, speed: float):
"""
Sets the network interface speed to the specified value and logs this action.
This method updates the speed attribute of the network interface to the given value, reflecting
the theoretical maximum data rate that the interface can support based on the current settings.
It logs the new speed to the system log of the connected node if available.
:param speed: The speed in Mbps to be set for the network interface.
"""
self.speed = speed
if self._connected_node:
self._connected_node.sys_log.info(
f"Wireless Interface {self.port_num}: Setting theoretical maximum data rate to {speed:.3f} Mbps."
)
def enable(self):
"""Attempt to enable the network interface."""
@@ -185,13 +720,18 @@ class WirelessNetworkInterface(NetworkInterface, ABC):
:param frame: The network frame to be sent.
:return: True if the frame is sent successfully, False if the network interface is disabled.
"""
if self.enabled:
frame.set_sent_timestamp()
self.pcap.capture_outbound(frame)
self.airspace.transmit(frame, self)
return True
# Cannot send Frame as the network interface is not enabled
return False
if not self.enabled:
return False
if not self.airspace.can_transmit_frame(frame, self):
# Drop frame for now. Queuing will happen here (probably) if it's done in the future.
self._connected_node.sys_log.info(f"{self}: Frame dropped as Link is at capacity")
return False
super().send_frame(frame)
frame.set_sent_timestamp()
self.pcap.capture_outbound(frame)
self.airspace.transmit(frame, self)
return True
def receive_frame(self, frame: Frame) -> bool:
"""

View File

@@ -96,6 +96,8 @@ class Network(SimComponent):
"""Apply pre-timestep logic."""
super().pre_timestep(timestep)
self.airspace.reset_bandwidth_load()
for node in self.nodes.values():
node.pre_timestep(timestep)

View File

@@ -87,7 +87,7 @@ class NetworkInterface(SimComponent, ABC):
mac_address: str = Field(default_factory=generate_mac_address)
"The MAC address of the interface."
speed: int = 100
speed: float = 100.0
"The speed of the interface in Mbps. Default is 100 Mbps."
mtu: int = 1500
@@ -499,14 +499,17 @@ class WiredNetworkInterface(NetworkInterface, ABC):
:param frame: The network frame to be sent.
:return: True if the frame is sent, False if the Network Interface is disabled or not connected to a link.
"""
if not self.enabled:
return False
if not self._connected_link.can_transmit_frame(frame):
# Drop frame for now. Queuing will happen here (probably) if it's done in the future.
self._connected_node.sys_log.info(f"{self}: Frame dropped as Link is at capacity")
return False
super().send_frame(frame)
if self.enabled:
frame.set_sent_timestamp()
self.pcap.capture_outbound(frame)
self._connected_link.transmit_frame(sender_nic=self, frame=frame)
return True
# Cannot send Frame as the NIC is not enabled
return False
frame.set_sent_timestamp()
self.pcap.capture_outbound(frame)
self._connected_link.transmit_frame(sender_nic=self, frame=frame)
return True
@abstractmethod
def receive_frame(self, frame: Frame) -> bool:
@@ -737,12 +740,21 @@ class Link(SimComponent):
"""
return self.endpoint_a.enabled and self.endpoint_b.enabled
def _can_transmit(self, frame: Frame) -> bool:
def can_transmit_frame(self, frame: Frame) -> bool:
"""
Determines whether a frame can be transmitted considering the current Link load and the Link's bandwidth.
This method assesses if the transmission of a given frame is possible without exceeding the Link's total
bandwidth capacity. It checks if the current load of the Link plus the size of the frame (expressed in Mbps)
would remain within the defined bandwidth limits. The transmission is only feasible if the Link is active
('up') and the total load including the new frame does not surpass the bandwidth limit.
:param frame: The frame intended for transmission, which contains its size in Mbps.
:return: True if the frame can be transmitted without exceeding the bandwidth limit, False otherwise.
"""
if self.is_up:
frame_size_Mbits = frame.size_Mbits # noqa - Leaving it as Mbits as this is how they're expressed
# return self.current_load + frame_size_Mbits <= self.bandwidth
# TODO: re add this check once packet size limiting and MTU checks are implemented
return True
return self.current_load + frame.size_Mbits <= self.bandwidth
return False
def transmit_frame(self, sender_nic: WiredNetworkInterface, frame: Frame) -> bool:
@@ -753,11 +765,6 @@ class Link(SimComponent):
:param frame: The network frame to be sent.
:return: True if the Frame can be sent, otherwise False.
"""
can_transmit = self._can_transmit(frame)
if not can_transmit:
_LOGGER.debug(f"Cannot transmit frame as {self} is at capacity")
return False
receiver = self.endpoint_a
if receiver == sender_nic:
receiver = self.endpoint_b

View File

@@ -58,12 +58,16 @@ class SwitchPort(WiredNetworkInterface):
:param frame: The network frame to be sent.
:return: A boolean indicating whether the frame was successfully sent.
"""
if self.enabled:
self.pcap.capture_outbound(frame)
self._connected_link.transmit_frame(sender_nic=self, frame=frame)
return True
# Cannot send Frame as the SwitchPort is not enabled
return False
if not self.enabled:
return False
if not self._connected_link.can_transmit_frame(frame):
# Drop frame for now. Queuing will happen here (probably) if it's done in the future.
self._connected_node.sys_log.info(f"{self}: Frame dropped as Link is at capacity")
return False
self.pcap.capture_outbound(frame)
self._connected_link.transmit_frame(sender_nic=self, frame=frame)
return True
def receive_frame(self, frame: Frame) -> bool:
"""

View File

@@ -1,10 +1,10 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from ipaddress import IPv4Address
from typing import Any, Dict, Union
from typing import Any, Dict, Optional, Union
from pydantic import validate_call
from primaite.simulator.network.airspace import AirSpace, AirSpaceFrequency, IPWirelessNetworkInterface
from primaite.simulator.network.airspace import AirSpace, AirSpaceFrequency, ChannelWidth, IPWirelessNetworkInterface
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router, RouterInterface
from primaite.simulator.network.transmission.data_link_layer import Frame
@@ -153,7 +153,8 @@ class WirelessRouter(Router):
self,
ip_address: IPV4Address,
subnet_mask: IPV4Address,
frequency: AirSpaceFrequency = AirSpaceFrequency.WIFI_2_4,
frequency: Optional[AirSpaceFrequency] = AirSpaceFrequency.WIFI_2_4,
channel_width: Optional[ChannelWidth] = ChannelWidth.WIDTH_40_MHZ,
):
"""
Configures a wireless access point (WAP).
@@ -170,13 +171,23 @@ class WirelessRouter(Router):
enum. This determines the frequency band (e.g., 2.4 GHz or 5 GHz) the access point will use for wireless
communication. Default is AirSpaceFrequency.WIFI_2_4.
"""
if not frequency:
frequency = AirSpaceFrequency.WIFI_2_4
if not channel_width:
channel_width = ChannelWidth.WIDTH_40_MHZ
self.sys_log.info("Configuring wireless access point")
self.wireless_access_point.disable() # Temporarily disable the WAP for reconfiguration
network_interface = self.network_interface[1]
network_interface.ip_address = ip_address
network_interface.subnet_mask = subnet_mask
self.sys_log.info(f"Configured WAP {network_interface}")
self.wireless_access_point.frequency = frequency # Set operating frequency
self.wireless_access_point.channel_width = channel_width
self.wireless_access_point.enable() # Re-enable the WAP with new settings
self.sys_log.info(f"Configured WAP {network_interface}")
@property
def router_interface(self) -> RouterInterface:
@@ -258,7 +269,12 @@ class WirelessRouter(Router):
ip_address = cfg["wireless_access_point"]["ip_address"]
subnet_mask = cfg["wireless_access_point"]["subnet_mask"]
frequency = AirSpaceFrequency[cfg["wireless_access_point"]["frequency"]]
router.configure_wireless_access_point(ip_address=ip_address, subnet_mask=subnet_mask, frequency=frequency)
channel_width = cfg["wireless_access_point"].get("channel_width")
if channel_width:
channel_width = ChannelWidth(channel_width)
router.configure_wireless_access_point(
ip_address=ip_address, subnet_mask=subnet_mask, frequency=frequency, channel_width=channel_width
)
if "acl" in cfg:
for r_num, r_cfg in cfg["acl"].items():

View File

@@ -133,10 +133,11 @@ class Frame(BaseModel):
def size(self) -> float: # noqa - Keep it as MBits as this is how they're expressed
"""The size of the Frame in Bytes."""
# get the payload size if it is a data packet
payload_size = 0.0
if isinstance(self.payload, DataPacket):
return self.payload.get_packet_size()
payload_size = self.payload.get_packet_size()
return float(len(self.model_dump_json().encode("utf-8")))
return float(len(self.model_dump_json().encode("utf-8"))) + payload_size
@property
def size_Mbits(self) -> float: # noqa - Keep it as MBits as this is how they're expressed

View File

@@ -82,12 +82,31 @@ def config_callback(
show_default=False,
),
] = None,
agent_log_level: Annotated[
LogLevel,
typer.Option(
"--agent-log-level",
"-level",
click_type=click.Choice(LogLevel._member_names_, case_sensitive=False),
help="The level of agent behaviour logs to output.",
show_default=False,
),
] = None,
output_sys_logs: Annotated[
bool,
typer.Option(
"--output-sys-logs/--no-sys-logs", "-sys/-nsys", help="Output system logs to file.", show_default=False
),
] = None,
output_agent_logs: Annotated[
bool,
typer.Option(
"--output-agent-logs/--no-agent-logs",
"-agent/-nagent",
help="Output agent logs to file.",
show_default=False,
),
] = None,
output_pcap_logs: Annotated[
bool,
typer.Option(
@@ -109,10 +128,18 @@ def config_callback(
PRIMAITE_CONFIG["developer_mode"]["sys_log_level"] = ctx.params.get("sys_log_level")
print(f"PrimAITE dev-mode config updated sys_log_level={ctx.params.get('sys_log_level')}")
if ctx.params.get("agent_log_level") is not None:
PRIMAITE_CONFIG["developer_mode"]["agent_log_level"] = ctx.params.get("agent_log_level")
print(f"PrimAITE dev-mode config updated agent_log_level={ctx.params.get('agent_log_level')}")
if output_sys_logs is not None:
PRIMAITE_CONFIG["developer_mode"]["output_sys_logs"] = output_sys_logs
print(f"PrimAITE dev-mode config updated {output_sys_logs=}")
if output_agent_logs is not None:
PRIMAITE_CONFIG["developer_mode"]["output_agent_logs"] = output_agent_logs
print(f"PrimAITE dev-mode config updated {output_agent_logs=}")
if output_pcap_logs is not None:
PRIMAITE_CONFIG["developer_mode"]["output_pcap_logs"] = output_pcap_logs
print(f"PrimAITE dev-mode config updated {output_pcap_logs=}")

View File

@@ -9,6 +9,9 @@ io_settings:
save_pcap_logs: true
save_sys_logs: true
sys_log_level: WARNING
agent_log_level: INFO
save_agent_logs: true
write_agent_log_to_terminal: True
game:

View File

@@ -41,6 +41,12 @@ agents:
options:
source_node: client_1
target_ip_address: 192.168.10.0/24
target_port:
- 21
- 53
- 80
- 123
- 219
reward_function:
reward_components:

View File

@@ -9,6 +9,8 @@ game:
simulation:
network:
airspace:
airspace_environment_type: urban
nodes:
- type: computer
hostname: pc_a

View File

@@ -0,0 +1,81 @@
game:
max_episode_length: 256
ports:
- ARP
protocols:
- ICMP
- TCP
- UDP
simulation:
network:
airspace:
airspace_environment_type: blocked
nodes:
- type: computer
hostname: pc_a
ip_address: 192.168.0.2
subnet_mask: 255.255.255.0
default_gateway: 192.168.0.1
start_up_duration: 0
- type: computer
hostname: pc_b
ip_address: 192.168.2.2
subnet_mask: 255.255.255.0
default_gateway: 192.168.2.1
start_up_duration: 0
- type: wireless_router
hostname: router_1
start_up_duration: 0
router_interface:
ip_address: 192.168.0.1
subnet_mask: 255.255.255.0
wireless_access_point:
ip_address: 192.168.1.1
subnet_mask: 255.255.255.0
frequency: WIFI_5
channel_width: 80
acl:
1:
action: PERMIT
routes:
- address: 192.168.2.0 # PC B subnet
subnet_mask: 255.255.255.0
next_hop_ip_address: 192.168.1.2
metric: 0
- type: wireless_router
hostname: router_2
start_up_duration: 0
router_interface:
ip_address: 192.168.2.1
subnet_mask: 255.255.255.0
wireless_access_point:
ip_address: 192.168.1.2
subnet_mask: 255.255.255.0
frequency: WIFI_5
channel_width: 80
acl:
1:
action: PERMIT
routes:
- address: 192.168.0.0 # PC A subnet
subnet_mask: 255.255.255.0
next_hop_ip_address: 192.168.1.1
metric: 0
links:
- endpoint_a_hostname: pc_a
endpoint_a_port: 1
endpoint_b_hostname: router_1
endpoint_b_port: 2
- endpoint_a_hostname: pc_b
endpoint_a_port: 1
endpoint_b_hostname: router_2
endpoint_b_port: 2

View File

@@ -0,0 +1,81 @@
game:
max_episode_length: 256
ports:
- ARP
protocols:
- ICMP
- TCP
- UDP
simulation:
network:
airspace:
airspace_environment_type: urban
nodes:
- type: computer
hostname: pc_a
ip_address: 192.168.0.2
subnet_mask: 255.255.255.0
default_gateway: 192.168.0.1
start_up_duration: 0
- type: computer
hostname: pc_b
ip_address: 192.168.2.2
subnet_mask: 255.255.255.0
default_gateway: 192.168.2.1
start_up_duration: 0
- type: wireless_router
hostname: router_1
start_up_duration: 0
router_interface:
ip_address: 192.168.0.1
subnet_mask: 255.255.255.0
wireless_access_point:
ip_address: 192.168.1.1
subnet_mask: 255.255.255.0
frequency: WIFI_5
channel_width: 80
acl:
1:
action: PERMIT
routes:
- address: 192.168.2.0 # PC B subnet
subnet_mask: 255.255.255.0
next_hop_ip_address: 192.168.1.2
metric: 0
- type: wireless_router
hostname: router_2
start_up_duration: 0
router_interface:
ip_address: 192.168.2.1
subnet_mask: 255.255.255.0
wireless_access_point:
ip_address: 192.168.1.2
subnet_mask: 255.255.255.0
frequency: WIFI_5
channel_width: 80
acl:
1:
action: PERMIT
routes:
- address: 192.168.0.0 # PC A subnet
subnet_mask: 255.255.255.0
next_hop_ip_address: 192.168.1.1
metric: 0
links:
- endpoint_a_hostname: pc_a
endpoint_a_port: 1
endpoint_b_hostname: router_1
endpoint_b_port: 2
- endpoint_a_hostname: pc_b
endpoint_a_port: 1
endpoint_b_hostname: router_2
endpoint_b_port: 2

View File

@@ -53,11 +53,11 @@ class TestService(Service):
pass
class TestDummyApplication(Application, identifier="TestDummyApplication"):
class DummyApplication(Application, identifier="DummyApplication"):
"""Test Application class"""
def __init__(self, **kwargs):
kwargs["name"] = "TestDummyApplication"
kwargs["name"] = "DummyApplication"
kwargs["port"] = Port.HTTP
kwargs["protocol"] = IPProtocol.TCP
super().__init__(**kwargs)
@@ -87,9 +87,9 @@ def service_class():
@pytest.fixture(scope="function")
def application(file_system) -> TestDummyApplication:
return TestDummyApplication(
name="TestDummyApplication",
def application(file_system) -> DummyApplication:
return DummyApplication(
name="DummyApplication",
port=Port.ARP,
file_system=file_system,
sys_log=SysLog(hostname="dummy_application"),
@@ -98,7 +98,7 @@ def application(file_system) -> TestDummyApplication:
@pytest.fixture(scope="function")
def application_class():
return TestDummyApplication
return DummyApplication
@pytest.fixture(scope="function")
@@ -257,8 +257,7 @@ def example_network() -> Network:
server_2.power_on()
network.connect(endpoint_b=server_2.network_interface[1], endpoint_a=switch_1.network_interface[2])
router_1.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.ARP, dst_port=Port.ARP, position=22)
router_1.acl.add_rule(action=ACLAction.PERMIT, protocol=IPProtocol.ICMP, position=23)
router_1.acl.add_rule(action=ACLAction.PERMIT, position=1)
assert all(link.is_up for link in network.links.values())

View File

@@ -67,7 +67,7 @@ def test_dev_mode_config_sys_log_level():
# check defaults
assert PRIMAITE_CONFIG["developer_mode"]["sys_log_level"] == "DEBUG" # DEBUG by default
result = cli(["dev-mode", "config", "-level", "WARNING"])
result = cli(["dev-mode", "config", "--sys-log-level", "WARNING"])
assert "sys_log_level=WARNING" in result.output # should print correct value
@@ -78,10 +78,30 @@ def test_dev_mode_config_sys_log_level():
assert "sys_log_level=INFO" in result.output # should print correct value
# config should reflect that log level is WARNING
# config should reflect that log level is INFO
assert PRIMAITE_CONFIG["developer_mode"]["sys_log_level"] == "INFO"
def test_dev_mode_config_agent_log_level():
"""Check that the agent log level can be changed via CLI."""
# check defaults
assert PRIMAITE_CONFIG["developer_mode"]["agent_log_level"] == "DEBUG" # DEBUG by default
result = cli(["dev-mode", "config", "-level", "WARNING"])
assert "agent_log_level=WARNING" in result.output # should print correct value
# config should reflect that log level is WARNING
assert PRIMAITE_CONFIG["developer_mode"]["agent_log_level"] == "WARNING"
result = cli(["dev-mode", "config", "--agent-log-level", "INFO"])
assert "agent_log_level=INFO" in result.output # should print correct value
# config should reflect that log level is INFO
assert PRIMAITE_CONFIG["developer_mode"]["agent_log_level"] == "INFO"
def test_dev_mode_config_sys_logs_enable_disable():
"""Test that the system logs output can be enabled or disabled."""
# check defaults
@@ -112,6 +132,36 @@ def test_dev_mode_config_sys_logs_enable_disable():
assert PRIMAITE_CONFIG["developer_mode"]["output_sys_logs"] is False
def test_dev_mode_config_agent_logs_enable_disable():
"""Test that the agent logs output can be enabled or disabled."""
# check defaults
assert PRIMAITE_CONFIG["developer_mode"]["output_agent_logs"] is False # False by default
result = cli(["dev-mode", "config", "--output-agent-logs"])
assert "output_agent_logs=True" in result.output # should print correct value
# config should reflect that output_agent_logs is True
assert PRIMAITE_CONFIG["developer_mode"]["output_agent_logs"]
result = cli(["dev-mode", "config", "--no-agent-logs"])
assert "output_agent_logs=False" in result.output # should print correct value
# config should reflect that output_agent_logs is True
assert PRIMAITE_CONFIG["developer_mode"]["output_agent_logs"] is False
result = cli(["dev-mode", "config", "-agent"])
assert "output_agent_logs=True" in result.output # should print correct value
# config should reflect that output_agent_logs is True
assert PRIMAITE_CONFIG["developer_mode"]["output_agent_logs"]
result = cli(["dev-mode", "config", "-nagent"])
assert "output_agent_logs=False" in result.output # should print correct value
# config should reflect that output_agent_logs is True
assert PRIMAITE_CONFIG["developer_mode"]["output_agent_logs"] is False
def test_dev_mode_config_pcap_logs_enable_disable():
"""Test that the pcap logs output can be enabled or disabled."""
# check defaults

View File

@@ -35,3 +35,7 @@ def test_io_settings():
assert env.io.settings.save_step_metadata is False
assert env.io.settings.write_sys_log_to_terminal is False # false by default
assert env.io.settings.save_agent_logs is True
assert env.io.settings.agent_log_level is LogLevel.INFO
assert env.io.settings.write_agent_log_to_terminal is True # Set to True by the config file.

View File

@@ -16,7 +16,7 @@ from tests import TEST_ASSETS_ROOT
TEST_CONFIG = TEST_ASSETS_ROOT / "configs/software_fix_duration.yaml"
ONE_ITEM_CONFIG = TEST_ASSETS_ROOT / "configs/fix_duration_one_item.yaml"
TestApplications = ["TestDummyApplication", "TestBroadcastClient"]
TestApplications = ["DummyApplication", "BroadcastTestClient"]
def load_config(config_path: Union[str, Path]) -> PrimaiteGame:

View File

@@ -0,0 +1,54 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from typing import Tuple
import pytest
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.web_browser import WebBrowser
from primaite.simulator.system.services.service import ServiceOperatingState
@pytest.fixture
def game_and_agent_fixture(game_and_agent):
"""Create a game with a simple agent that can be controlled by the tests."""
game, agent = game_and_agent
client_1: Computer = game.simulation.network.get_node_by_hostname("client_1")
client_1.start_up_duration = 3
return (game, agent)
def test_application_cannot_perform_actions_unless_running(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test the the request permissions prevent any actions unless application is running."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
browser: WebBrowser = client_1.software_manager.software.get("WebBrowser")
browser.close()
assert browser.operating_state == ApplicationOperatingState.CLOSED
action = ("NODE_APPLICATION_SCAN", {"node_id": 0, "application_id": 0})
agent.store_action(action)
game.step()
assert browser.operating_state == ApplicationOperatingState.CLOSED
action = ("NODE_APPLICATION_CLOSE", {"node_id": 0, "application_id": 0})
agent.store_action(action)
game.step()
assert browser.operating_state == ApplicationOperatingState.CLOSED
action = ("NODE_APPLICATION_FIX", {"node_id": 0, "application_id": 0})
agent.store_action(action)
game.step()
assert browser.operating_state == ApplicationOperatingState.CLOSED
action = ("NODE_APPLICATION_EXECUTE", {"node_id": 0, "application_id": 0})
agent.store_action(action)
game.step()
assert browser.operating_state == ApplicationOperatingState.CLOSED

View File

@@ -0,0 +1,159 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
import uuid
from typing import Tuple
import pytest
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.network.hardware.nodes.host.computer import Computer
@pytest.fixture
def game_and_agent_fixture(game_and_agent):
"""Create a game with a simple agent that can be controlled by the tests."""
game, agent = game_and_agent
client_1: Computer = game.simulation.network.get_node_by_hostname("client_1")
client_1.start_up_duration = 3
return (game, agent)
def test_create_file(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator allows a files to be created."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
random_folder = str(uuid.uuid4())
random_file = str(uuid.uuid4())
assert client_1.file_system.get_file(folder_name=random_folder, file_name=random_file) is None
action = (
"NODE_FILE_CREATE",
{"node_id": 0, "folder_name": random_folder, "file_name": random_file},
)
agent.store_action(action)
game.step()
assert client_1.file_system.get_file(folder_name=random_folder, file_name=random_file) is not None
def test_file_delete_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator allows a file to be deleted."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
file = client_1.file_system.get_file(folder_name="downloads", file_name="cat.png")
assert file.deleted is False
action = (
"NODE_FILE_DELETE",
{"node_id": 0, "folder_id": 0, "file_id": 0},
)
agent.store_action(action)
game.step()
assert file.deleted
def test_file_scan_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator allows a file to be scanned."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
file = client_1.file_system.get_file(folder_name="downloads", file_name="cat.png")
file.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.visible_health_status == FileSystemItemHealthStatus.GOOD
action = (
"NODE_FILE_SCAN",
{"node_id": 0, "folder_id": 0, "file_id": 0},
)
agent.store_action(action)
game.step()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.visible_health_status == FileSystemItemHealthStatus.CORRUPT
def test_file_repair_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator allows a folder to be created."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
file = client_1.file_system.get_file(folder_name="downloads", file_name="cat.png")
file.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
action = (
"NODE_FILE_REPAIR",
{"node_id": 0, "folder_id": 0, "file_id": 0},
)
agent.store_action(action)
game.step()
assert file.health_status == FileSystemItemHealthStatus.GOOD
def test_file_restore_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator allows a file to be restored."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
file = client_1.file_system.get_file(folder_name="downloads", file_name="cat.png")
file.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
action = (
"NODE_FILE_RESTORE",
{"node_id": 0, "folder_id": 0, "file_id": 0},
)
agent.store_action(action)
game.step()
assert file.health_status == FileSystemItemHealthStatus.GOOD
def test_file_corrupt_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator allows a file to be corrupted."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
file = client_1.file_system.get_file(folder_name="downloads", file_name="cat.png")
assert file.health_status == FileSystemItemHealthStatus.GOOD
action = (
"NODE_FILE_CORRUPT",
{"node_id": 0, "folder_id": 0, "file_id": 0},
)
agent.store_action(action)
game.step()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
def test_file_access_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator allows a file to be accessed."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
file = client_1.file_system.get_file(folder_name="downloads", file_name="cat.png")
assert file.num_access == 0
action = (
"NODE_FILE_ACCESS",
{"node_id": 0, "folder_name": file.folder_name, "file_name": file.name},
)
agent.store_action(action)
game.step()
assert file.num_access == 1

View File

@@ -0,0 +1,123 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
import uuid
from typing import Tuple
import pytest
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.network.hardware.nodes.host.computer import Computer
@pytest.fixture
def game_and_agent_fixture(game_and_agent):
"""Create a game with a simple agent that can be controlled by the tests."""
game, agent = game_and_agent
client_1: Computer = game.simulation.network.get_node_by_hostname("client_1")
client_1.start_up_duration = 3
return (game, agent)
def test_create_folder(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator allows a folder to be created."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
random_folder = str(uuid.uuid4())
assert client_1.file_system.get_folder(folder_name=random_folder) is None
action = (
"NODE_FOLDER_CREATE",
{
"node_id": 0,
"folder_name": random_folder,
},
)
agent.store_action(action)
game.step()
assert client_1.file_system.get_folder(folder_name=random_folder) is not None
def test_folder_scan_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test to make sure that the validator checks if the folder exists before scanning."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
folder = client_1.file_system.get_folder(folder_name="downloads")
assert folder.health_status == FileSystemItemHealthStatus.GOOD
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
folder.corrupt()
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
action = (
"NODE_FOLDER_SCAN",
{
"node_id": 0, # client_1,
"folder_id": 0, # downloads
},
)
agent.store_action(action)
game.step()
for i in range(folder.scan_duration + 1):
game.step()
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT
def test_folder_repair_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test to make sure that the validator checks if the folder exists before repairing."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
folder = client_1.file_system.get_folder(folder_name="downloads")
folder.corrupt()
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
action = (
"NODE_FOLDER_REPAIR",
{
"node_id": 0, # client_1,
"folder_id": 0, # downloads
},
)
agent.store_action(action)
game.step()
assert folder.health_status == FileSystemItemHealthStatus.GOOD
def test_folder_restore_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test to make sure that the validator checks if the folder exists before restoring."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
folder = client_1.file_system.get_folder(folder_name="downloads")
folder.corrupt()
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
action = (
"NODE_FOLDER_RESTORE",
{
"node_id": 0, # client_1,
"folder_id": 0, # downloads
},
)
agent.store_action(action)
game.step()
assert folder.health_status == FileSystemItemHealthStatus.RESTORING

View File

@@ -0,0 +1,95 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from typing import Tuple
import pytest
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.simulator.network.hardware.nodes.host.computer import Computer
@pytest.fixture
def game_and_agent_fixture(game_and_agent):
"""Create a game with a simple agent that can be controlled by the tests."""
game, agent = game_and_agent
client_1: Computer = game.simulation.network.get_node_by_hostname("client_1")
client_1.start_up_duration = 3
return (game, agent)
def test_nic_cannot_be_turned_off_if_not_on(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that a NIC cannot be disabled if it is not enabled."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
nic = client_1.network_interface[1]
nic.disable()
assert nic.enabled is False
action = (
"HOST_NIC_DISABLE",
{
"node_id": 0, # client_1
"nic_id": 0, # the only nic (eth-1)
},
)
agent.store_action(action)
game.step()
assert nic.enabled is False
def test_nic_cannot_be_turned_on_if_already_on(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that a NIC cannot be enabled if it is already enabled."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
nic = client_1.network_interface[1]
assert nic.enabled
action = (
"HOST_NIC_ENABLE",
{
"node_id": 0, # client_1
"nic_id": 0, # the only nic (eth-1)
},
)
agent.store_action(action)
game.step()
assert nic.enabled
def test_that_a_nic_can_be_enabled_and_disabled(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Tests that a NIC can be enabled and disabled."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
nic = client_1.network_interface[1]
assert nic.enabled
action = (
"HOST_NIC_DISABLE",
{
"node_id": 0, # client_1
"nic_id": 0, # the only nic (eth-1)
},
)
agent.store_action(action)
game.step()
assert nic.enabled is False
action = (
"HOST_NIC_ENABLE",
{
"node_id": 0, # client_1
"nic_id": 0, # the only nic (eth-1)
},
)
agent.store_action(action)
game.step()
assert nic.enabled

View File

@@ -1 +1,94 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from typing import Tuple
import pytest
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.host.computer import Computer
@pytest.fixture
def game_and_agent_fixture(game_and_agent):
"""Create a game with a simple agent that can be controlled by the tests."""
game, agent = game_and_agent
client_1: Computer = game.simulation.network.get_node_by_hostname("client_1")
client_1.start_up_duration = 3
return (game, agent)
def test_node_startup_shutdown(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the node can be shut down and started up."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
assert client_1.operating_state == NodeOperatingState.ON
# turn it off
action = ("NODE_SHUTDOWN", {"node_id": 0})
agent.store_action(action)
game.step()
assert client_1.operating_state == NodeOperatingState.SHUTTING_DOWN
for i in range(client_1.shut_down_duration + 1):
action = ("DONOTHING", {"node_id": 0})
agent.store_action(action)
game.step()
assert client_1.operating_state == NodeOperatingState.OFF
# turn it on
action = ("NODE_STARTUP", {"node_id": 0})
agent.store_action(action)
game.step()
assert client_1.operating_state == NodeOperatingState.BOOTING
for i in range(client_1.start_up_duration + 1):
action = ("DONOTHING", {"node_id": 0})
agent.store_action(action)
game.step()
assert client_1.operating_state == NodeOperatingState.ON
def test_node_cannot_be_started_up_if_node_is_already_on(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that a node cannot be started up if it is already on."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
assert client_1.operating_state == NodeOperatingState.ON
# turn it on
action = ("NODE_STARTUP", {"node_id": 0})
agent.store_action(action)
game.step()
assert client_1.operating_state == NodeOperatingState.ON
def test_node_cannot_be_shut_down_if_node_is_already_off(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that a node cannot be shut down if it is already off."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
client_1.power_off()
for i in range(client_1.shut_down_duration + 1):
action = ("DONOTHING", {"node_id": 0})
agent.store_action(action)
game.step()
assert client_1.operating_state == NodeOperatingState.OFF
# turn it ff
action = ("NODE_SHUTDOWN", {"node_id": 0})
agent.store_action(action)
game.step()
assert client_1.operating_state == NodeOperatingState.OFF

View File

@@ -0,0 +1,106 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from typing import Tuple
import pytest
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.system.services.service import ServiceOperatingState
@pytest.fixture
def game_and_agent_fixture(game_and_agent):
"""Create a game with a simple agent that can be controlled by the tests."""
game, agent = game_and_agent
client_1: Computer = game.simulation.network.get_node_by_hostname("client_1")
client_1.start_up_duration = 3
return (game, agent)
def test_service_start(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator makes sure that the service is stopped before starting the service."""
game, agent = game_and_agent_fixture
server_1: Server = game.simulation.network.get_node_by_hostname("server_1")
dns_server = server_1.software_manager.software.get("DNSServer")
dns_server.pause()
assert dns_server.operating_state == ServiceOperatingState.PAUSED
action = ("NODE_SERVICE_START", {"node_id": 1, "service_id": 0})
agent.store_action(action)
game.step()
assert dns_server.operating_state == ServiceOperatingState.PAUSED
dns_server.stop()
assert dns_server.operating_state == ServiceOperatingState.STOPPED
action = ("NODE_SERVICE_START", {"node_id": 1, "service_id": 0})
agent.store_action(action)
game.step()
assert dns_server.operating_state == ServiceOperatingState.RUNNING
def test_service_resume(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator checks if the service is paused before resuming."""
game, agent = game_and_agent_fixture
server_1: Server = game.simulation.network.get_node_by_hostname("server_1")
dns_server = server_1.software_manager.software.get("DNSServer")
action = ("NODE_SERVICE_RESUME", {"node_id": 1, "service_id": 0})
agent.store_action(action)
game.step()
assert dns_server.operating_state == ServiceOperatingState.RUNNING
dns_server.pause()
assert dns_server.operating_state == ServiceOperatingState.PAUSED
action = ("NODE_SERVICE_RESUME", {"node_id": 1, "service_id": 0})
agent.store_action(action)
game.step()
assert dns_server.operating_state == ServiceOperatingState.RUNNING
def test_service_cannot_perform_actions_unless_running(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test to make sure that the service cannot perform certain actions while not running."""
game, agent = game_and_agent_fixture
server_1: Server = game.simulation.network.get_node_by_hostname("server_1")
dns_server = server_1.software_manager.software.get("DNSServer")
dns_server.stop()
assert dns_server.operating_state == ServiceOperatingState.STOPPED
action = ("NODE_SERVICE_SCAN", {"node_id": 1, "service_id": 0})
agent.store_action(action)
game.step()
assert dns_server.operating_state == ServiceOperatingState.STOPPED
action = ("NODE_SERVICE_PAUSE", {"node_id": 1, "service_id": 0})
agent.store_action(action)
game.step()
assert dns_server.operating_state == ServiceOperatingState.STOPPED
action = ("NODE_SERVICE_RESUME", {"node_id": 1, "service_id": 0})
agent.store_action(action)
game.step()
assert dns_server.operating_state == ServiceOperatingState.STOPPED
action = ("NODE_SERVICE_RESTART", {"node_id": 1, "service_id": 0})
agent.store_action(action)
game.step()
assert dns_server.operating_state == ServiceOperatingState.STOPPED
action = ("NODE_SERVICE_FIX", {"node_id": 1, "service_id": 0})
agent.store_action(action)
game.step()
assert dns_server.operating_state == ServiceOperatingState.STOPPED

View File

@@ -155,7 +155,7 @@ def test_nic_monitored_traffic(simulation):
assert traffic_obs["icmp"]["outbound"] == 0
# send a ping
pc.ping(target_ip_address=pc2.network_interface[1].ip_address)
assert pc.ping(target_ip_address=pc2.network_interface[1].ip_address)
traffic_obs = nic_obs.observe(simulation.describe_state()).get("TRAFFIC")
assert traffic_obs["icmp"]["inbound"] == 1
@@ -178,7 +178,7 @@ def test_nic_monitored_traffic(simulation):
traffic_obs = nic_obs.observe(simulation.describe_state()).get("TRAFFIC")
assert traffic_obs["icmp"]["inbound"] == 0
assert traffic_obs["icmp"]["outbound"] == 0
assert traffic_obs["tcp"][53]["inbound"] == 0
assert traffic_obs["tcp"][53]["inbound"] == 1
assert traffic_obs["tcp"][53]["outbound"] == 1 # getting a webpage sent a dns request out
simulation.pre_timestep(2) # apply timestep to whole sim

View File

@@ -0,0 +1,106 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
import yaml
from primaite.game.game import PrimaiteGame
from primaite.simulator.network.airspace import (
AirspaceEnvironmentType,
AirSpaceFrequency,
calculate_total_channel_capacity,
ChannelWidth,
)
from primaite.simulator.network.hardware.nodes.network.wireless_router import WirelessRouter
from tests import TEST_ASSETS_ROOT
def test_wireless_wan_wifi_5_80_channel_width_urban():
config_path = TEST_ASSETS_ROOT / "configs" / "wireless_wan_wifi_5_80_channel_width_urban.yaml"
with open(config_path, "r") as f:
config_dict = yaml.safe_load(f)
network = PrimaiteGame.from_config(cfg=config_dict).simulation.network
airspace = network.airspace
assert airspace.airspace_environment_type == AirspaceEnvironmentType.URBAN
router_1: WirelessRouter = network.get_node_by_hostname("router_1")
router_2: WirelessRouter = network.get_node_by_hostname("router_2")
expected_speed = calculate_total_channel_capacity(
channel_width=ChannelWidth.WIDTH_80_MHZ,
frequency=AirSpaceFrequency.WIFI_5,
environment_type=AirspaceEnvironmentType.URBAN,
)
assert router_1.wireless_access_point.speed == expected_speed
assert router_2.wireless_access_point.speed == expected_speed
pc_a = network.get_node_by_hostname("pc_a")
pc_b = network.get_node_by_hostname("pc_b")
assert pc_a.ping(pc_a.default_gateway), "PC A should ping its default gateway successfully."
assert pc_b.ping(pc_b.default_gateway), "PC B should ping its default gateway successfully."
assert pc_a.ping(pc_b.network_interface[1].ip_address), "PC A should ping PC B across routers successfully."
assert pc_b.ping(pc_a.network_interface[1].ip_address), "PC B should ping PC A across routers successfully."
def test_wireless_wan_wifi_5_80_channel_width_blocked():
config_path = TEST_ASSETS_ROOT / "configs" / "wireless_wan_wifi_5_80_channel_width_blocked.yaml"
with open(config_path, "r") as f:
config_dict = yaml.safe_load(f)
network = PrimaiteGame.from_config(cfg=config_dict).simulation.network
airspace = network.airspace
assert airspace.airspace_environment_type == AirspaceEnvironmentType.BLOCKED
router_1: WirelessRouter = network.get_node_by_hostname("router_1")
router_2: WirelessRouter = network.get_node_by_hostname("router_2")
expected_speed = calculate_total_channel_capacity(
channel_width=ChannelWidth.WIDTH_80_MHZ,
frequency=AirSpaceFrequency.WIFI_5,
environment_type=AirspaceEnvironmentType.BLOCKED,
)
assert router_1.wireless_access_point.speed == expected_speed
assert router_2.wireless_access_point.speed == expected_speed
pc_a = network.get_node_by_hostname("pc_a")
pc_b = network.get_node_by_hostname("pc_b")
assert pc_a.ping(pc_a.default_gateway), "PC A should ping its default gateway successfully."
assert pc_b.ping(pc_b.default_gateway), "PC B should ping its default gateway successfully."
assert not pc_a.ping(pc_b.network_interface[1].ip_address), "PC A should ping PC B across routers unsuccessfully."
assert not pc_b.ping(pc_a.network_interface[1].ip_address), "PC B should ping PC A across routers unsuccessfully."
def test_wireless_wan_blocking_and_unblocking_airspace():
config_path = TEST_ASSETS_ROOT / "configs" / "wireless_wan_wifi_5_80_channel_width_urban.yaml"
with open(config_path, "r") as f:
config_dict = yaml.safe_load(f)
network = PrimaiteGame.from_config(cfg=config_dict).simulation.network
airspace = network.airspace
assert airspace.airspace_environment_type == AirspaceEnvironmentType.URBAN
pc_a = network.get_node_by_hostname("pc_a")
pc_b = network.get_node_by_hostname("pc_b")
assert pc_a.ping(pc_b.network_interface[1].ip_address), "PC A should ping PC B across routers successfully."
assert pc_b.ping(pc_a.network_interface[1].ip_address), "PC B should ping PC A across routers successfully."
airspace.airspace_environment_type = AirspaceEnvironmentType.BLOCKED
assert not pc_a.ping(pc_b.network_interface[1].ip_address), "PC A should ping PC B across routers unsuccessfully."
assert not pc_b.ping(pc_a.network_interface[1].ip_address), "PC B should ping PC A across routers unsuccessfully."
airspace.airspace_environment_type = AirspaceEnvironmentType.URBAN
assert pc_a.ping(pc_b.network_interface[1].ip_address), "PC A should ping PC B across routers successfully."
assert pc_b.ping(pc_a.network_interface[1].ip_address), "PC B should ping PC A across routers successfully."

View File

@@ -0,0 +1,138 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from primaite.simulator.file_system.file_type import FileType
from primaite.simulator.network.hardware.nodes.network.router import ACLAction
from primaite.simulator.system.services.ftp.ftp_client import FTPClient
from primaite.simulator.system.services.ftp.ftp_server import FTPServer
from tests.integration_tests.network.test_wireless_router import wireless_wan_network
from tests.integration_tests.system.test_ftp_client_server import ftp_client_and_ftp_server
def test_wireless_link_loading(wireless_wan_network):
client, server, router_1, router_2 = wireless_wan_network
# Configure Router 1 ACLs
router_1.acl.add_rule(action=ACLAction.PERMIT, position=1)
# Configure Router 2 ACLs
router_2.acl.add_rule(action=ACLAction.PERMIT, position=1)
airspace = router_1.airspace
client.software_manager.install(FTPClient)
ftp_client: FTPClient = client.software_manager.software.get("FTPClient")
ftp_client.start()
server.software_manager.install(FTPServer)
ftp_server: FTPServer = server.software_manager.software.get("FTPServer")
ftp_server.start()
client.file_system.create_file(file_name="mixtape", size=10 * 10**6, file_type=FileType.MP3, folder_name="music")
assert ftp_client.send_file(
src_file_name="mixtape.mp3",
src_folder_name="music",
dest_ip_address=server.network_interface[1].ip_address,
dest_file_name="mixtape.mp3",
dest_folder_name="music",
)
# Reset the physical links between the host nodes and the routers
client.network_interface[1]._connected_link.pre_timestep(1)
server.network_interface[1]._connected_link.pre_timestep(1)
assert ftp_client.send_file(
src_file_name="mixtape.mp3",
src_folder_name="music",
dest_ip_address=server.network_interface[1].ip_address,
dest_file_name="mixtape1.mp3",
dest_folder_name="music",
)
# Reset the physical links between the host nodes and the routers
client.network_interface[1]._connected_link.pre_timestep(1)
server.network_interface[1]._connected_link.pre_timestep(1)
assert ftp_client.send_file(
src_file_name="mixtape.mp3",
src_folder_name="music",
dest_ip_address=server.network_interface[1].ip_address,
dest_file_name="mixtape2.mp3",
dest_folder_name="music",
)
# Reset the physical links between the host nodes and the routers
client.network_interface[1]._connected_link.pre_timestep(1)
server.network_interface[1]._connected_link.pre_timestep(1)
assert not ftp_client.send_file(
src_file_name="mixtape.mp3",
src_folder_name="music",
dest_ip_address=server.network_interface[1].ip_address,
dest_file_name="mixtape3.mp3",
dest_folder_name="music",
)
# Reset the physical links between the host nodes and the routers
client.network_interface[1]._connected_link.pre_timestep(1)
server.network_interface[1]._connected_link.pre_timestep(1)
airspace.reset_bandwidth_load()
assert ftp_client.send_file(
src_file_name="mixtape.mp3",
src_folder_name="music",
dest_ip_address=server.network_interface[1].ip_address,
dest_file_name="mixtape3.mp3",
dest_folder_name="music",
)
def test_wired_link_loading(ftp_client_and_ftp_server):
ftp_client, computer, ftp_server, server = ftp_client_and_ftp_server
link = computer.network_interface[1]._connected_link # noqa
assert link.is_up
link.pre_timestep(1)
computer.file_system.create_file(
file_name="mixtape", size=10 * 10**6, file_type=FileType.MP3, folder_name="music"
)
link_load = link.current_load
assert link_load == 0.0
assert ftp_client.send_file(
src_file_name="mixtape.mp3",
src_folder_name="music",
dest_ip_address=server.network_interface[1].ip_address,
dest_file_name="mixtape.mp3",
dest_folder_name="music",
)
new_link_load = link.current_load
assert new_link_load > link_load
assert not ftp_client.send_file(
src_file_name="mixtape.mp3",
src_folder_name="music",
dest_ip_address=server.network_interface[1].ip_address,
dest_file_name="mixtape1.mp3",
dest_folder_name="music",
)
link.pre_timestep(2)
link_load = link.current_load
assert link_load == 0.0
assert ftp_client.send_file(
src_file_name="mixtape.mp3",
src_folder_name="music",
dest_ip_address=server.network_interface[1].ip_address,
dest_file_name="mixtape1.mp3",
dest_folder_name="music",
)
new_link_load = link.current_load
assert new_link_load > link_load

View File

@@ -14,7 +14,7 @@ from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.services.service import Service
class TestBroadcastService(Service):
class BroadcastTestService(Service):
"""A service for sending broadcast and unicast messages over a network."""
def __init__(self, **kwargs):
@@ -41,14 +41,14 @@ class TestBroadcastService(Service):
super().send(payload="broadcast", dest_ip_address=ip_network, dest_port=Port.HTTP, ip_protocol=self.protocol)
class TestBroadcastClient(Application, identifier="TestBroadcastClient"):
class BroadcastTestClient(Application, identifier="BroadcastTestClient"):
"""A client application to receive broadcast and unicast messages."""
payloads_received: List = []
def __init__(self, **kwargs):
# Set default client properties
kwargs["name"] = "TestBroadcastClient"
kwargs["name"] = "BroadcastTestClient"
kwargs["port"] = Port.HTTP
kwargs["protocol"] = IPProtocol.TCP
super().__init__(**kwargs)
@@ -75,8 +75,8 @@ def broadcast_network() -> Network:
start_up_duration=0,
)
client_1.power_on()
client_1.software_manager.install(TestBroadcastClient)
application_1 = client_1.software_manager.software["TestBroadcastClient"]
client_1.software_manager.install(BroadcastTestClient)
application_1 = client_1.software_manager.software["BroadcastTestClient"]
application_1.run()
client_2 = Computer(
@@ -87,8 +87,8 @@ def broadcast_network() -> Network:
start_up_duration=0,
)
client_2.power_on()
client_2.software_manager.install(TestBroadcastClient)
application_2 = client_2.software_manager.software["TestBroadcastClient"]
client_2.software_manager.install(BroadcastTestClient)
application_2 = client_2.software_manager.software["BroadcastTestClient"]
application_2.run()
server_1 = Server(
@@ -100,8 +100,8 @@ def broadcast_network() -> Network:
)
server_1.power_on()
server_1.software_manager.install(TestBroadcastService)
service: TestBroadcastService = server_1.software_manager.software["BroadcastService"]
server_1.software_manager.install(BroadcastTestService)
service: BroadcastTestService = server_1.software_manager.software["BroadcastService"]
service.start()
switch_1 = Switch(hostname="switch_1", num_ports=6, start_up_duration=0)
@@ -117,14 +117,14 @@ def broadcast_network() -> Network:
@pytest.fixture(scope="function")
def broadcast_service_and_clients(
broadcast_network,
) -> Tuple[TestBroadcastService, TestBroadcastClient, TestBroadcastClient]:
client_1: TestBroadcastClient = broadcast_network.get_node_by_hostname("client_1").software_manager.software[
"TestBroadcastClient"
) -> Tuple[BroadcastTestService, BroadcastTestClient, BroadcastTestClient]:
client_1: BroadcastTestClient = broadcast_network.get_node_by_hostname("client_1").software_manager.software[
"BroadcastTestClient"
]
client_2: TestBroadcastClient = broadcast_network.get_node_by_hostname("client_2").software_manager.software[
"TestBroadcastClient"
client_2: BroadcastTestClient = broadcast_network.get_node_by_hostname("client_2").software_manager.software[
"BroadcastTestClient"
]
service: TestBroadcastService = broadcast_network.get_node_by_hostname("server_1").software_manager.software[
service: BroadcastTestService = broadcast_network.get_node_by_hostname("server_1").software_manager.software[
"BroadcastService"
]

View File

@@ -21,7 +21,7 @@ def populated_node(application_class) -> Tuple[Application, Computer]:
computer.power_on()
computer.software_manager.install(application_class)
app = computer.software_manager.software.get("TestDummyApplication")
app = computer.software_manager.software.get("DummyApplication")
app.run()
return app, computer
@@ -39,7 +39,7 @@ def test_application_on_offline_node(application_class):
)
computer.software_manager.install(application_class)
app: Application = computer.software_manager.software.get("TestDummyApplication")
app: Application = computer.software_manager.software.get("DummyApplication")
computer.power_off()

View File

@@ -101,6 +101,7 @@ def test_port_scan_full_subnet_all_ports_and_protocols(example_network):
actual_result = client_1_nmap.port_scan(
target_ip_address=IPv4Network("192.168.10.0/24"),
target_port=[Port.ARP, Port.HTTP, Port.FTP, Port.DNS, Port.NTP],
)
expected_result = {

View File

@@ -13,7 +13,7 @@ from primaite.simulator.network.hardware.node_operating_state import NodeOperati
from primaite.simulator.network.hardware.nodes.host.host_node import HostNode
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.transmission.transport_layer import Port
from tests.conftest import TestDummyApplication, TestService
from tests.conftest import DummyApplication, TestService
def test_successful_node_file_system_creation_request(example_network):
@@ -47,14 +47,14 @@ def test_successful_application_requests(example_network):
net = example_network
client_1 = net.get_node_by_hostname("client_1")
client_1.software_manager.install(TestDummyApplication)
client_1.software_manager.software.get("TestDummyApplication").run()
client_1.software_manager.install(DummyApplication)
client_1.software_manager.software.get("DummyApplication").run()
resp_1 = net.apply_request(["node", "client_1", "application", "TestDummyApplication", "scan"])
resp_1 = net.apply_request(["node", "client_1", "application", "DummyApplication", "scan"])
assert resp_1 == RequestResponse(status="success", data={})
resp_2 = net.apply_request(["node", "client_1", "application", "TestDummyApplication", "fix"])
resp_2 = net.apply_request(["node", "client_1", "application", "DummyApplication", "fix"])
assert resp_2 == RequestResponse(status="success", data={})
resp_3 = net.apply_request(["node", "client_1", "application", "TestDummyApplication", "compromise"])
resp_3 = net.apply_request(["node", "client_1", "application", "DummyApplication", "compromise"])
assert resp_3 == RequestResponse(status="success", data={})

View File

@@ -0,0 +1,137 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from uuid import uuid4
import pytest
from primaite import PRIMAITE_CONFIG
from primaite.game.agent.agent_log import AgentLog
from primaite.simulator import LogLevel, SIM_OUTPUT
@pytest.fixture(autouse=True)
def override_dev_mode_temporarily():
"""Temporarily turn off dev mode for this test."""
primaite_dev_mode = PRIMAITE_CONFIG["developer_mode"]["enabled"]
PRIMAITE_CONFIG["developer_mode"]["enabled"] = False
yield # run tests
PRIMAITE_CONFIG["developer_mode"]["enabled"] = primaite_dev_mode
@pytest.fixture(scope="function")
def agentlog() -> AgentLog:
return AgentLog(agent_name="test_agent")
def test_debug_agent_log_level(agentlog, capsys):
"""Test that the debug log level logs debug agent logs and above."""
SIM_OUTPUT.agent_log_level = LogLevel.DEBUG
SIM_OUTPUT.write_agent_log_to_terminal = True
test_string = str(uuid4())
agentlog.debug(msg=test_string)
agentlog.info(msg=test_string)
agentlog.warning(msg=test_string)
agentlog.error(msg=test_string)
agentlog.critical(msg=test_string)
captured = "".join(capsys.readouterr())
assert test_string in captured
assert "DEBUG" in captured
assert "INFO" in captured
assert "WARNING" in captured
assert "ERROR" in captured
assert "CRITICAL" in captured
def test_info_agent_log_level(agentlog, capsys):
"""Test that the debug log level logs debug agent logs and above."""
SIM_OUTPUT.agent_log_level = LogLevel.INFO
SIM_OUTPUT.write_agent_log_to_terminal = True
test_string = str(uuid4())
agentlog.debug(msg=test_string)
agentlog.info(msg=test_string)
agentlog.warning(msg=test_string)
agentlog.error(msg=test_string)
agentlog.critical(msg=test_string)
captured = "".join(capsys.readouterr())
assert test_string in captured
assert "DEBUG" not in captured
assert "INFO" in captured
assert "WARNING" in captured
assert "ERROR" in captured
assert "CRITICAL" in captured
def test_warning_agent_log_level(agentlog, capsys):
"""Test that the debug log level logs debug agent logs and above."""
SIM_OUTPUT.agent_log_level = LogLevel.WARNING
SIM_OUTPUT.write_agent_log_to_terminal = True
test_string = str(uuid4())
agentlog.debug(msg=test_string)
agentlog.info(msg=test_string)
agentlog.warning(msg=test_string)
agentlog.error(msg=test_string)
agentlog.critical(msg=test_string)
captured = "".join(capsys.readouterr())
assert test_string in captured
assert "DEBUG" not in captured
assert "INFO" not in captured
assert "WARNING" in captured
assert "ERROR" in captured
assert "CRITICAL" in captured
def test_error_agent_log_level(agentlog, capsys):
"""Test that the debug log level logs debug agent logs and above."""
SIM_OUTPUT.agent_log_level = LogLevel.ERROR
SIM_OUTPUT.write_agent_log_to_terminal = True
test_string = str(uuid4())
agentlog.debug(msg=test_string)
agentlog.info(msg=test_string)
agentlog.warning(msg=test_string)
agentlog.error(msg=test_string)
agentlog.critical(msg=test_string)
captured = "".join(capsys.readouterr())
assert test_string in captured
assert "DEBUG" not in captured
assert "INFO" not in captured
assert "WARNING" not in captured
assert "ERROR" in captured
assert "CRITICAL" in captured
def test_critical_agent_log_level(agentlog, capsys):
"""Test that the debug log level logs debug agent logs and above."""
SIM_OUTPUT.agent_log_level = LogLevel.CRITICAL
SIM_OUTPUT.write_agent_log_to_terminal = True
test_string = str(uuid4())
agentlog.debug(msg=test_string)
agentlog.info(msg=test_string)
agentlog.warning(msg=test_string)
agentlog.error(msg=test_string)
agentlog.critical(msg=test_string)
captured = "".join(capsys.readouterr())
assert test_string in captured
assert "DEBUG" not in captured
assert "INFO" not in captured
assert "WARNING" not in captured
assert "ERROR" not in captured
assert "CRITICAL" in captured

View File

@@ -26,7 +26,7 @@ def test_file_scan_request(populated_file_system):
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.visible_health_status == FileSystemItemHealthStatus.GOOD
fs.apply_request(request=["file", file.name, "scan"])
fs.apply_request(request=["folder", folder.name, "file", file.name, "scan"])
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.visible_health_status == FileSystemItemHealthStatus.CORRUPT
@@ -37,12 +37,12 @@ def test_file_checkhash_request(populated_file_system):
"""Test that an agent can request a file hash check."""
fs, folder, file = populated_file_system
fs.apply_request(request=["file", file.name, "checkhash"])
fs.apply_request(request=["folder", folder.name, "file", file.name, "checkhash"])
assert file.health_status == FileSystemItemHealthStatus.GOOD
file.sim_size = 0
fs.apply_request(request=["file", file.name, "checkhash"])
fs.apply_request(request=["folder", folder.name, "file", file.name, "checkhash"])
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
@@ -54,7 +54,7 @@ def test_file_repair_request(populated_file_system):
file.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
fs.apply_request(request=["file", file.name, "repair"])
fs.apply_request(request=["folder", folder.name, "file", file.name, "repair"])
assert file.health_status == FileSystemItemHealthStatus.GOOD
@@ -71,7 +71,7 @@ def test_file_restore_request(populated_file_system):
assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None
assert fs.get_file(folder_name=folder.name, file_name=file.name).deleted is False
fs.apply_request(request=["file", file.name, "corrupt"])
fs.apply_request(request=["folder", folder.name, "file", file.name, "corrupt"])
assert fs.get_file(folder_name=folder.name, file_name=file.name).health_status == FileSystemItemHealthStatus.CORRUPT
fs.apply_request(request=["restore", "file", folder.name, file.name])
@@ -81,7 +81,7 @@ def test_file_restore_request(populated_file_system):
def test_file_corrupt_request(populated_file_system):
"""Test that an agent can request a file corruption."""
fs, folder, file = populated_file_system
fs.apply_request(request=["file", file.name, "corrupt"])
fs.apply_request(request=["folder", folder.name, "file", file.name, "corrupt"])
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
@@ -90,7 +90,7 @@ def test_deleted_file_cannot_be_interacted_with(populated_file_system):
fs, folder, file = populated_file_system
assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None
fs.apply_request(request=["file", file.name, "corrupt"])
fs.apply_request(request=["folder", folder.name, "file", file.name, "corrupt"])
assert fs.get_file(folder_name=folder.name, file_name=file.name).health_status == FileSystemItemHealthStatus.CORRUPT
assert (
fs.get_file(folder_name=folder.name, file_name=file.name).visible_health_status

View File

@@ -39,3 +39,39 @@ def test_folder_delete_request(populated_file_system):
assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid) is None
fs.show(full=True)
def test_folder_exists_request_validator(populated_file_system):
"""Tests that the _FolderExistsValidator works as intended."""
fs, folder, file = populated_file_system
validator = FileSystem._FolderExistsValidator(file_system=fs)
assert validator(request=["test_folder"], context={}) # test_folder exists
assert validator(request=["fake_folder"], context={}) is False # fake_folder does not exist
assert validator.fail_message == "Cannot perform request on folder because it does not exist."
def test_file_exists_request_validator(populated_file_system):
"""Tests that the _FolderExistsValidator works as intended."""
fs, folder, file = populated_file_system
validator = FileSystem._FileExistsValidator(file_system=fs)
assert validator(request=["test_folder", "test_file.txt"], context={}) # test_file.txt exists
assert validator(request=["test_folder", "fake_file.txt"], context={}) is False # fake_file.txt does not exist
assert validator.fail_message == "Cannot perform request on a file that does not exist."
def test_folder_not_deleted_request_validator(populated_file_system):
"""Tests that the _FolderExistsValidator works as intended."""
fs, folder, file = populated_file_system
validator = FileSystem._FolderNotDeletedValidator(file_system=fs)
assert validator(request=["test_folder"], context={}) # test_folder is not deleted
fs.delete_folder(folder_name="test_folder")
assert validator(request=["test_folder"], context={}) is False # test_folder is deleted
assert validator.fail_message == "Cannot perform request on folder because it is deleted."

View File

@@ -166,15 +166,40 @@ def test_deleted_folder_and_its_files_cannot_be_interacted_with(populated_file_s
fs, folder, file = populated_file_system
assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None
fs.apply_request(request=["file", file.name, "corrupt"])
fs.apply_request(request=["folder", folder.name, "file", file.name, "corrupt"])
assert fs.get_file(folder_name=folder.name, file_name=file.name).health_status == FileSystemItemHealthStatus.CORRUPT
fs.apply_request(request=["delete", "folder", folder.name])
assert fs.get_file(folder_name=folder.name, file_name=file.name) is None
fs.apply_request(request=["file", file.name, "repair"])
fs.apply_request(request=["folder", folder.name, "file", file.name, "repair"])
deleted_folder = fs.deleted_folders.get(folder.uuid)
deleted_file = deleted_folder.deleted_files.get(file.uuid)
assert deleted_file.health_status is not FileSystemItemHealthStatus.GOOD
def test_file_exists_request_validator(populated_file_system):
"""Tests that the _FolderExistsValidator works as intended."""
fs, folder, file = populated_file_system
validator = Folder._FileExistsValidator(folder=folder)
assert validator(request=["test_file.txt"], context={}) # test_file.txt exists
assert validator(request=["fake_file.txt"], context={}) is False # fake_file.txt does not exist
assert validator.fail_message == "Cannot perform request on a file that does not exist."
def test_file_not_deleted_request_validator(populated_file_system):
"""Tests that the _FolderExistsValidator works as intended."""
fs, folder, file = populated_file_system
validator = Folder._FileNotDeletedValidator(folder=folder)
assert validator(request=["test_file.txt"], context={}) # test_file.txt is not deleted
fs.delete_file(folder_name="test_folder", file_name="test_file.txt")
assert validator(request=["fake_file.txt"], context={}) is False # test_file.txt is deleted
assert validator.fail_message == "Cannot perform request on a file that is deleted."

View File

@@ -0,0 +1,34 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
import pytest
from primaite.simulator.network.hardware.base import NetworkInterface, Node
from primaite.simulator.network.hardware.nodes.host.computer import Computer
@pytest.fixture
def node() -> Node:
return Computer(hostname="test", ip_address="192.168.1.2", subnet_mask="255.255.255.0")
def test_nic_enabled_validator(node):
"""Test the NetworkInterface enabled validator."""
network_interface = node.network_interface[1]
validator = NetworkInterface._EnabledValidator(network_interface=network_interface)
assert validator(request=[], context={}) is False # not enabled
network_interface.enabled = True
assert validator(request=[], context={}) # enabled
def test_nic_disabled_validator(node):
"""Test the NetworkInterface enabled validator."""
network_interface = node.network_interface[1]
validator = NetworkInterface._DisabledValidator(network_interface=network_interface)
assert validator(request=[], context={}) # not enabled
network_interface.enabled = True
assert validator(request=[], context={}) is False # enabled

View File

@@ -155,3 +155,39 @@ def test_reset_node(node):
assert node.operating_state == NodeOperatingState.BOOTING
assert node.operating_state == NodeOperatingState.ON
def test_node_is_on_validator(node):
"""Test that the node is on validator."""
node.power_on()
for i in range(node.start_up_duration + 1):
node.apply_timestep(i)
validator = Node._NodeIsOnValidator(node=node)
assert validator(request=[], context={})
node.power_off()
for i in range(node.shut_down_duration + 1):
node.apply_timestep(i)
assert validator(request=[], context={}) is False
def test_node_is_off_validator(node):
"""Test that the node is on validator."""
node.power_on()
for i in range(node.start_up_duration + 1):
node.apply_timestep(i)
validator = Node._NodeIsOffValidator(node=node)
assert validator(request=[], context={}) is False
node.power_off()
for i in range(node.shut_down_duration + 1):
node.apply_timestep(i)
assert validator(request=[], context={})

View File

@@ -1 +1,15 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from primaite.simulator.system.applications.application import Application, ApplicationOperatingState
def test_application_state_validator(application):
"""Test the application state validator."""
validator = Application._StateValidator(application=application, state=ApplicationOperatingState.CLOSED)
assert validator(request=[], context={}) # application is closed
application.run()
assert validator(request=[], context={}) is False # application is running - expecting closed
validator = Application._StateValidator(application=application, state=ApplicationOperatingState.RUNNING)
assert validator(request=[], context={}) # application is running
application.close()
assert validator(request=[], context={}) is False # application is closed - expecting running

View File

@@ -1,5 +1,5 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from primaite.simulator.system.services.service import ServiceOperatingState
from primaite.simulator.system.services.service import Service, ServiceOperatingState
from primaite.simulator.system.software import SoftwareHealthState
@@ -92,3 +92,21 @@ def test_service_fix(service):
assert service.health_state_actual == SoftwareHealthState.FIXING
service.apply_timestep(2)
assert service.health_state_actual == SoftwareHealthState.GOOD
def test_service_state_validator(service):
"""Test the service state validator."""
validator = Service._StateValidator(service=service, state=ServiceOperatingState.STOPPED)
assert validator(request=[], context={}) # service is stopped
service.start()
assert validator(request=[], context={}) is False # service is running - expecting stopped
validator = Service._StateValidator(service=service, state=ServiceOperatingState.RUNNING)
assert validator(request=[], context={}) # service is running
service.pause()
assert validator(request=[], context={}) is False # service is paused - expecting running
validator = Service._StateValidator(service=service, state=ServiceOperatingState.PAUSED)
assert validator(request=[], context={}) # service is paused
service.resume()
assert validator(request=[], context={}) is False # service is running - expecting paused