Convert office lan adder to a class and make it extendable

This commit is contained in:
Marek Wolan
2024-10-02 13:56:39 +01:00
parent 221e09ba51
commit a838cc6ce1
7 changed files with 248 additions and 193 deletions

View File

@@ -17,6 +17,7 @@ from primaite.game.agent.scripted_agents.random_agent import PeriodicAgent
from primaite.game.agent.scripted_agents.tap001 import TAP001
from primaite.game.science import graph_has_cycle, topological_sort
from primaite.simulator import SIM_OUTPUT
from primaite.simulator.network.creation import NetworkNodeAdder
from primaite.simulator.network.hardware.base import NetworkInterface, NodeOperatingState, UserManager
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.host_node import HostNode, NIC
@@ -270,6 +271,7 @@ class PrimaiteGame:
nodes_cfg = network_config.get("nodes", [])
links_cfg = network_config.get("links", [])
node_sets_cfg = network_config.get("node_sets", [])
# Set the NMNE capture config
NetworkInterface.nmne_config = NMNEConfig(**network_config.get("nmne_config", {}))
@@ -505,6 +507,10 @@ class PrimaiteGame:
new_node.start_up_duration = int(node_cfg.get("start_up_duration", 3))
new_node.shut_down_duration = int(node_cfg.get("shut_down_duration", 3))
# 1.1 Create Node Sets
for node_set_cfg in node_sets_cfg:
NetworkNodeAdder.from_config(node_set_cfg, network=net)
# 2. create links between nodes
for link_cfg in links_cfg:
node_a = net.get_node_by_hostname(link_cfg["endpoint_a_hostname"])

View File

