#2418 Fix wireless router from config

This commit is contained in:
Marek Wolan
2024-03-26 13:21:22 +00:00
committed by Cristian-VM2
parent 4301f3fdba
commit b12dee73ba
4 changed files with 103 additions and 4 deletions

View File

@@ -1418,7 +1418,7 @@ class Router(NetworkNode):
:return: Configured router.
:rtype: Router
"""
router = Router(
router = cls(
hostname=cfg["hostname"],
num_ports=int(cfg.get("num_ports", "5")),
operating_state=NodeOperatingState.ON

View File

@@ -1,10 +1,14 @@
from ipaddress import IPv4Address
from typing import Any, Dict, Union
from pydantic import validate_call
from primaite.simulator.network.airspace import AirSpaceFrequency, IPWirelessNetworkInterface
from primaite.simulator.network.hardware.nodes.network.router import Router, RouterInterface
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router, RouterInterface
from primaite.simulator.network.transmission.data_link_layer import Frame
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.utils.validators import IPV4Address
@@ -209,3 +213,66 @@ class WirelessRouter(Router):
raise NotImplementedError(
"Please use the 'configure_wireless_access_point' and 'configure_router_interface' functions."
)
@classmethod
def from_config(cls, cfg: Dict) -> "WirelessRouter":
"""Generate the wireless router from config.
Schema:
- hostname (str): unique name for this router.
- router_interface (dict): The values should be another dict specifying
- ip_address (str)
- subnet_mask (str)
- wireless_access_point (dict): Dict with
- ip address,
- subnet mask,
- frequency, (string: either WIFI_2_4 or WIFI_5)
- acl (dict): Dict with integers from 1 - max_acl_rules as keys. The key defines the position within the ACL
where the rule will be added (lower number is resolved first). The values should describe valid ACL
Rules as:
- action (str): either PERMIT or DENY
- src_port (str, optional): the named port such as HTTP, HTTPS, or POSTGRES_SERVER
- dst_port (str, optional): the named port such as HTTP, HTTPS, or POSTGRES_SERVER
- protocol (str, optional): the named IP protocol such as ICMP, TCP, or UDP
- src_ip_address (str, optional): IP address octet written in base 10
- dst_ip_address (str, optional): IP address octet written in base 10
:param cfg: Config dictionary
:type cfg: Dict
:return: WirelessRouter instance.
:rtype: WirelessRouter
"""
operating_state = (
NodeOperatingState.ON if not (p := cfg.get("operating_state")) else NodeOperatingState[p.upper()]
)
router = cls(hostname=cfg["hostname"], operating_state=operating_state)
if "router_interface" in cfg:
ip_address = cfg["router_interface"]["ip_address"]
subnet_mask = cfg["router_interface"]["subnet_mask"]
router.configure_router_interface(ip_address=ip_address, subnet_mask=subnet_mask)
if "wireless_access_point" in cfg:
ip_address = cfg["wireless_access_point"]["ip_address"]
subnet_mask = cfg["wireless_access_point"]["subnet_mask"]
frequency = AirSpaceFrequency[cfg["wireless_access_point"]["frequency"]]
router.configure_wireless_access_point(ip_address=ip_address, subnet_mask=subnet_mask, frequency=frequency)
if "acl" in cfg:
for r_num, r_cfg in cfg["acl"].items():
router.acl.add_rule(
action=ACLAction[r_cfg["action"]],
src_port=None if not (p := r_cfg.get("src_port")) else Port[p],
dst_port=None if not (p := r_cfg.get("dst_port")) else Port[p],
protocol=None if not (p := r_cfg.get("protocol")) else IPProtocol[p],
src_ip_address=r_cfg.get("src_ip"),
dst_ip_address=r_cfg.get("dst_ip"),
position=r_num,
)
if "routes" in cfg:
for route in cfg.get("routes"):
router.route_table.add_route(
address=IPv4Address(route.get("address")),
subnet_mask=IPv4Address(route.get("subnet_mask", "255.255.255.0")),
next_hop_ip_address=IPv4Address(route.get("next_hop_ip_address")),
metric=float(route.get("metric", 0)),
)
return router

View File

@@ -687,6 +687,33 @@ simulation:
ip_address: 192.168.10.99
subnet_mask: 255.255.255.0
- ref: router_2
type: wireless_router
hostname: router_2
router_interface:
ip_address: 192.169.1.1
subnet_mask: 255.255.255.0
wireless_access_point:
ip_address: 192.169.1.1
subnet_mask: 255.255.255.0
frequency: WIFI_2_4
acl:
0:
action: PERMIT
src_port: POSTGRES_SERVER
dst_port: POSTGRES_SERVER
1:
action: PERMIT
src_port: DNS
dst_port: DNS
22:
action: PERMIT
src_port: ARP
dst_port: ARP
23:
action: PERMIT
protocol: ICMP
links:
- ref: router_1___switch_1
endpoint_a_ref: router_1

View File

@@ -1,6 +1,8 @@
import pydantic
import pytest
from primaite.simulator.network.hardware.nodes.host.server import Printer
from primaite.simulator.network.hardware.nodes.network.wireless_router import WirelessRouter
from tests import TEST_ASSETS_ROOT
from tests.conftest import TempPrimaiteSession
@@ -11,7 +13,6 @@ MISCONFIGURED_PATH = TEST_ASSETS_ROOT / "configs/bad_primaite_session.yaml"
MULTI_AGENT_PATH = TEST_ASSETS_ROOT / "configs/multi_agent_session.yaml"
# @pytest.mark.skip(reason="no way of currently testing this")
class TestPrimaiteSession:
@pytest.mark.parametrize("temp_primaite_session", [[CFG_PATH]], indirect=True)
def test_creating_session(self, temp_primaite_session):
@@ -29,7 +30,11 @@ class TestPrimaiteSession:
assert session.env
assert session.env.game.simulation.network
assert len(session.env.game.simulation.network.nodes) == 11
assert len(session.env.game.simulation.network.nodes) == 12
wireless = session.env.game.simulation.network.get_node_by_hostname("router_2")
assert isinstance(wireless, WirelessRouter)
printer = session.env.game.simulation.network.get_node_by_hostname("HP_LaserJet_Pro_4102fdn_printer")
assert isinstance(printer, Printer)
@pytest.mark.skip(reason="Session is not being maintained and will be removed in the subsequent beta release.")
@pytest.mark.parametrize("temp_primaite_session", [[CFG_PATH]], indirect=True)