Merged PR 458: Carry over airspace hotfixes from internal

## Summary
Carried over hit fixes from internal that backtracked on the complex channel width stuff for now and focussed on getting a stable data rate baked in for each frequency.  Implemented overriding of frequency max capacities on the airspace. updated documentation to reflect the changes in airspace.py.

## Test process
- Original tests still work
- Tested reading the frequency capacity overrides from config file
- Tested that setting the frequency override to 0.0 blocks the channel

## Checklist
- [X] PR is linked to a **work item**
- [X] **acceptance criteria** of linked ticket are met
- [X] performed **self-review** of the code
- [X] written **tests** for any new functionality added with this PR
- [X] updated the **documentation** if this PR changes or adds functionality
- [ ] written/updated **design docs** if this PR implements new functionality
- [X] updated the **change log**
- [X] ran **pre-commit** checks for code style
- [X] attended to any **TO-DOs** left in the code

Related work items: #2745
This commit is contained in:
Christopher McCarthy
2024-07-12 10:17:25 +00:00
13 changed files with 151 additions and 704 deletions

View File

@@ -17,7 +17,7 @@ from primaite.game.agent.scripted_agents.random_agent import PeriodicAgent
from primaite.game.agent.scripted_agents.tap001 import TAP001
from primaite.game.science import graph_has_cycle, topological_sort
from primaite.simulator import SIM_OUTPUT
from primaite.simulator.network.airspace import AirspaceEnvironmentType
from primaite.simulator.network.airspace import AirSpaceFrequency
from primaite.simulator.network.hardware.base import NodeOperatingState
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.host_node import NIC
@@ -256,10 +256,11 @@ class PrimaiteGame:
simulation_config = cfg.get("simulation", {})
network_config = simulation_config.get("network", {})
airspace_cfg = network_config.get("airspace", {})
airspace_environment_type_str = airspace_cfg.get("airspace_environment_type", "urban")
frequency_max_capacity_mbps_cfg = airspace_cfg.get("frequency_max_capacity_mbps", {})
airspace_environment_type: AirspaceEnvironmentType = AirspaceEnvironmentType(airspace_environment_type_str)
net.airspace.airspace_environment_type = airspace_environment_type
frequency_max_capacity_mbps_cfg = {AirSpaceFrequency[k]: v for k, v in frequency_max_capacity_mbps_cfg.items()}
net.airspace.frequency_max_capacity_mbps_ = frequency_max_capacity_mbps_cfg
nodes_cfg = network_config.get("nodes", [])
links_cfg = network_config.get("links", [])

View File