@@ -1,9 +1,8 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from __future__ import annotations
import copy
from abc import ABC, abstractmethod
from typing import Any, Dict, List
from typing import Any, ClassVar, Dict, List
from prettytable import MARKDOWN, PrettyTable
from pydantic import BaseModel, ConfigDict, Field, validate_call
@@ -52,36 +51,17 @@ class AirSpaceFrequency(BaseModel):
data_rate_bps: float
"""How much data can be transmitted on this frequency per second."""
_registry: ClassVar[Dict[str, AirSpaceFrequency]] = {}
_default_frequency_set: Dict[str, AirSpaceFrequency] = {
freq.name: freq
for freq in (
AirSpaceFrequency(name="WIFI_2_4", frequency_hz=2.4e9, data_rate_bps=100_000_000.0),
AirSpaceFrequency(name="WIFI_5", frequency_hz=5e9, data_rate_bps=500_000_000.0),
)
}
"""Frequency configuration that is automatically used for any new airspace."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
if self.name in self._registry:
raise RuntimeError(f"Frequency {self.name} is already registered. Cannot register it again.")
self._registry[self.name] = self
def register_default_frequency(freq_name: str, freq_hz: float, data_rate_bps: float) -> None:
"""Add to the default frequency configuration. This is intended as a plugin hook.
If your plugin makes use of bespoke frequencies for wireless communication, you should make a call to this method
wherever you define components that rely on the bespoke frequencies. That way, as soon as your components are
imported, this function automatically updates the default frequency set.
This should also be run before instances of AirSpace are created.
:param freq_name: The frequency name. If this clashes with an existing frequency name, it will be overwritten.
:type freq_name: str
:param freq_hz: The frequency itself, measured in Hertz.
:type freq_hz: float
:param data_rate_bps: The transmission capacity over this frequency, in bits per second.
:type data_rate_bps: float
"""
_default_frequency_set.update(
{freq_name: AirSpaceFrequency(name=freq_name, frequency_hz=freq_hz, data_rate_bps=data_rate_bps)}
)
FREQ_WIFI_2_4 = AirSpaceFrequency(name="WIFI_2_4", frequency_hz=2.4e9, data_rate_bps=100_000_000.0)
FREQ_WIFI_5 = AirSpaceFrequency(name="WIFI_5", frequency_hz=5e9, data_rate_bps=500_000_000.0)
class AirSpace(BaseModel):
@@ -96,7 +76,7 @@ class AirSpace(BaseModel):
wireless_interfaces: Dict[str, WirelessNetworkInterface] = Field(default_factory=lambda: {})
wireless_interfaces_by_frequency: Dict[int, List[WirelessNetworkInterface]] = Field(default_factory=lambda: {})
bandwidth_load: Dict[int, float] = Field(default_factory=lambda: {})
frequencies: Dict[str, AirSpaceFrequency] = Field(default_factory=lambda: copy.deepcopy(_default_frequency_set))
frequencies: Dict[str, AirSpaceFrequency] = AirSpaceFrequency._registry
@validate_call
def get_frequency_max_capacity_mbps(self, freq_name: str) -> float:
@@ -228,9 +208,9 @@ class AirSpace(BaseModel):
"""
if wireless_interface.mac_address not in self.wireless_interfaces:
self.wireless_interfaces[wireless_interface.mac_address] = wireless_interface
if wireless_interface.frequency not in self.wireless_interfaces_by_frequency:
self.wireless_interfaces_by_frequency[wireless_interface.frequency] = []
self.wireless_interfaces_by_frequency[wireless_interface.frequency].append(wireless_interface)
if wireless_interface.frequency.frequency_hz not in self.wireless_interfaces_by_frequency:
self.wireless_interfaces_by_frequency[wireless_interface.frequency.frequency_hz] = []
self.wireless_interfaces_by_frequency[wireless_interface.frequency.frequency_hz].append(wireless_interface)
def remove_wireless_interface(self, wireless_interface: WirelessNetworkInterface):
"""
@@ -240,7 +220,7 @@ class AirSpace(BaseModel):
"""
if wireless_interface.mac_address in self.wireless_interfaces:
self.wireless_interfaces.pop(wireless_interface.mac_address)
self.wireless_interfaces_by_frequency[wireless_interface.frequency].remove(wireless_interface)
self.wireless_interfaces_by_frequency[wireless_interface.frequency.frequency_hz].remove(wireless_interface)
def clear(self):
"""
@@ -316,7 +296,7 @@ class WirelessNetworkInterface(NetworkInterface, ABC):
"""
airspace: AirSpace
frequency: str = "WIFI_2_4"
frequency: AirSpaceFrequency = FREQ_WIFI_2_4
def enable(self):
"""Attempt to enable the network interface."""

View File

