Merged PR 318: #2418 Add Printer and Wireless router to config parser

## Summary
Add ability to parse printers and wireless routers from yaml config.

## Test process
Existing unit tests pass. Added a printer to one of the test yamls. Adding a wireless router as well now.

## Checklist
- [x] PR is linked to a **work item**
- [x] **acceptance criteria** of linked ticket are met
- [x] performed **self-review** of the code
- [ ] written **tests** for any new functionality added with this PR
- [ ] updated the **documentation** if this PR changes or adds functionality
- [ ] written/updated **design docs** if this PR implements new functionality
- [ ] updated the **change log**
- [x] ran **pre-commit** checks for code style
- [x] attended to any **TO-DOs** left in the code

Related work items: #2418
This commit is contained in:
Marek Wolan
2024-03-27 13:35:33 +00:00
9 changed files with 353 additions and 6 deletions

View File

@@ -15,10 +15,11 @@ from primaite.game.science import graph_has_cycle, topological_sort
from primaite.simulator.network.hardware.base import NodeOperatingState
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.host_node import NIC
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.host.server import Printer, Server
from primaite.simulator.network.hardware.nodes.network.firewall import Firewall
from primaite.simulator.network.hardware.nodes.network.router import Router
from primaite.simulator.network.hardware.nodes.network.switch import Switch
from primaite.simulator.network.hardware.nodes.network.wireless_router import WirelessRouter
from primaite.simulator.network.nmne import set_nmne_config
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.sim_container import Simulation
@@ -273,8 +274,18 @@ class PrimaiteGame:
new_node = Router.from_config(node_cfg)
elif n_type == "firewall":
new_node = Firewall.from_config(node_cfg)
elif n_type == "wireless_router":
new_node = WirelessRouter.from_config(node_cfg)
elif n_type == "printer":
new_node = Printer(
hostname=node_cfg["hostname"],
ip_address=node_cfg["ip_address"],
subnet_mask=node_cfg["subnet_mask"],
)
else:
_LOGGER.warning(f"invalid node type {n_type} in config")
msg = f"invalid node type {n_type} in config"
_LOGGER.error(msg)
raise ValueError(msg)
if "services" in node_cfg:
for service_cfg in node_cfg["services"]:
new_service = None

View File

@@ -8,6 +8,7 @@ from prettytable import MARKDOWN, PrettyTable
from primaite import getLogger
from primaite.simulator.core import RequestManager, RequestType, SimComponent
from primaite.simulator.network.hardware.base import Link, Node, WiredNetworkInterface
from primaite.simulator.network.hardware.nodes.host.server import Printer
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.services.service import Service
@@ -110,6 +111,16 @@ class Network(SimComponent):
"""The Firewalls in the Network."""
return [node for node in self.nodes.values() if node.__class__.__name__ == "Firewall"]
@property
def printer_nodes(self) -> List[Node]:
"""The printers on the network."""
return [node for node in self.nodes.values() if isinstance(node, Printer)]
@property
def wireless_router_nodes(self) -> List[Node]:
"""The Routers in the Network."""
return [node for node in self.nodes.values() if node.__class__.__name__ == "WirelessRouter"]
def show(self, nodes: bool = True, ip_addresses: bool = True, links: bool = True, markdown: bool = False):
"""
Print tables describing the Network.
@@ -128,6 +139,8 @@ class Network(SimComponent):
"Switch": self.switch_nodes,
"Server": self.server_nodes,
"Computer": self.computer_nodes,
"Printer": self.printer_nodes,
"Wireless Router": self.wireless_router_nodes,
}
if nodes:
table = PrettyTable(["Node", "Type", "Operating State"])

View File

@@ -28,3 +28,9 @@ class Server(HostNode):
* Applications:
* Web Browser
"""
class Printer(HostNode):
"""Printer? I don't even know her!."""
# TODO: Implement printer-specific behaviour

View File