@@ -3,11 +3,10 @@ from __future__ import annotations
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, List, Tuple
from typing import Any, Dict, List
import numpy as np
from prettytable import MARKDOWN, PrettyTable
from pydantic import BaseModel, computed_field, Field, model_validator
from pydantic import BaseModel, Field
from primaite import getLogger
from primaite.simulator.network.hardware.base import Layer3Interface, NetworkInterface, WiredNetworkInterface
@@ -58,228 +57,34 @@ class AirSpaceFrequency(Enum):
return f"WiFi {hertz_str}"
return "Unknown Frequency"
class ChannelWidth(Enum):
"""
Enumeration representing the available channel widths in MHz for wireless communications.
This enum facilitates standardising and validating channel width configurations.
Attributes:
WIDTH_20_MHZ (int): Represents a channel width of 20 MHz, commonly used for basic
Wi-Fi connectivity with standard range and interference resistance.
WIDTH_40_MHZ (int): Represents a channel width of 40 MHz, offering higher data
throughput at the expense of potentially increased interference.
WIDTH_80_MHZ (int): Represents a channel width of 80 MHz, typically used in modern
Wi-Fi setups for high data rate applications but with higher susceptibility to interference.
WIDTH_160_MHZ (int): Represents a channel width of 160 MHz, used for ultra-high-speed
network applications, providing maximum data throughput with significant
requirements on the spectral environment to minimize interference.
"""
WIDTH_20_MHZ = 20
"""
Represents a channel width of 20 MHz, commonly used for basic Wi-Fi connectivity with standard range and
interference resistance
"""
WIDTH_40_MHZ = 40
"""
Represents a channel width of 40 MHz, offering higher data throughput at the expense of potentially increased
interference.
"""
WIDTH_80_MHZ = 80
"""
Represents a channel width of 80 MHz, typically used in modern Wi-Fi setups for high data rate applications but
with higher susceptibility to interference.
"""
WIDTH_160_MHZ = 160
"""
Represents a channel width of 160 MHz, used for ultra-high-speed network applications, providing maximum data
throughput with significant requirements on the spectral environment to minimize interference.
"""
def __str__(self) -> str:
@property
def maximum_data_rate_bps(self) -> float:
"""
Returns a string representation of the channel width.
Retrieves the maximum data transmission rate in bits per second (bps) for the frequency.
:return: String in the format of "<value> MHz" indicating the channel width.
The maximum rates are predefined for known frequencies:
- For WIFI_2_4, it returns 100,000,000 bps (100 Mbps).
- For WIFI_5, it returns 500,000,000 bps (500 Mbps).
:return: The maximum data rate in bits per second. If the frequency is not recognized, returns 0.0.
"""
return f"{self.value} MHz"
AirSpaceKeyType = Tuple[AirSpaceFrequency, ChannelWidth]
class AirspaceEnvironmentType(Enum):
"""Enum representing different types of airspace environments which affect wireless communication signals."""
RURAL = "rural"
"""
A rural environment offers clear channel conditions due to low population density and minimal electronic device
presence.
"""
OUTDOOR = "outdoor"
"""
Outdoor environments like parks or fields have minimal electronic interference.
"""
SUBURBAN = "suburban"
"""
Suburban environments strike a balance with fewer electronic interferences than urban but more than rural.
"""
OFFICE = "office"
"""
Office environments have moderate interference from numerous electronic devices and overlapping networks.
"""
URBAN = "urban"
"""
Urban environments are characterized by tall buildings and a high density of electronic devices, leading to
significant interference.
"""
INDUSTRIAL = "industrial"
"""
Industrial areas face high interference from heavy machinery and numerous electronic devices.
"""
TRANSPORT = "transport"
"""
Environments such as subways and buses where metal structures and high mobility create complex interference
patterns.
"""
DENSE_URBAN = "dense_urban"
"""
Dense urban areas like city centers have the highest level of signal interference due to the very high density of
buildings and devices.
"""
JAMMING_ZONE = "jamming_zone"
"""
A jamming zone environment where signals are actively interfered with, typically through the use of signal jammers
or scrambling devices. This represents the environment with the highest level of interference.
"""
BLOCKED = "blocked"
"""
A jamming zone environment with total levels of interference. Airspace is completely blocked.
"""
if self == AirSpaceFrequency.WIFI_2_4:
return 100_000_000.0 # 100 Megabits per second
if self == AirSpaceFrequency.WIFI_5:
return 500_000_000.0 # 500 Megabits per second
return 0.0
@property
def snr_impact(self) -> int:
def maximum_data_rate_mbps(self) -> float:
"""
Returns the SNR impact associated with the environment.
Retrieves the maximum data transmission rate in megabits per second (Mbps).
:return: SNR impact in dB.
This is derived by converting the maximum data rate from bits per second, as defined
in `maximum_data_rate_bps`, to megabits per second.
:return: The maximum data rate in megabits per second.
"""
impacts = {
AirspaceEnvironmentType.RURAL: 0,
AirspaceEnvironmentType.OUTDOOR: 1,
AirspaceEnvironmentType.SUBURBAN: -5,
AirspaceEnvironmentType.OFFICE: -7,
AirspaceEnvironmentType.URBAN: -10,
AirspaceEnvironmentType.INDUSTRIAL: -15,
AirspaceEnvironmentType.TRANSPORT: -12,
AirspaceEnvironmentType.DENSE_URBAN: -20,
AirspaceEnvironmentType.JAMMING_ZONE: -40,
AirspaceEnvironmentType.BLOCKED: -100,
}
return impacts[self]
def __str__(self) -> str:
return f"{self.value.title()} Environment (SNR Impact: {self.snr_impact})"
def estimate_snr(
frequency: AirSpaceFrequency, environment_type: AirspaceEnvironmentType, channel_width: ChannelWidth
) -> float:
"""
Estimate the Signal-to-Noise Ratio (SNR) based on the communication frequency, environment, and channel width.
This function considers both the base SNR value dependent on the frequency and the impact of environmental
factors and channel width on the SNR.
The SNR is adjusted by reducing it for wider channels, reflecting the increased noise floor from a broader
frequency range.
:param frequency: The operating frequency as defined by AirSpaceFrequency enum, influencing the base SNR. Higher
frequencies like 5 GHz generally start with a higher base SNR due to less noise.
:param environment_type: The type of environment from AirspaceEnvironmentType enum, which adjusts the SNR based on
expected environmental noise and interference levels.
:param channel_width: The channel width from ChannelWidth enum, where wider channels (80 MHz and 160 MHz) decrease
the SNR slightly due to an increased noise floor.
:return: Estimated SNR in dB, calculated as the base SNR modified by environmental and channel width impacts.
"""
base_snr = 40 if frequency == AirSpaceFrequency.WIFI_5 else 30
snr_impact = environment_type.snr_impact
# Adjust SNR impact based on channel width
if channel_width == ChannelWidth.WIDTH_80_MHZ or channel_width == ChannelWidth.WIDTH_160_MHZ:
snr_impact -= 3 # Assume wider channels have slightly lower SNR due to increased noise floor
return base_snr + snr_impact
def calculate_total_channel_capacity(
channel_width: ChannelWidth, frequency: AirSpaceFrequency, environment_type: AirspaceEnvironmentType
) -> float:
"""
Calculate the total theoretical data rate for the channel using the Shannon-Hartley theorem.
This function determines the channel's capacity by considering the bandwidth (derived from channel width),
and the signal-to-noise ratio (SNR) adjusted by frequency and environmental conditions.
The Shannon-Hartley theorem states that channel capacity C (in bits per second) can be calculated as:
``C = B * log2(1 + SNR)`` where B is the bandwidth in Hertz and SNR is the signal-to-noise ratio.
:param channel_width: The width of the channel as defined by ChannelWidth enum, converted to Hz for calculation.
:param frequency: The operating frequency as defined by AirSpaceFrequency enum, influencing the base SNR and part
of the SNR estimation.
:param environment_type: The type of environment as defined by AirspaceEnvironmentType enum, used in SNR estimation.
:return: Theoretical total data rate in Mbps for the entire channel.
"""
bandwidth_hz = channel_width.value * 1_000_000 # Convert MHz to Hz
snr_db = estimate_snr(frequency, environment_type, channel_width)
snr_linear = 10 ** (snr_db / 10)
total_capacity_bps = bandwidth_hz * np.log2(1 + snr_linear)
total_capacity_mbps = total_capacity_bps / 1_000_000
return total_capacity_mbps
def calculate_individual_device_rate(
channel_width: ChannelWidth,
frequency: AirSpaceFrequency,
environment_type: AirspaceEnvironmentType,
device_count: int,
) -> float:
"""
Calculate the theoretical data rate available to each individual device on the channel.
This function first calculates the total channel capacity and then divides this capacity by the number
of active devices to estimate each device's share of the bandwidth. This reflects the practical limitation
that multiple devices must share the same channel resources.
:param channel_width: The channel width as defined by ChannelWidth enum, used in total capacity calculation.
:param frequency: The operating frequency as defined by AirSpaceFrequency enum, used in total capacity calculation.
:param environment_type: The environment type as defined by AirspaceEnvironmentType enum, impacting SNR and
capacity.
:param device_count: The number of devices sharing the channel. If zero, returns zero to avoid division by zero.
:return: Theoretical data rate in Mbps available per device, based on shared channel capacity.
"""
total_capacity_mbps = calculate_total_channel_capacity(channel_width, frequency, environment_type)
if device_count == 0:
return 0 # Avoid division by zero
individual_device_rate_mbps = total_capacity_mbps / device_count
return individual_device_rate_mbps
return self.maximum_data_rate_bps / 1_000_000.0
class AirSpace(BaseModel):
@@ -287,105 +92,65 @@ class AirSpace(BaseModel):
Represents a wireless airspace, managing wireless network interfaces and handling wireless transmission.
This class provides functionalities to manage a collection of wireless network interfaces, each associated with
specific frequencies and channel widths. It includes methods to calculate and manage bandwidth loads, add and
remove wireless interfaces, and handle data transmission across these interfaces.
specific frequencies. It includes methods to add and remove wireless interfaces, and handle data transmission
across these interfaces.
"""
airspace_environment_type_: AirspaceEnvironmentType = AirspaceEnvironmentType.URBAN
wireless_interfaces: Dict[str, WirelessNetworkInterface] = Field(default_factory=lambda: {})
wireless_interfaces_by_frequency_channel_width: Dict[AirSpaceKeyType, List[WirelessNetworkInterface]] = Field(
wireless_interfaces_by_frequency: Dict[AirSpaceFrequency, List[WirelessNetworkInterface]] = Field(
default_factory=lambda: {}
)
bandwidth_load: Dict[AirSpaceKeyType, float] = Field(default_factory=lambda: {})
frequency_channel_width_max_capacity_mbps: Dict[AirSpaceKeyType, float] = Field(default_factory=lambda: {})
bandwidth_load: Dict[AirSpaceFrequency, float] = Field(default_factory=lambda: {})
frequency_max_capacity_mbps_: Dict[AirSpaceFrequency, float] = Field(default_factory=lambda: {})
def model_post_init(self, __context: Any) -> None:
def get_frequency_max_capacity_mbps(self, frequency: AirSpaceFrequency) -> float:
"""
Initialize the airspace metadata after instantiation.
Retrieves the maximum data transmission capacity for a specified frequency.
This method is called to set up initial configurations like the maximum capacity of each channel width and
frequency based on the current environment setting.
This method checks a dictionary holding custom maximum capacities. If the frequency is found, it returns the
custom set maximum capacity. If the frequency is not found in the dictionary, it defaults to the standard
maximum data rate associated with that frequency.
:param __context: Contextual data or settings, typically used for further initializations beyond
the basic constructor.
:param frequency: The frequency for which the maximum capacity is queried.
:return: The maximum capacity in Mbps for the specified frequency.
"""
self._set_frequency_channel_width_max_capacity_mbps()
if frequency in self.frequency_max_capacity_mbps_:
return self.frequency_max_capacity_mbps_[frequency]
return frequency.maximum_data_rate_mbps
def _set_frequency_channel_width_max_capacity_mbps(self):
def set_frequency_max_capacity_mbps(self, cfg: Dict[AirSpaceFrequency, float]):
"""
Private method to compute and set the maximum channel capacity in Mbps for each frequency and channel width.
Sets custom maximum data transmission capacities for multiple frequencies.
Based on the airspace environment type, this method calculates the maximum possible data transmission
capacity for each combination of frequency and channel width available and stores these values.
These capacities are critical for managing and limiting bandwidth load during operations.
:param cfg: A dictionary mapping frequencies to their new maximum capacities in Mbps.
"""
print(
f"Rebuilding the frequency channel width maximum capacity dictionary based on "
f"airspace environment type {self.airspace_environment_type_}"
)
for frequency in AirSpaceFrequency:
for channel_width in ChannelWidth:
max_capacity = calculate_total_channel_capacity(
frequency=frequency, channel_width=channel_width, environment_type=self.airspace_environment_type
)
self.frequency_channel_width_max_capacity_mbps[frequency, channel_width] = max_capacity
@computed_field
@property
def airspace_environment_type(self) -> AirspaceEnvironmentType:
"""
Gets the current environment type of the airspace.
:return: The AirspaceEnvironmentType representing the current environment type.
"""
return self.airspace_environment_type_
@airspace_environment_type.setter
def airspace_environment_type(self, value: AirspaceEnvironmentType) -> None:
"""
Sets a new environment type for the airspace and updates related configurations.
Changing the environment type triggers a re-calculation of the maximum channel capacities and
adjustments to the current setup of wireless interfaces to ensure they are aligned with the
new environment settings.
:param value: The new environment type as an AirspaceEnvironmentType.
"""
if value != self.airspace_environment_type_:
print(f"Setting airspace_environment_type to {value}")
self.airspace_environment_type_ = value
self._set_frequency_channel_width_max_capacity_mbps()
wireless_interface_keys = list(self.wireless_interfaces.keys())
for wireless_interface_key in wireless_interface_keys:
wireless_interface = self.wireless_interfaces[wireless_interface_key]
self.remove_wireless_interface(wireless_interface)
self.add_wireless_interface(wireless_interface)
self.frequency_max_capacity_mbps_ = cfg
for freq, mbps in cfg.items():
print(f"Overriding {freq} max capacity as {mbps:.3f} mbps")
def show_bandwidth_load(self, markdown: bool = False):
"""
Prints a table of the current bandwidth load for each frequency and channel width combination on the airspace.
Prints a table of the current bandwidth load for each frequency on the airspace.
This method prints a tabulated view showing the utilisation of available bandwidth capacities for all configured
frequency and channel width pairings. The table includes the current capacity usage as a percentage of the
maximum capacity, alongside the absolute maximum capacity values in Mbps.
This method prints a tabulated view showing the utilisation of available bandwidth capacities for all
frequencies. The table includes the current capacity usage as a percentage of the maximum capacity, alongside
the absolute maximum capacity values in Mbps.
:param markdown: Flag indicating if output should be in markdown format.
"""
headers = ["Frequency", "Channel Width", "Current Capacity (%)", "Maximum Capacity (Mbit)"]
headers = ["Frequency", "Current Capacity (%)", "Maximum Capacity (Mbit)"]
table = PrettyTable(headers)
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = "Airspace Frequency Channel Loads"
for key, load in self.bandwidth_load.items():
frequency, channel_width = key
maximum_capacity = self.frequency_channel_width_max_capacity_mbps[key]
load_percent = load / maximum_capacity
for frequency, load in self.bandwidth_load.items():
maximum_capacity = self.get_frequency_max_capacity_mbps(frequency)
load_percent = load / maximum_capacity if maximum_capacity > 0 else 0.0
if load_percent > 1.0:
load_percent = 1.0
table.add_row(
[format_hertz(frequency.value), str(channel_width), f"{load_percent:.0%}", f"{maximum_capacity:.3f}"]
)
table.add_row([format_hertz(frequency.value), f"{load_percent:.0%}", f"{maximum_capacity:.3f}"])
print(table)
def show_wireless_interfaces(self, markdown: bool = False):
@@ -400,7 +165,6 @@ class AirSpace(BaseModel):
"IP Address",
"Subnet Mask",
"Frequency",
"Channel Width",
"Speed (Mbps)",
"Status",
]
@@ -408,7 +172,7 @@ class AirSpace(BaseModel):
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"Devices on Air Space - {self.airspace_environment_type}"
table.title = "Devices on Air Space"
for interface in self.wireless_interfaces.values():
status = "Enabled" if interface.enabled else "Disabled"
@@ -419,7 +183,6 @@ class AirSpace(BaseModel):
interface.ip_address if hasattr(interface, "ip_address") else None,
interface.subnet_mask if hasattr(interface, "subnet_mask") else None,
format_hertz(interface.frequency.value),
str(interface.channel_width),
f"{interface.speed:.3f}",
status,
]
@@ -431,8 +194,8 @@ class AirSpace(BaseModel):
Prints a summary of the current state of the airspace, including both wireless interfaces and bandwidth loads.
This method is a convenient wrapper that calls two separate methods to display detailed tables: one for
wireless interfaces and another for bandwidth load across all frequencies and channel widths managed within the
airspace. It provides a holistic view of the operational status and performance metrics of the airspace.
wireless interfaces and another for bandwidth load across all frequencies managed within the airspace. It
provides a holistic view of the operational status and performance metrics of the airspace.
:param markdown: Flag indicating if output should be in markdown format.
"""
@@ -447,15 +210,9 @@ class AirSpace(BaseModel):
"""
if wireless_interface.mac_address not in self.wireless_interfaces:
self.wireless_interfaces[wireless_interface.mac_address] = wireless_interface
if wireless_interface.airspace_key not in self.wireless_interfaces_by_frequency_channel_width:
self.wireless_interfaces_by_frequency_channel_width[wireless_interface.airspace_key] = []
self.wireless_interfaces_by_frequency_channel_width[wireless_interface.airspace_key].append(
wireless_interface
)
speed = calculate_total_channel_capacity(
wireless_interface.channel_width, wireless_interface.frequency, self.airspace_environment_type
)
wireless_interface.set_speed(speed)
if wireless_interface.frequency not in self.wireless_interfaces_by_frequency:
self.wireless_interfaces_by_frequency[wireless_interface.frequency] = []
self.wireless_interfaces_by_frequency[wireless_interface.frequency].append(wireless_interface)
def remove_wireless_interface(self, wireless_interface: WirelessNetworkInterface):
"""
@@ -465,9 +222,7 @@ class AirSpace(BaseModel):
"""
if wireless_interface.mac_address in self.wireless_interfaces:
self.wireless_interfaces.pop(wireless_interface.mac_address)
self.wireless_interfaces_by_frequency_channel_width[wireless_interface.airspace_key].remove(
wireless_interface
)
self.wireless_interfaces_by_frequency[wireless_interface.frequency].remove(wireless_interface)
def clear(self):
"""
@@ -477,7 +232,7 @@ class AirSpace(BaseModel):
occur until new interfaces are added again.
"""
self.wireless_interfaces.clear()
self.wireless_interfaces_by_frequency_channel_width.clear()
self.wireless_interfaces_by_frequency.clear()
def reset_bandwidth_load(self):
"""
@@ -500,12 +255,11 @@ class AirSpace(BaseModel):
relevant frequency and its current bandwidth load.
:return: True if the frame can be transmitted within the bandwidth limit, False if it would exceed the limit.
"""
if sender_network_interface.airspace_key not in self.bandwidth_load:
self.bandwidth_load[sender_network_interface.airspace_key] = 0.0
return (
self.bandwidth_load[sender_network_interface.airspace_key] + frame.size_Mbits
<= self.frequency_channel_width_max_capacity_mbps[sender_network_interface.airspace_key]
)
if sender_network_interface.frequency not in self.bandwidth_load:
self.bandwidth_load[sender_network_interface.frequency] = 0.0
return self.bandwidth_load[
sender_network_interface.frequency
] + frame.size_Mbits <= self.get_frequency_max_capacity_mbps(sender_network_interface.frequency)
def transmit(self, frame: Frame, sender_network_interface: WirelessNetworkInterface):
"""
@@ -517,10 +271,8 @@ class AirSpace(BaseModel):
:param sender_network_interface: The wireless network interface sending the frame. This interface will be
excluded from the list of receivers to prevent it from receiving its own transmission.
"""
self.bandwidth_load[sender_network_interface.airspace_key] += frame.size_Mbits
for wireless_interface in self.wireless_interfaces_by_frequency_channel_width.get(
sender_network_interface.airspace_key, []
):
self.bandwidth_load[sender_network_interface.frequency] += frame.size_Mbits
for wireless_interface in self.wireless_interfaces_by_frequency.get(sender_network_interface.frequency, []):
if wireless_interface != sender_network_interface and wireless_interface.enabled:
wireless_interface.receive_frame(frame)
@@ -546,135 +298,7 @@ class WirelessNetworkInterface(NetworkInterface, ABC):
"""
airspace: AirSpace
frequency_: AirSpaceFrequency = AirSpaceFrequency.WIFI_2_4
channel_width_: ChannelWidth = ChannelWidth.WIDTH_40_MHZ
@model_validator(mode="after") # noqa
def validate_channel_width_for_2_4_ghz(self) -> "WirelessNetworkInterface":
"""
Validate the wireless interface's channel width settings after model changes.
This method serves as a model validator to ensure that the channel width settings for the 2.4 GHz frequency
comply with accepted standards (either 20 MHz or 40 MHz). It's triggered after model instantiation.
Ensures that the channel width is appropriate for the current frequency setting, particularly checking
and adjusting the settings for the 2.4 GHz frequency band to not exceed 40 MHz. This is crucial for
avoiding interference and ensuring optimal performance in densely populated wireless environments.
"""
self._check_wifi_24_channel_width()
return self
def model_post_init(self, __context: Any) -> None:
"""Initialise the model after its creation, setting the speed based on the calculated channel capacity."""
speed = calculate_total_channel_capacity(
channel_width=self.channel_width,
frequency=self.frequency,
environment_type=self.airspace.airspace_environment_type,
)
self.set_speed(speed)
def _check_wifi_24_channel_width(self) -> None:
"""
Ensures that the channel width for 2.4 GHz frequency does not exceed 40 MHz.
This method checks the current frequency and channel width settings and adjusts the channel width
to 40 MHz if the frequency is set to 2.4 GHz and the channel width exceeds 40 MHz. This is done to
comply with typical Wi-Fi standards for 2.4 GHz frequencies, which commonly support up to 40 MHz.
Logs a SysLog warning if the channel width had to be adjusted, logging this change either to the connected
node's system log or the global logger, depending on whether the interface is connected to a node.
"""
if self.frequency_ == AirSpaceFrequency.WIFI_2_4 and self.channel_width_.value > 40:
self.channel_width_ = ChannelWidth.WIDTH_40_MHZ
msg = (
f"Channel width must be either 20 Mhz or 40 Mhz when using {AirSpaceFrequency.WIFI_2_4}. "
f"Overriding value to use {ChannelWidth.WIDTH_40_MHZ}."
)
if self._connected_node:
self._connected_node.sys_log.warning(f"Wireless Interface {self.port_num}: {msg}")
else:
_LOGGER.warning(msg)
@computed_field
@property
def frequency(self) -> AirSpaceFrequency:
"""
Get the current operating frequency of the wireless interface.
:return: The current frequency as an AirSpaceFrequency enum value.
"""
return self.frequency_
@frequency.setter
def frequency(self, value: AirSpaceFrequency) -> None:
"""
Set the operating frequency of the wireless interface and update the network configuration.
This setter updates the frequency of the wireless interface if the new value differs from the current setting.
It handles the update by first removing the interface from the current airspace management to avoid conflicts,
setting the new frequency, ensuring the channel width remains compliant, and then re-adding the interface
to the airspace with the new settings.
:param value: The new frequency to set, as an AirSpaceFrequency enum value.
"""
if value != self.frequency_:
self.airspace.remove_wireless_interface(self)
self.frequency_ = value
self._check_wifi_24_channel_width()
self.airspace.add_wireless_interface(self)
@computed_field
@property
def channel_width(self) -> ChannelWidth:
"""
Get the current channel width setting of the wireless interface.
:return: The current channel width as a ChannelWidth enum value.
"""
return self.channel_width_
@channel_width.setter
def channel_width(self, value: ChannelWidth) -> None:
"""
Set the channel width of the wireless interface and manage configuration compliance.
Updates the channel width of the wireless interface. If the new channel width is different from the existing
one, it first removes the interface from the airspace to prevent configuration conflicts, sets the new channel
width, checks and adjusts it if necessary (especially for 2.4 GHz frequency to comply with typical standards),
and then re-registers the interface in the airspace with updated settings.
:param value: The new channel width to set, as a ChannelWidth enum value.
"""
if value != self.channel_width_:
self.airspace.remove_wireless_interface(self)
self.channel_width_ = value
self._check_wifi_24_channel_width()
self.airspace.add_wireless_interface(self)
@property
def airspace_key(self) -> tuple:
"""
The airspace bandwidth/channel identifier for the wireless interface based on its frequency and channel width.
:return: A tuple containing the frequency and channel width, serving as a bandwidth/channel key.
"""
return self.frequency_, self.channel_width_
def set_speed(self, speed: float):
"""
Sets the network interface speed to the specified value and logs this action.
This method updates the speed attribute of the network interface to the given value, reflecting
the theoretical maximum data rate that the interface can support based on the current settings.
It logs the new speed to the system log of the connected node if available.
:param speed: The speed in Mbps to be set for the network interface.
"""
self.speed = speed
if self._connected_node:
self._connected_node.sys_log.info(
f"Wireless Interface {self.port_num}: Setting theoretical maximum data rate to {speed:.3f} Mbps."
)
frequency: AirSpaceFrequency = AirSpaceFrequency.WIFI_2_4
def enable(self):
"""Attempt to enable the network interface."""