@@ -1,6 +1,9 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from abc import ABC, abstractmethod
from ipaddress import IPv4Address
from typing import Optional
from typing import Any, ClassVar, Dict, Type
from pydantic import BaseModel
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
@@ -10,6 +13,204 @@ from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
class NetworkNodeAdder(BaseModel):
"""
Base class for adding a set of related nodes to a network in a standardised way.
Child classes should define a ConfigSchema nested class that subclasses NetworkNodeAdder.ConfigSchema and a __call__
method which performs the node addition to the network.
Here is a template that users can use to define custom node adders:
```
class YourNodeAdder(NetworkNodeAdder, identifier="your_name"):
class ConfigSchema(NetworkNodeAdder.ConfigSchema):
property_1 : str
property_2 : int
@classmetho
def __call__()
```
"""
class ConfigSchema(BaseModel, ABC):
"""
Base schema for node adders.
Child classes of NetworkNodeAdder must define a schema which inherits from this schema. The identifier is used
by the from_config method to select the correct node adder at runtime.
"""
identifier: str
"""Uniquely identifies the node adder class to use for adding nodes to network."""
_registry: ClassVar[Dict[str, Type["NetworkNodeAdder"]]] = {}
def __init_subclass__(cls, identifier: str, **kwargs: Any) -> None:
"""
Register a network node adder class.
:param identifier: Unique name for the node adder to use for matching against primaite config entries.
:type identifier: str
:raises ValueError: When attempting to register a name that is already reserved.
"""
super().__init_subclass__(**kwargs)
if identifier in cls._registry:
raise ValueError(f"Duplicate node adder {identifier}")
cls._registry[identifier] = cls
@classmethod
@abstractmethod
def add_nodes_to_net(cls, config: ConfigSchema, network: Network) -> None:
"""
Add nodes to the network.
Abstract method that must be overwritten by child classes. Use the config definition to create nodes and add
them to the network that is passed in.
:param config: Config object that defines how to create and add nodes to the network
:type config: ConfigSchema
:param network: PrimAITE network object to which to add nodes.
:type network: Network
"""
pass
@classmethod
def from_config(cls, config: Dict, network: Network) -> None:
"""
Accept a config, find the relevant node adder class, and call it to add nodes to the network.
Child classes do not need to define this method.
:param config: Configuration object for the child adder class
:type config: Dict
:param network: The Network object to which to add nodes
:type network: Network
"""
if config["type"] not in cls._registry:
raise ValueError(f"Invalid node adder type {config['type']}")
adder_class = cls._registry[config["type"]]
adder_class.add_nodes_to_net(config=adder_class.ConfigSchema(**config), network=network)
class OfficeLANAdder(NetworkNodeAdder, identifier="office_lan"):
"""Creates an office LAN."""
class ConfigSchema(NetworkNodeAdder.ConfigSchema):
"""Configuration schema for OfficeLANAdder."""
lan_name: str
"""Name of lan used for generating hostnames for new nodes."""
subnet_base: int
"""Used as the third octet of IP addresses for nodes in the network."""
pcs_ip_block_start: int
"""Starting point for the fourth octet of IP addresses of nodes in the network."""
num_pcs: int
"""The number of hosts to generate."""
include_router: bool = True
"""Whether to include a router in the new office LAN."""
bandwidth: int = 100
"""Data bandwidth to the LAN measured in Mbps."""
@classmethod
def add_nodes_to_net(cls, config: ConfigSchema, network: Network) -> None:
"""
Add an office lan to the network according to the config definition.
This method creates a number of hosts and enough switches such that all hosts can be connected to a switch.
Optionally, a router is added to connect the switches together. All the nodes and networking devices are added
to the provided network.
:param config: Configuration object specifying office LAN parameters
:type config: OfficeLANAdder.ConfigSchema
:param network: The PrimAITE network to which to add the office LAN.
:type network: Network
:raises ValueError: upon invalid configuration
"""
# Calculate the required number of switches
num_of_switches = num_of_switches_required(num_nodes=config.num_pcs)
effective_network_interface = 23 # One port less for router connection
if config.pcs_ip_block_start <= num_of_switches:
raise ValueError(
f"pcs_ip_block_start must be greater than the number of required switches {num_of_switches}"
)
# Create a core switch if more than one edge switch is needed
if num_of_switches > 1:
core_switch = Switch(hostname=f"switch_core_{config.lan_name}", start_up_duration=0)
core_switch.power_on()
network.add_node(core_switch)
core_switch_port = 1
# Initialise the default gateway to None
default_gateway = None
# Optionally include a router in the LAN
if config.include_router:
default_gateway = IPv4Address(f"192.168.{config.subnet_base}.1")
router = Router(hostname=f"router_{config.lan_name}", start_up_duration=0)
router.power_on()
router.acl.add_rule(
action=ACLAction.PERMIT, src_port=PORT_LOOKUP["ARP"], dst_port=PORT_LOOKUP["ARP"], position=22
)
router.acl.add_rule(action=ACLAction.PERMIT, protocol=PROTOCOL_LOOKUP["ICMP"], position=23)
network.add_node(router)
router.configure_port(port=1, ip_address=default_gateway, subnet_mask="255.255.255.0")
router.enable_port(1)
# Initialise the first edge switch and connect to the router or core switch
switch_port = 0
switch_n = 1
switch = Switch(hostname=f"switch_edge_{switch_n}_{config.lan_name}", start_up_duration=0)
switch.power_on()
network.add_node(switch)
if num_of_switches > 1:
network.connect(
core_switch.network_interface[core_switch_port],
switch.network_interface[24],
bandwidth=config.bandwidth,
)
else:
network.connect(router.network_interface[1], switch.network_interface[24], bandwidth=config.bandwidth)
# Add PCs to the LAN and connect them to switches
for i in range(1, config.num_pcs + 1):
# Add a new edge switch if the current one is full
if switch_port == effective_network_interface:
switch_n += 1
switch_port = 0
switch = Switch(hostname=f"switch_edge_{switch_n}_{config.lan_name}", start_up_duration=0)
switch.power_on()
network.add_node(switch)
# Connect the new switch to the router or core switch
if num_of_switches > 1:
core_switch_port += 1
network.connect(
core_switch.network_interface[core_switch_port],
switch.network_interface[24],
bandwidth=config.bandwidth,
)
else:
network.connect(
router.network_interface[1], switch.network_interface[24], bandwidth=config.bandwidth
)
# Create and add a PC to the network
pc = Computer(
hostname=f"pc_{i}_{config.lan_name}",
ip_address=f"192.168.{config.subnet_base}.{i+config.pcs_ip_block_start-1}",
subnet_mask="255.255.255.0",
default_gateway=default_gateway,
start_up_duration=0,
)
pc.power_on()
network.add_node(pc)
# Connect the PC to the switch
switch_port += 1
network.connect(switch.network_interface[switch_port], pc.network_interface[1], bandwidth=config.bandwidth)
switch.network_interface[switch_port].enable()
def num_of_switches_required(num_nodes: int, max_network_interface: int = 24) -> int:
"""
Calculate the minimum number of network switches required to connect a given number of nodes.
@@ -42,115 +243,3 @@ def num_of_switches_required(num_nodes: int, max_network_interface: int = 24) ->
# Return the total number of switches required
return full_switches + (1 if extra_pcs > 0 else 0)
def create_office_lan(
lan_name: str,
subnet_base: int,
pcs_ip_block_start: int,
num_pcs: int,
network: Optional[Network] = None,
include_router: bool = True,
bandwidth: int = 100,
) -> Network:
"""
Creates a 2-Tier or 3-Tier office local area network (LAN).
The LAN is configured with a specified number of personal computers (PCs), optionally including a router,
and multiple edge switches to connect them. A core switch is added only if more than one edge switch is required.
The network topology involves edge switches connected either directly to the router in a 2-Tier setup or
to a core switch in a 3-Tier setup. If a router is included, it is connected to the core switch (if present)
and configured with basic access control list (ACL) rules. PCs are distributed across the edge switches.
:param str lan_name: The name to be assigned to the LAN.
:param int subnet_base: The subnet base number to be used in the IP addresses.
:param int pcs_ip_block_start: The starting block for assigning IP addresses to PCs.
:param int num_pcs: The number of PCs to be added to the LAN.
:param Optional[Network] network: The network to which the LAN components will be added. If None, a new network is
created.
:param bool include_router: Flag to determine if a router should be included in the LAN. Defaults to True.
:return: The network object with the LAN components added.
:raises ValueError: If pcs_ip_block_start is less than or equal to the number of required switches.
"""
# Initialise the network if not provided
if not network:
network = Network()
# Calculate the required number of switches
num_of_switches = num_of_switches_required(num_nodes=num_pcs)
effective_network_interface = 23 # One port less for router connection
if pcs_ip_block_start <= num_of_switches:
raise ValueError(f"pcs_ip_block_start must be greater than the number of required switches {num_of_switches}")
# Create a core switch if more than one edge switch is needed
if num_of_switches > 1:
core_switch = Switch(hostname=f"switch_core_{lan_name}", start_up_duration=0)
core_switch.power_on()
network.add_node(core_switch)
core_switch_port = 1
# Initialise the default gateway to None
default_gateway = None
# Optionally include a router in the LAN
if include_router:
default_gateway = IPv4Address(f"192.168.{subnet_base}.1")
router = Router(hostname=f"router_{lan_name}", start_up_duration=0)
router.power_on()
router.acl.add_rule(
action=ACLAction.PERMIT, src_port=PORT_LOOKUP["ARP"], dst_port=PORT_LOOKUP["ARP"], position=22
)
router.acl.add_rule(action=ACLAction.PERMIT, protocol=PROTOCOL_LOOKUP["ICMP"], position=23)
network.add_node(router)
router.configure_port(port=1, ip_address=default_gateway, subnet_mask="255.255.255.0")
router.enable_port(1)
# Initialise the first edge switch and connect to the router or core switch
switch_port = 0
switch_n = 1
switch = Switch(hostname=f"switch_edge_{switch_n}_{lan_name}", start_up_duration=0)
switch.power_on()
network.add_node(switch)
if num_of_switches > 1:
network.connect(
core_switch.network_interface[core_switch_port], switch.network_interface[24], bandwidth=bandwidth
)
else:
network.connect(router.network_interface[1], switch.network_interface[24], bandwidth=bandwidth)
# Add PCs to the LAN and connect them to switches
for i in range(1, num_pcs + 1):
# Add a new edge switch if the current one is full
if switch_port == effective_network_interface:
switch_n += 1
switch_port = 0
switch = Switch(hostname=f"switch_edge_{switch_n}_{lan_name}", start_up_duration=0)
switch.power_on()
network.add_node(switch)
# Connect the new switch to the router or core switch
if num_of_switches > 1:
core_switch_port += 1
network.connect(
core_switch.network_interface[core_switch_port], switch.network_interface[24], bandwidth=bandwidth
)
else:
network.connect(router.network_interface[1], switch.network_interface[24], bandwidth=bandwidth)
# Create and add a PC to the network
pc = Computer(
hostname=f"pc_{i}_{lan_name}",
ip_address=f"192.168.{subnet_base}.{i+pcs_ip_block_start-1}",
subnet_mask="255.255.255.0",
default_gateway=default_gateway,
start_up_duration=0,
)
pc.power_on()
network.add_node(pc)
# Connect the PC to the switch
switch_port += 1
network.connect(switch.network_interface[switch_port], pc.network_interface[1], bandwidth=bandwidth)
switch.network_interface[switch_port].enable()
return network

View File

@@ -1539,6 +1539,25 @@ class Node(SimComponent):
SYSTEM_SOFTWARE: ClassVar[Dict[str, Type[Software]]] = {}
"Base system software that must be preinstalled."
_registry: ClassVar[Dict[str, Type["Node"]]] = {}
"""Registry of application types. Automatically populated when subclasses are defined."""
def __init_subclass__(cls, identifier: str = "default", **kwargs: Any) -> None:
"""
Register a node type.
:param identifier: Uniquely specifies an node class by name. Used for finding items by config.
:type identifier: str
:raises ValueError: When attempting to register an node with a name that is already allocated.
"""
if identifier == "default":
return
identifier = identifier.lower()
super().__init_subclass__(**kwargs)
if identifier in cls._registry:
raise ValueError(f"Tried to define new node {identifier}, but this name is already reserved.")
cls._registry[identifier] = cls
def __init__(self, **kwargs):
"""
Initialize the Node with various components and managers.