@@ -1418,7 +1418,7 @@ class Router(NetworkNode):
:return: Configured router.
:rtype: Router
"""
router = Router(
router = cls(
hostname=cfg["hostname"],
num_ports=int(cfg.get("num_ports", "5")),
operating_state=NodeOperatingState.ON
@@ -1441,6 +1441,8 @@ class Router(NetworkNode):
protocol=None if not (p := r_cfg.get("protocol")) else IPProtocol[p],
src_ip_address=r_cfg.get("src_ip"),
dst_ip_address=r_cfg.get("dst_ip"),
src_wildcard_mask=r_cfg.get("src_wildcard_mask"),
dst_wildcard_mask=r_cfg.get("dst_wildcard_mask"),
position=r_num,
)
if "routes" in cfg:

View File

@@ -1,10 +1,14 @@
from ipaddress import IPv4Address
from typing import Any, Dict, Union
from pydantic import validate_call
from primaite.simulator.network.airspace import AirSpaceFrequency, IPWirelessNetworkInterface
from primaite.simulator.network.hardware.nodes.network.router import Router, RouterInterface
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router, RouterInterface
from primaite.simulator.network.transmission.data_link_layer import Frame
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.utils.validators import IPV4Address
@@ -209,3 +213,68 @@ class WirelessRouter(Router):
raise NotImplementedError(
"Please use the 'configure_wireless_access_point' and 'configure_router_interface' functions."
)
@classmethod
def from_config(cls, cfg: Dict) -> "WirelessRouter":
"""Generate the wireless router from config.
Schema:
- hostname (str): unique name for this router.
- router_interface (dict): The values should be another dict specifying
- ip_address (str)
- subnet_mask (str)
- wireless_access_point (dict): Dict with
- ip address,
- subnet mask,
- frequency, (string: either WIFI_2_4 or WIFI_5)
- acl (dict): Dict with integers from 1 - max_acl_rules as keys. The key defines the position within the ACL
where the rule will be added (lower number is resolved first). The values should describe valid ACL
Rules as:
- action (str): either PERMIT or DENY
- src_port (str, optional): the named port such as HTTP, HTTPS, or POSTGRES_SERVER
- dst_port (str, optional): the named port such as HTTP, HTTPS, or POSTGRES_SERVER
- protocol (str, optional): the named IP protocol such as ICMP, TCP, or UDP
- src_ip_address (str, optional): IP address octet written in base 10
- dst_ip_address (str, optional): IP address octet written in base 10
:param cfg: Config dictionary
:type cfg: Dict
:return: WirelessRouter instance.
:rtype: WirelessRouter
"""
operating_state = (
NodeOperatingState.ON if not (p := cfg.get("operating_state")) else NodeOperatingState[p.upper()]
)
router = cls(hostname=cfg["hostname"], operating_state=operating_state)
if "router_interface" in cfg:
ip_address = cfg["router_interface"]["ip_address"]
subnet_mask = cfg["router_interface"]["subnet_mask"]
router.configure_router_interface(ip_address=ip_address, subnet_mask=subnet_mask)
if "wireless_access_point" in cfg:
ip_address = cfg["wireless_access_point"]["ip_address"]
subnet_mask = cfg["wireless_access_point"]["subnet_mask"]
frequency = AirSpaceFrequency[cfg["wireless_access_point"]["frequency"]]
router.configure_wireless_access_point(ip_address=ip_address, subnet_mask=subnet_mask, frequency=frequency)
if "acl" in cfg:
for r_num, r_cfg in cfg["acl"].items():
router.acl.add_rule(
action=ACLAction[r_cfg["action"]],
src_port=None if not (p := r_cfg.get("src_port")) else Port[p],
dst_port=None if not (p := r_cfg.get("dst_port")) else Port[p],
protocol=None if not (p := r_cfg.get("protocol")) else IPProtocol[p],
src_ip_address=r_cfg.get("src_ip"),
dst_ip_address=r_cfg.get("dst_ip"),
src_wildcard_mask=r_cfg.get("src_wildcard_mask"),
dst_wildcard_mask=r_cfg.get("dst_wildcard_mask"),
position=r_num,
)
if "routes" in cfg:
for route in cfg.get("routes"):
router.route_table.add_route(
address=IPv4Address(route.get("address")),
subnet_mask=IPv4Address(route.get("subnet_mask", "255.255.255.0")),
next_hop_ip_address=IPv4Address(route.get("next_hop_ip_address")),
metric=float(route.get("metric", 0)),
)
return router