View File

@@ -4,7 +4,7 @@ from typing import Any, Dict, Optional, Union
from pydantic import validate_call
from primaite.simulator.network.airspace import AirSpace, AirSpaceFrequency, ChannelWidth, IPWirelessNetworkInterface
from primaite.simulator.network.airspace import AirSpace, AirSpaceFrequency, IPWirelessNetworkInterface
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router, RouterInterface
from primaite.simulator.network.transmission.data_link_layer import Frame
@@ -154,7 +154,6 @@ class WirelessRouter(Router):
ip_address: IPV4Address,
subnet_mask: IPV4Address,
frequency: Optional[AirSpaceFrequency] = AirSpaceFrequency.WIFI_2_4,
channel_width: Optional[ChannelWidth] = ChannelWidth.WIDTH_40_MHZ,
):
"""
Configures a wireless access point (WAP).
@@ -173,8 +172,6 @@ class WirelessRouter(Router):
"""
if not frequency:
frequency = AirSpaceFrequency.WIFI_2_4
if not channel_width:
channel_width = ChannelWidth.WIDTH_40_MHZ
self.sys_log.info("Configuring wireless access point")
self.wireless_access_point.disable() # Temporarily disable the WAP for reconfiguration
@@ -185,7 +182,6 @@ class WirelessRouter(Router):
network_interface.subnet_mask = subnet_mask
self.wireless_access_point.frequency = frequency # Set operating frequency
self.wireless_access_point.channel_width = channel_width
self.wireless_access_point.enable() # Re-enable the WAP with new settings
self.sys_log.info(f"Configured WAP {network_interface}")
@@ -269,12 +265,7 @@ class WirelessRouter(Router):
ip_address = cfg["wireless_access_point"]["ip_address"]
subnet_mask = cfg["wireless_access_point"]["subnet_mask"]
frequency = AirSpaceFrequency[cfg["wireless_access_point"]["frequency"]]
channel_width = cfg["wireless_access_point"].get("channel_width")
if channel_width:
channel_width = ChannelWidth(channel_width)
router.configure_wireless_access_point(
ip_address=ip_address, subnet_mask=subnet_mask, frequency=frequency, channel_width=channel_width
)
router.configure_wireless_access_point(ip_address=ip_address, subnet_mask=subnet_mask, frequency=frequency)
if "acl" in cfg:
for r_num, r_cfg in cfg["acl"].items():