View File

@@ -2,7 +2,7 @@
from __future__ import annotations
from ipaddress import IPv4Address
from typing import Any, ClassVar, Dict, Optional, Type
from typing import Any, ClassVar, Dict, Optional
from primaite import getLogger
from primaite.simulator.network.hardware.base import (
@@ -325,30 +325,10 @@ class HostNode(Node):
network_interface: Dict[int, NIC] = {}
"The NICs on the node by port id."
_registry: ClassVar[Dict[str, Type["HostNode"]]] = {}
"""Registry of application types. Automatically populated when subclasses are defined."""
def __init__(self, ip_address: IPV4Address, subnet_mask: IPV4Address, **kwargs):
super().__init__(**kwargs)
self.connect_nic(NIC(ip_address=ip_address, subnet_mask=subnet_mask))
def __init_subclass__(cls, identifier: str = "default", **kwargs: Any) -> None:
"""
Register a hostnode type.
:param identifier: Uniquely specifies an hostnode class by name. Used for finding items by config.
:type identifier: str
:raises ValueError: When attempting to register an hostnode with a name that is already allocated.
"""
if identifier == "default":
return
# Enforce lowercase registry entries because it makes comparisons everywhere else much easier.
identifier = identifier.lower()
super().__init_subclass__(**kwargs)
if identifier in cls._registry:
raise ValueError(f"Tried to define new hostnode {identifier}, but this name is already reserved.")
cls._registry[identifier] = cls
@property
def nmap(self) -> Optional[NMAP]:
"""

View File

@@ -1,6 +1,6 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from abc import abstractmethod
from typing import Any, ClassVar, Dict, Optional, Type
from typing import Optional
from primaite.simulator.network.hardware.base import NetworkInterface, Node
from primaite.simulator.network.transmission.data_link_layer import Frame
@@ -16,25 +16,6 @@ class NetworkNode(Node):
provide functionality for receiving and processing frames received on their network interfaces.
"""
_registry: ClassVar[Dict[str, Type["NetworkNode"]]] = {}
"""Registry of application types. Automatically populated when subclasses are defined."""
def __init_subclass__(cls, identifier: str = "default", **kwargs: Any) -> None:
"""
Register a networknode type.
:param identifier: Uniquely specifies an networknode class by name. Used for finding items by config.
:type identifier: str
:raises ValueError: When attempting to register an networknode with a name that is already allocated.
"""
if identifier == "default":
return
identifier = identifier.lower()
super().__init_subclass__(**kwargs)
if identifier in cls._registry:
raise ValueError(f"Tried to define new networknode {identifier}, but this name is already reserved.")
cls._registry[identifier] = cls
@abstractmethod
def receive_frame(self, frame: Frame, from_network_interface: NetworkInterface):
"""

View File

@@ -4,7 +4,7 @@ from typing import Any, Dict, Optional, Union
from pydantic import validate_call
from primaite.simulator.network.airspace import AirSpace, IPWirelessNetworkInterface
from primaite.simulator.network.airspace import AirSpace, AirSpaceFrequency, FREQ_WIFI_2_4, IPWirelessNetworkInterface
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router, RouterInterface
from primaite.simulator.network.transmission.data_link_layer import Frame
@@ -153,7 +153,7 @@ class WirelessRouter(Router):
self,
ip_address: IPV4Address,
subnet_mask: IPV4Address,
frequency: Optional[str] = "WIFI_2_4",
frequency: Optional[AirSpaceFrequency] = FREQ_WIFI_2_4,
):
"""
Configures a wireless access point (WAP).
@@ -171,7 +171,7 @@ class WirelessRouter(Router):
communication. Default is "WIFI_2_4".
"""
if not frequency:
frequency = "WIFI_2_4"
frequency = FREQ_WIFI_2_4
self.sys_log.info("Configuring wireless access point")
self.wireless_access_point.disable() # Temporarily disable the WAP for reconfiguration
@@ -264,7 +264,7 @@ class WirelessRouter(Router):
if "wireless_access_point" in cfg:
ip_address = cfg["wireless_access_point"]["ip_address"]
subnet_mask = cfg["wireless_access_point"]["subnet_mask"]
frequency = cfg["wireless_access_point"]["frequency"]
frequency = AirSpaceFrequency._registry[cfg["wireless_access_point"]["frequency"]]
router.configure_wireless_access_point(ip_address=ip_address, subnet_mask=subnet_mask, frequency=frequency)
if "acl" in cfg: