Merge remote-tracking branch 'origin/dev' into feature/2319-database-admin

This commit is contained in:
Marek Wolan
2024-03-03 11:24:09 +00:00
12 changed files with 305 additions and 44 deletions

View File

@@ -223,8 +223,12 @@ class PrimaiteGame:
sim = game.simulation
net = sim.network
nodes_cfg = cfg["simulation"]["network"]["nodes"]
links_cfg = cfg["simulation"]["network"]["links"]
simulation_config = cfg.get("simulation", {})
network_config = simulation_config.get("network", {})
nodes_cfg = network_config.get("nodes", [])
links_cfg = network_config.get("links", [])
for node_cfg in nodes_cfg:
node_ref = node_cfg["ref"]
n_type = node_cfg["type"]
@@ -391,7 +395,7 @@ class PrimaiteGame:
game.ref_map_links[link_cfg["ref"]] = new_link.uuid
# 3. create agents
agents_cfg = cfg["agents"]
agents_cfg = cfg.get("agents", [])
for agent_cfg in agents_cfg:
agent_ref = agent_cfg["ref"] # noqa: F841
@@ -447,6 +451,6 @@ class PrimaiteGame:
game.agents[agent_cfg["ref"]] = new_agent
# Set the NMNE capture config
set_nmne_config(cfg["simulation"]["network"].get("nmne_config", {}))
set_nmne_config(network_config.get("nmne_config", {}))
return game

View File

@@ -8,10 +8,6 @@ from prettytable import MARKDOWN, PrettyTable
from primaite import getLogger
from primaite.simulator.core import RequestManager, RequestType, SimComponent
from primaite.simulator.network.hardware.base import Link, Node, WiredNetworkInterface
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import Router
from primaite.simulator.network.hardware.nodes.network.switch import Switch
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.services.service import Service
@@ -85,24 +81,29 @@ class Network(SimComponent):
self.links[link_id].apply_timestep(timestep=timestep)
@property
def routers(self) -> List[Router]:
def router_nodes(self) -> List[Node]:
"""The Routers in the Network."""
return [node for node in self.nodes.values() if isinstance(node, Router)]
return [node for node in self.nodes.values() if node.__class__.__name__ == "Router"]
@property
def switches(self) -> List[Switch]:
def switch_nodes(self) -> List[Node]:
"""The Switches in the Network."""
return [node for node in self.nodes.values() if isinstance(node, Switch)]
return [node for node in self.nodes.values() if node.__class__.__name__ == "Switch"]
@property
def computers(self) -> List[Computer]:
def computer_nodes(self) -> List[Node]:
"""The Computers in the Network."""
return [node for node in self.nodes.values() if isinstance(node, Computer) and not isinstance(node, Server)]
return [node for node in self.nodes.values() if node.__class__.__name__ == "Computer"]
@property
def servers(self) -> List[Server]:
def server_nodes(self) -> List[Node]:
"""The Servers in the Network."""
return [node for node in self.nodes.values() if isinstance(node, Server)]
return [node for node in self.nodes.values() if node.__class__.__name__ == "Server"]
@property
def firewall_nodes(self) -> List[Node]:
"""The Firewalls in the Network."""
return [node for node in self.nodes.values() if node.__class__.__name__ == "Firewall"]
def show(self, nodes: bool = True, ip_addresses: bool = True, links: bool = True, markdown: bool = False):
"""
@@ -117,10 +118,11 @@ class Network(SimComponent):
:param markdown: Use Markdown style in table output. Defaults to False.
"""
nodes_type_map = {
"Router": self.routers,
"Switch": self.switches,
"Server": self.servers,
"Computer": self.computers,
"Router": self.router_nodes,
"Firewall": self.firewall_nodes,
"Switch": self.switch_nodes,
"Server": self.server_nodes,
"Computer": self.computer_nodes,
}
if nodes:
table = PrettyTable(["Node", "Type", "Operating State"])
@@ -143,7 +145,10 @@ class Network(SimComponent):
for node in nodes:
for i, port in node.network_interface.items():
if hasattr(port, "ip_address"):
table.add_row([node.hostname, i, port.ip_address, port.subnet_mask, node.default_gateway])
port_str = port.port_name if port.port_name else port.port_num
table.add_row(
[node.hostname, port_str, port.ip_address, port.subnet_mask, node.default_gateway]
)
print(table)
if links:

View File

@@ -500,7 +500,7 @@ class Firewall(Router):
if "ports" in cfg:
internal_port = cfg["ports"]["internal_port"]
external_port = cfg["ports"]["external_port"]
dmz_port = cfg["ports"]["dmz_port"]
dmz_port = cfg["ports"].get("dmz_port")
# configure internal port
firewall.configure_internal_port(
@@ -514,11 +514,12 @@ class Firewall(Router):
subnet_mask=IPV4Address(external_port.get("subnet_mask", "255.255.255.0")),
)
# configure dmz port
firewall.configure_dmz_port(
ip_address=IPV4Address(dmz_port.get("ip_address")),
subnet_mask=IPV4Address(dmz_port.get("subnet_mask", "255.255.255.0")),
)
# configure dmz port if not none
if dmz_port is not None:
firewall.configure_dmz_port(
ip_address=IPV4Address(dmz_port.get("ip_address")),
subnet_mask=IPV4Address(dmz_port.get("subnet_mask", "255.255.255.0")),
)
if "acl" in cfg:
# acl rules for internal_inbound_acl
if cfg["acl"]["internal_inbound_acl"]:
@@ -573,7 +574,7 @@ class Firewall(Router):
)
# acl rules for external_inbound_acl
if cfg["acl"]["external_inbound_acl"]:
if cfg["acl"].get("external_inbound_acl"):
for r_num, r_cfg in cfg["acl"]["external_inbound_acl"].items():
firewall.external_inbound_acl.add_rule(
action=ACLAction[r_cfg["action"]],
@@ -586,7 +587,7 @@ class Firewall(Router):
)
# acl rules for external_outbound_acl
if cfg["acl"]["external_outbound_acl"]:
if cfg["acl"].get("external_outbound_acl"):
for r_num, r_cfg in cfg["acl"]["external_outbound_acl"].items():
firewall.external_outbound_acl.add_rule(
action=ACLAction[r_cfg["action"]],