Turn AirSpaceFrequency to a schema instead of a dict for validation

This commit is contained in:
Marek Wolan
2024-09-27 15:06:19 +01:00
parent 521580ea12
commit 221e09ba51

View File

@@ -6,7 +6,7 @@ from abc import ABC, abstractmethod
from typing import Any, Dict, List
from prettytable import MARKDOWN, PrettyTable
from pydantic import BaseModel, Field, validate_call
from pydantic import BaseModel, ConfigDict, Field, validate_call
from primaite import getLogger
from primaite.simulator.network.hardware.base import Layer3Interface, NetworkInterface, WiredNetworkInterface
@@ -41,9 +41,24 @@ def format_hertz(hertz: float, format_terahertz: bool = False, decimals: int = 3
return format_str.format(hertz) + " Hz"
_default_frequency_set: Dict[str, Dict] = {
"WIFI_2_4": {"frequency": 2.4e9, "data_rate_bps": 100_000_000.0},
"WIFI_5": {"frequency": 5e9, "data_rate_bps": 500_000_000.0},
class AirSpaceFrequency(BaseModel):
"""Data transfer object for defining properties of an airspace frequency."""
model_config = ConfigDict(extra="forbid")
name: str
"""Alias for frequency."""
frequency_hz: int
"""This acts as the primary key. If two names are mapped to the same frequency, they will share a bandwidth."""
data_rate_bps: float
"""How much data can be transmitted on this frequency per second."""
_default_frequency_set: Dict[str, AirSpaceFrequency] = {
freq.name: freq
for freq in (
AirSpaceFrequency(name="WIFI_2_4", frequency_hz=2.4e9, data_rate_bps=100_000_000.0),
AirSpaceFrequency(name="WIFI_5", frequency_hz=5e9, data_rate_bps=500_000_000.0),
)
}
"""Frequency configuration that is automatically used for any new airspace."""
@@ -64,7 +79,9 @@ def register_default_frequency(freq_name: str, freq_hz: float, data_rate_bps: fl
:param data_rate_bps: The transmission capacity over this frequency, in bits per second.
:type data_rate_bps: float
"""
_default_frequency_set.update({freq_name: {"frequency": freq_hz, "data_rate_bps": data_rate_bps}})
_default_frequency_set.update(
{freq_name: AirSpaceFrequency(name=freq_name, frequency_hz=freq_hz, data_rate_bps=data_rate_bps)}
)
class AirSpace(BaseModel):
@@ -79,7 +96,7 @@ class AirSpace(BaseModel):
wireless_interfaces: Dict[str, WirelessNetworkInterface] = Field(default_factory=lambda: {})
wireless_interfaces_by_frequency: Dict[int, List[WirelessNetworkInterface]] = Field(default_factory=lambda: {})
bandwidth_load: Dict[int, float] = Field(default_factory=lambda: {})
frequencies: Dict[str, Dict] = Field(default_factory=lambda: copy.deepcopy(_default_frequency_set))
frequencies: Dict[str, AirSpaceFrequency] = Field(default_factory=lambda: copy.deepcopy(_default_frequency_set))
@validate_call
def get_frequency_max_capacity_mbps(self, freq_name: str) -> float:
@@ -90,7 +107,7 @@ class AirSpace(BaseModel):
:return: The maximum capacity in Mbps for the specified frequency.
"""
if freq_name in self.frequencies:
return self.frequencies[freq_name]["data_rate_bps"] / (1024.0 * 1024.0)
return self.frequencies[freq_name].data_rate_bps / (1024.0 * 1024.0)
return 0.0
def set_frequency_max_capacity_mbps(self, cfg: Dict[int, float]) -> None:
@@ -100,7 +117,7 @@ class AirSpace(BaseModel):
:param cfg: A dictionary mapping frequencies to their new maximum capacities in Mbps.
"""
for freq, mbps in cfg.items():
self.frequencies[freq]["data_rate_bps"] = mbps * 1024 * 1024
self.frequencies[freq].data_rate_bps = mbps * 1024 * 1024
print(f"Overriding {freq} max capacity as {mbps:.3f} mbps")
def register_frequency(self, freq_name: str, freq_hz: float, data_rate_bps: float) -> None:
@@ -117,10 +134,12 @@ class AirSpace(BaseModel):
if freq_name in self.frequencies:
_LOGGER.info(
f"Overwriting Air space frequency {freq_name}. "
f"Previous data rate: {self.frequencies[freq_name]['data_rate_bps']}. "
f"Previous data rate: {self.frequencies[freq_name].data_rate_bps}. "
f"Current data rate: {data_rate_bps}."
)
self.frequencies.update({freq_name: {"frequency": freq_hz, "data_rate_bps": data_rate_bps}})
self.frequencies.update(
{freq_name: AirSpaceFrequency(name=freq_name, frequency_hz=freq_hz, data_rate_bps=data_rate_bps)}
)
def show_bandwidth_load(self, markdown: bool = False):
"""
@@ -145,7 +164,7 @@ class AirSpace(BaseModel):
load_percent = 1.0
table.add_row(
[
format_hertz(self.frequencies[frequency]["frequency"]),
format_hertz(self.frequencies[frequency].frequency_hz),
f"{load_percent:.0%}",
f"{maximum_capacity:.3f}",
]
@@ -181,7 +200,7 @@ class AirSpace(BaseModel):
interface.mac_address,
interface.ip_address if hasattr(interface, "ip_address") else None,
interface.subnet_mask if hasattr(interface, "subnet_mask") else None,
format_hertz(self.frequencies[interface.frequency]["frequency"]),
format_hertz(self.frequencies[interface.frequency].frequency_hz),
f"{interface.speed:.3f}",
status,
]