#2257: add firewall via config + fix router hop ip address + shuffling around tests
This commit is contained in:
@@ -15,6 +15,7 @@ from primaite.simulator.network.hardware.base import NodeOperatingState
|
||||
from primaite.simulator.network.hardware.nodes.host.computer import Computer
|
||||
from primaite.simulator.network.hardware.nodes.host.host_node import NIC
|
||||
from primaite.simulator.network.hardware.nodes.host.server import Server
|
||||
from primaite.simulator.network.hardware.nodes.network.firewall import Firewall
|
||||
from primaite.simulator.network.hardware.nodes.network.router import Router
|
||||
from primaite.simulator.network.hardware.nodes.network.switch import Switch
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
@@ -252,6 +253,8 @@ class PrimaiteGame:
|
||||
)
|
||||
elif n_type == "router":
|
||||
new_node = Router.from_config(node_cfg)
|
||||
elif n_type == "firewall":
|
||||
new_node = Firewall.from_config(node_cfg)
|
||||
else:
|
||||
_LOGGER.warning(f"invalid node type {n_type} in config")
|
||||
if "services" in node_cfg:
|
||||
@@ -264,12 +267,12 @@ class PrimaiteGame:
|
||||
new_node.software_manager.install(SERVICE_TYPES_MAPPING[service_type])
|
||||
new_service = new_node.software_manager.software[service_type]
|
||||
game.ref_map_services[service_ref] = new_service.uuid
|
||||
|
||||
# start the service
|
||||
new_service.start()
|
||||
else:
|
||||
_LOGGER.warning(f"service type not found {service_type}")
|
||||
|
||||
# start the service
|
||||
new_service.start()
|
||||
|
||||
# service-dependent options
|
||||
if service_type == "DNSClient":
|
||||
if "options" in service_cfg:
|
||||
|
||||
@@ -12,8 +12,8 @@ class _SimOutput:
|
||||
self._path: Path = (
|
||||
_PRIMAITE_ROOT.parent.parent / "simulation_output" / datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
|
||||
)
|
||||
self.save_pcap_logs: bool = True
|
||||
self.save_sys_logs: bool = True
|
||||
self.save_pcap_logs: bool = False
|
||||
self.save_sys_logs: bool = False
|
||||
|
||||
@property
|
||||
def path(self) -> Path:
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
from ipaddress import IPv4Address
|
||||
from typing import Dict, Final, Optional, Union
|
||||
|
||||
from prettytable import MARKDOWN, PrettyTable
|
||||
from pydantic import validate_call
|
||||
|
||||
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
|
||||
from primaite.simulator.network.hardware.nodes.network.router import (
|
||||
AccessControlList,
|
||||
ACLAction,
|
||||
@@ -491,3 +493,68 @@ class Firewall(Router):
|
||||
"""
|
||||
self.configure_port(DMZ_PORT_ID, ip_address, subnet_mask)
|
||||
self.dmz_port.enable()
|
||||
|
||||
@classmethod
|
||||
def from_config(cls, cfg: dict) -> "Firewall":
|
||||
"""Create a firewall based on a config dict."""
|
||||
new = Firewall(hostname=cfg["hostname"], operating_state=NodeOperatingState.ON)
|
||||
if "ports" in cfg:
|
||||
internal_port = cfg["ports"]["internal_port"]
|
||||
external_port = cfg["ports"]["external_port"]
|
||||
dmz_port = cfg["ports"]["dmz_port"]
|
||||
|
||||
# configure internal port
|
||||
new.configure_internal_port(
|
||||
ip_address=IPV4Address(internal_port.get("ip_address")),
|
||||
subnet_mask=IPV4Address(internal_port.get("subnet_mask")),
|
||||
)
|
||||
|
||||
# configure external port
|
||||
new.configure_external_port(
|
||||
ip_address=IPV4Address(external_port.get("ip_address")),
|
||||
subnet_mask=IPV4Address(external_port.get("subnet_mask")),
|
||||
)
|
||||
|
||||
# configure dmz port
|
||||
new.configure_dmz_port(
|
||||
ip_address=IPV4Address(dmz_port.get("ip_address")), subnet_mask=IPV4Address(dmz_port.get("subnet_mask"))
|
||||
)
|
||||
if "acl" in cfg:
|
||||
# acl rules for internal_inbound_acl
|
||||
if cfg["acl"]["internal_inbound_acl"]:
|
||||
new.internal_inbound_acl._default_config = cfg["acl"]["internal_inbound_acl"]
|
||||
new.internal_inbound_acl._reset_rules_to_default()
|
||||
|
||||
# acl rules for internal_outbound_acl
|
||||
if cfg["acl"]["internal_outbound_acl"]:
|
||||
new.internal_outbound_acl._default_config = cfg["acl"]["internal_outbound_acl"]
|
||||
new.internal_outbound_acl._reset_rules_to_default()
|
||||
|
||||
# acl rules for dmz_inbound_acl
|
||||
if cfg["acl"]["dmz_inbound_acl"]:
|
||||
new.dmz_inbound_acl._default_config = cfg["acl"]["dmz_inbound_acl"]
|
||||
new.dmz_inbound_acl._reset_rules_to_default()
|
||||
|
||||
# acl rules for dmz_outbound_acl
|
||||
if cfg["acl"]["dmz_outbound_acl"]:
|
||||
new.dmz_outbound_acl._default_config = cfg["acl"]["dmz_outbound_acl"]
|
||||
new.dmz_outbound_acl._reset_rules_to_default()
|
||||
|
||||
# acl rules for external_inbound_acl
|
||||
if cfg["acl"]["external_inbound_acl"]:
|
||||
new.external_inbound_acl._default_config = cfg["acl"]["external_inbound_acl"]
|
||||
new.external_inbound_acl._reset_rules_to_default()
|
||||
|
||||
# acl rules for external_outbound_acl
|
||||
if cfg["acl"]["external_outbound_acl"]:
|
||||
new.external_outbound_acl._default_config = cfg["acl"]["external_outbound_acl"]
|
||||
new.external_outbound_acl._reset_rules_to_default()
|
||||
if "routes" in cfg:
|
||||
for route in cfg.get("routes"):
|
||||
new.route_table.add_route(
|
||||
address=IPv4Address(route.get("address")),
|
||||
subnet_mask=IPv4Address(route.get("subnet_mask")),
|
||||
next_hop_ip_address=IPv4Address(route.get("next_hop_ip_address")),
|
||||
metric=float(route.get("metric")),
|
||||
)
|
||||
return new
|
||||
|
||||
@@ -1500,7 +1500,7 @@ class Router(NetworkNode):
|
||||
new.route_table.add_route(
|
||||
address=IPv4Address(route.get("address")),
|
||||
subnet_mask=IPv4Address(route.get("subnet_mask")),
|
||||
next_hop_ip_address=IPv4Address(route.get("subnet_mask")),
|
||||
next_hop_ip_address=IPv4Address(route.get("next_hop_ip_address")),
|
||||
metric=float(route.get("metric")),
|
||||
)
|
||||
return new
|
||||
|
||||
Reference in New Issue
Block a user