#2887 - Fixed Node unit test failures

This commit is contained in:
Charlie Crane
2025-01-29 11:55:10 +00:00
parent f85aace31b
commit 51f1c91e15
22 changed files with 427 additions and 212 deletions

View File

@@ -269,7 +269,8 @@ class PrimaiteGame:
new_node = None
if n_type in Node._registry:
# simplify down Node creation:
if n_type == "wireless_router":
node_cfg["airspace"] = net.airspace
new_node = Node._registry[n_type].from_config(config=node_cfg)
else:
msg = f"invalid node type {n_type} in config"

View File

@@ -178,7 +178,7 @@ class AirSpace(BaseModel):
status = "Enabled" if interface.enabled else "Disabled"
table.add_row(
[
interface._connected_node.hostname, # noqa
interface._connected_node.config.hostname, # noqa
interface.mac_address,
interface.ip_address if hasattr(interface, "ip_address") else None,
interface.subnet_mask if hasattr(interface, "subnet_mask") else None,

View File

@@ -7,7 +7,7 @@ from ipaddress import IPv4Address, IPv4Network
from typing import Any, ClassVar, Dict, List, Optional, Tuple, Union
from prettytable import MARKDOWN, PrettyTable
from pydantic import Field, validate_call
from pydantic import validate_call
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager, RequestType, SimComponent
@@ -1217,16 +1217,18 @@ class Router(NetworkNode, identifier="router"):
config: "Router.ConfigSchema"
class ConfigSchema(NetworkNode.ConfigSchema):
hostname: str = "router"
num_ports: int
"""Configuration Schema for Routers."""
hostname: str = "router"
num_ports: int = 5
def __init__(self, **kwargs):
if not kwargs.get("sys_log"):
kwargs["sys_log"] = SysLog(kwargs["config"].hostname)
if not kwargs.get("acl"):
kwargs["acl"] = AccessControlList(sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=kwargs["config"].hostname)
kwargs["acl"] = AccessControlList(
sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=kwargs["config"].hostname
)
if not kwargs.get("route_table"):
kwargs["route_table"] = RouteTable(sys_log=kwargs["sys_log"])
super().__init__(**kwargs)
@@ -1656,5 +1658,7 @@ class Router(NetworkNode, identifier="router"):
next_hop_ip_address = config["default_route"].get("next_hop_ip_address", None)
if next_hop_ip_address:
router.route_table.set_default_route_next_hop_ip_address(next_hop_ip_address)
router.operating_state = NodeOperatingState.ON if not (p := config.get("operating_state")) else NodeOperatingState[p.upper()]
router.operating_state = (
NodeOperatingState.ON if not (p := config.get("operating_state")) else NodeOperatingState[p.upper()]
)
return router

View File

@@ -129,7 +129,7 @@ class WirelessRouter(Router, identifier="wireless_router"):
"""Configuration Schema for WirelessRouter nodes within PrimAITE."""
hostname: str = "WirelessRouter"
airspace: Optional[AirSpace] = None
airspace: AirSpace
def __init__(self, **kwargs):
super().__init__(**kwargs)
@@ -262,9 +262,6 @@ class WirelessRouter(Router, identifier="wireless_router"):
:return: WirelessRouter instance.
:rtype: WirelessRouter
"""
operating_state = (
NodeOperatingState.ON if not (p := config.get("operating_state")) else NodeOperatingState[p.upper()]
)
router = cls(config=cls.ConfigSchema(**config))
if "router_interface" in config:
ip_address = config["router_interface"]["ip_address"]
@@ -297,4 +294,7 @@ class WirelessRouter(Router, identifier="wireless_router"):
next_hop_ip_address=IPv4Address(route.get("next_hop_ip_address")),
metric=float(route.get("metric", 0)),
)
router.operating_state = (
NodeOperatingState.ON if not (p := config.get("operating_state")) else NodeOperatingState[p.upper()]
)
return router

View File

@@ -208,7 +208,7 @@ class NMAP(Application, identifier="NMAP"):
if show:
table = PrettyTable(["IP Address", "Can Ping"])
table.align = "l"
table.title = f"{self.software_manager.node.hostname} NMAP Ping Scan"
table.title = f"{self.software_manager.node.config.hostname} NMAP Ping Scan"
ip_addresses = self._explode_ip_address_network_array(target_ip_address)
@@ -367,7 +367,7 @@ class NMAP(Application, identifier="NMAP"):
if show:
table = PrettyTable(["IP Address", "Port", "Protocol"])
table.align = "l"
table.title = f"{self.software_manager.node.hostname} NMAP Port Scan ({scan_type})"
table.title = f"{self.software_manager.node.config.hostname} NMAP Port Scan ({scan_type})"
self.sys_log.info(f"{self.name}: Starting port scan")
for ip_address in ip_addresses:
# Prevent port scan on this node

View File

@@ -12,6 +12,7 @@ from sb3_contrib import MaskablePPO
from primaite.game.game import PrimaiteGame
from primaite.session.environment import PrimaiteGymEnv
from primaite.session.ray_envs import PrimaiteRayEnv, PrimaiteRayMARLEnv
from primaite.simulator.network.hardware.nodes.network.wireless_router import WirelessRouter
from tests import TEST_ASSETS_ROOT
CFG_PATH = TEST_ASSETS_ROOT / "configs/test_primaite_session.yaml"

View File

@@ -36,8 +36,8 @@ class SuperComputer(HostNode, identifier="supercomputer"):
SYSTEM_SOFTWARE: ClassVar[Dict] = {**HostNode.SYSTEM_SOFTWARE, "FTPClient": FTPClient}
def __init__(self, ip_address: IPV4Address, subnet_mask: IPV4Address, **kwargs):
def __init__(self, **kwargs):
print("--- Extended Component: SuperComputer ---")
super().__init__(ip_address=ip_address, subnet_mask=subnet_mask, **kwargs)
super().__init__(**kwargs)
pass

View File

@@ -16,7 +16,7 @@ def test_wireless_link_loading(wireless_wan_network):
# Configure Router 2 ACLs
router_2.acl.add_rule(action=ACLAction.PERMIT, position=1)
airspace = router_1.airspace
airspace = router_1.config.airspace
client.software_manager.install(FTPClient)
ftp_client: FTPClient = client.software_manager.software.get("FTPClient")

View File

@@ -83,12 +83,15 @@ def dmz_external_internal_network() -> Network:
)
# external node
external_node = Computer(
hostname="external_node",
ip_address="192.168.10.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.10.1",
start_up_duration=0,
external_node: Computer = Computer.from_config(
config={
"type": "computer",
"hostname": "external_node",
"ip_address": "192.168.10.2",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.10.1",
"start_up_duration": 0,
}
)
external_node.power_on()
external_node.software_manager.install(NTPServer)
@@ -98,12 +101,15 @@ def dmz_external_internal_network() -> Network:
network.connect(endpoint_b=external_node.network_interface[1], endpoint_a=firewall_node.external_port)
# internal node
internal_node = Computer(
hostname="internal_node",
ip_address="192.168.0.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.0.1",
start_up_duration=0,
internal_node: Computer = Computer.from_config(
config={
"type": "computer",
"hostname": "internal_node",
"ip_address": "192.168.0.2",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.0.1",
"start_up_duration": 0,
}
)
internal_node.power_on()
internal_node.software_manager.install(NTPClient)
@@ -114,12 +120,15 @@ def dmz_external_internal_network() -> Network:
network.connect(endpoint_b=internal_node.network_interface[1], endpoint_a=firewall_node.internal_port)
# dmz node
dmz_node = Computer(
hostname="dmz_node",
ip_address="192.168.1.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
start_up_duration=0,
dmz_node: Computer = Computer.from_config(
config={
"type": "computer",
"hostname": "dmz_node",
"ip_address": "192.168.1.2",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
}
)
dmz_node.power_on()
dmz_ntp_client: NTPClient = dmz_node.software_manager.software["NTPClient"]
@@ -157,9 +166,9 @@ def test_nodes_can_ping_default_gateway(dmz_external_internal_network):
internal_node = dmz_external_internal_network.get_node_by_hostname("internal_node")
dmz_node = dmz_external_internal_network.get_node_by_hostname("dmz_node")
assert internal_node.ping(internal_node.default_gateway) # default gateway internal
assert dmz_node.ping(dmz_node.default_gateway) # default gateway dmz
assert external_node.ping(external_node.default_gateway) # default gateway external
assert internal_node.ping(internal_node.config.default_gateway) # default gateway internal
assert dmz_node.ping(dmz_node.config.default_gateway) # default gateway dmz
assert external_node.ping(external_node.config.default_gateway) # default gateway external
def test_nodes_can_ping_default_gateway_on_another_subnet(dmz_external_internal_network):
@@ -173,14 +182,14 @@ def test_nodes_can_ping_default_gateway_on_another_subnet(dmz_external_internal_
internal_node = dmz_external_internal_network.get_node_by_hostname("internal_node")
dmz_node = dmz_external_internal_network.get_node_by_hostname("dmz_node")
assert internal_node.ping(external_node.default_gateway) # internal node to external default gateway
assert internal_node.ping(dmz_node.default_gateway) # internal node to dmz default gateway
assert internal_node.ping(external_node.config.default_gateway) # internal node to external default gateway
assert internal_node.ping(dmz_node.config.default_gateway) # internal node to dmz default gateway
assert dmz_node.ping(internal_node.default_gateway) # dmz node to internal default gateway
assert dmz_node.ping(external_node.default_gateway) # dmz node to external default gateway
assert dmz_node.ping(internal_node.config.default_gateway) # dmz node to internal default gateway
assert dmz_node.ping(external_node.config.default_gateway) # dmz node to external default gateway
assert external_node.ping(external_node.default_gateway) # external node to internal default gateway
assert external_node.ping(dmz_node.default_gateway) # external node to dmz default gateway
assert external_node.ping(external_node.config.default_gateway) # external node to internal default gateway
assert external_node.ping(dmz_node.config.default_gateway) # external node to dmz default gateway
def test_nodes_can_ping_each_other(dmz_external_internal_network):

View File

@@ -10,25 +10,31 @@ def test_node_to_node_ping():
"""Tests two Computers are able to ping each other."""
network = Network()
client_1 = Computer(
hostname="client_1",
ip_address="192.168.1.10",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
start_up_duration=0,
client_1: Computer = Computer.from_config(
config={
"type": "computer",
"hostname": "client_1",
"ip_address": "192.168.1.10",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
}
)
client_1.power_on()
server_1 = Server(
hostname="server_1",
ip_address="192.168.1.11",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
start_up_duration=0,
server_1: Server = Server.from_config(
config={
"type": "server",
"hostname": "server_1",
"ip_address": "192.168.1.11",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
}
)
server_1.power_on()
switch_1 = Switch(hostname="switch_1", start_up_duration=0)
switch_1: Switch = Switch.from_config(config={"type": "switch", "hostname": "switch_1", "start_up_duration": 0})
switch_1.power_on()
network.connect(endpoint_a=client_1.network_interface[1], endpoint_b=switch_1.network_interface[1])
@@ -41,14 +47,38 @@ def test_multi_nic():
"""Tests that Computers with multiple NICs can ping each other and the data go across the correct links."""
network = Network()
node_a = Computer(config=dict(hostname="node_a", ip_address="192.168.0.10", subnet_mask="255.255.255.0", start_up_duration=0))
node_a: Computer = Computer.from_config(
config={
"type": "computer",
"hostname": "node_a",
"ip_address": "192.168.0.10",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
node_a.power_on()
node_b = Computer(hostname="node_b", ip_address="192.168.0.11", subnet_mask="255.255.255.0", start_up_duration=0)
node_b: Computer = Computer.from_config(
config={
"type": "computer",
"hostname": "node_b",
"ip_address": "192.168.0.11",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
node_b.power_on()
node_b.connect_nic(NIC(ip_address="10.0.0.12", subnet_mask="255.0.0.0"))
node_c = Computer(hostname="node_c", ip_address="10.0.0.13", subnet_mask="255.0.0.0", start_up_duration=0)
node_c: Computer = Computer.from_config(
config={
"type": "computer",
"hostname": "node_c",
"ip_address": "10.0.0.13",
"subnet_mask": "255.0.0.0",
"start_up_duration": 0,
}
)
node_c.power_on()
network.connect(node_a.network_interface[1], node_b.network_interface[1])

View File

@@ -1,6 +1,7 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.firewall import Firewall
from primaite.simulator.network.networks import multi_lan_internet_network_example
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.applications.web_browser import WebBrowser

View File

@@ -27,7 +27,15 @@ def test_network(example_network):
def test_adding_removing_nodes():
"""Check that we can create and add a node to a network."""
net = Network()
n1 = Computer(hostname="computer", ip_address="192.168.1.2", subnet_mask="255.255.255.0", start_up_duration=0)
n1 = Computer.from_config(
config={
"type": "computer",
"hostname": "computer",
"ip_address": "192.168.1.2",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
net.add_node(n1)
assert n1.parent is net
assert n1 in net
@@ -37,10 +45,18 @@ def test_adding_removing_nodes():
assert n1 not in net
def test_readding_node():
"""Check that warning is raised when readding a node."""
def test_reading_node():
"""Check that warning is raised when reading a node."""
net = Network()
n1 = Computer(hostname="computer", ip_address="192.168.1.2", subnet_mask="255.255.255.0", start_up_duration=0)
n1 = Computer.from_config(
config={
"type": "computer",
"hostname": "computer",
"ip_address": "192.168.1.2",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
net.add_node(n1)
net.add_node(n1)
assert n1.parent is net
@@ -50,7 +66,15 @@ def test_readding_node():
def test_removing_nonexistent_node():
"""Check that warning is raised when trying to remove a node that is not in the network."""
net = Network()
n1 = Computer(hostname="computer1", ip_address="192.168.1.1", subnet_mask="255.255.255.0", start_up_duration=0)
n1 = Computer.from_config(
config={
"type": "computer",
"hostname": "computer1",
"ip_address": "192.168.1.1",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
net.remove_node(n1)
assert n1.parent is None
assert n1 not in net
@@ -59,8 +83,24 @@ def test_removing_nonexistent_node():
def test_connecting_nodes():
"""Check that two nodes on the network can be connected."""
net = Network()
n1 = Computer(hostname="computer1", ip_address="192.168.1.1", subnet_mask="255.255.255.0", start_up_duration=0)
n2 = Computer(hostname="computer2", ip_address="192.168.1.2", subnet_mask="255.255.255.0", start_up_duration=0)
n1: Computer = Computer.from_config(
config={
"type": "computer",
"hostname": "computer1",
"ip_address": "192.168.1.1",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
n2: Computer = Computer.from_config(
config={
"type": "computer",
"hostname": "computer2",
"ip_address": "192.168.1.2",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
net.add_node(n1)
net.add_node(n2)
@@ -75,7 +115,15 @@ def test_connecting_nodes():
def test_connecting_node_to_itself_fails():
net = Network()
node = Computer(hostname="node_b", ip_address="192.168.0.11", subnet_mask="255.255.255.0", start_up_duration=0)
node = Computer.from_config(
config={
"type": "computer",
"hostname": "node_b",
"ip_address": "192.168.0.11",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
node.power_on()
node.connect_nic(NIC(ip_address="10.0.0.12", subnet_mask="255.0.0.0"))
@@ -92,8 +140,24 @@ def test_connecting_node_to_itself_fails():
def test_disconnecting_nodes():
net = Network()
n1 = Computer(hostname="computer1", ip_address="192.168.1.1", subnet_mask="255.255.255.0", start_up_duration=0)
n2 = Computer(hostname="computer2", ip_address="192.168.1.2", subnet_mask="255.255.255.0", start_up_duration=0)
n1 = Computer.from_config(
config={
"type": "computer",
"hostname": "computer1",
"ip_address": "192.168.1.1",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
n2 = Computer.from_config(
config={
"type": "computer",
"hostname": "computer2",
"ip_address": "192.168.1.2",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
net.connect(n1.network_interface[1], n2.network_interface[1])
assert len(net.links) == 1

View File

@@ -15,25 +15,31 @@ from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")
def pc_a_pc_b_router_1() -> Tuple[Computer, Computer, Router]:
network = Network()
pc_a = Computer(
hostname="pc_a",
ip_address="192.168.0.10",
subnet_mask="255.255.255.0",
default_gateway="192.168.0.1",
start_up_duration=0,
pc_a = Computer.from_config(
config={
"type": "computer",
"hostname": "pc_a",
"ip_address": "192.168.0.10",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.0.1",
"start_up_duration": 0,
}
)
pc_a.power_on()
pc_b = Computer(
hostname="pc_b",
ip_address="192.168.1.10",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
start_up_duration=0,
pc_b = Computer.from_config(
config={
"type": "computer",
"hostname": "pc_b",
"ip_address": "192.168.1.10",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
}
)
pc_b.power_on()
router_1 = Router(hostname="router_1", start_up_duration=0)
router_1 = Router.from_config(config={"type": "router", "hostname": "router_1", "start_up_duration": 0})
router_1.power_on()
router_1.configure_port(1, "192.168.0.1", "255.255.255.0")
@@ -52,18 +58,21 @@ def multi_hop_network() -> Network:
network = Network()
# Configure PC A
pc_a = Computer(
hostname="pc_a",
ip_address="192.168.0.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.0.1",
start_up_duration=0,
pc_a: Computer = Computer.from_config(
config={
"type": "computer",
"hostname": "pc_a",
"ip_address": "192.168.0.2",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.0.1",
"start_up_duration": 0,
}
)
pc_a.power_on()
network.add_node(pc_a)
# Configure Router 1
router_1 = Router(hostname="router_1", start_up_duration=0)
router_1: Router = Router.from_config(config={"type": "router", "hostname": "router_1", "start_up_duration": 0})
router_1.power_on()
network.add_node(router_1)
@@ -79,18 +88,21 @@ def multi_hop_network() -> Network:
router_1.acl.add_rule(action=ACLAction.PERMIT, protocol=PROTOCOL_LOOKUP["ICMP"], position=23)
# Configure PC B
pc_b = Computer(
hostname="pc_b",
ip_address="192.168.2.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.2.1",
start_up_duration=0,
pc_b: Computer = Computer.from_config(
config={
"type": "computer",
"hostname": "pc_b",
"ip_address": "192.168.2.2",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.2.1",
"start_up_duration": 0,
}
)
pc_b.power_on()
network.add_node(pc_b)
# Configure Router 2
router_2 = Router(hostname="router_2", start_up_duration=0)
router_2: Router = Router.from_config(config={"type": "router", "hostname": "router_2", "start_up_duration": 0})
router_2.power_on()
network.add_node(router_2)
@@ -113,13 +125,13 @@ def multi_hop_network() -> Network:
def test_ping_default_gateway(pc_a_pc_b_router_1):
pc_a, pc_b, router_1 = pc_a_pc_b_router_1
assert pc_a.ping(pc_a.default_gateway)
assert pc_a.ping(pc_a.config.default_gateway)
def test_ping_other_router_port(pc_a_pc_b_router_1):
pc_a, pc_b, router_1 = pc_a_pc_b_router_1
assert pc_a.ping(pc_b.default_gateway)
assert pc_a.ping(pc_b.config.default_gateway)
def test_host_on_other_subnet(pc_a_pc_b_router_1):

View File

@@ -17,18 +17,23 @@ def wireless_wan_network():
network = Network()
# Configure PC A
pc_a = Computer(
hostname="pc_a",
ip_address="192.168.0.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.0.1",
start_up_duration=0,
pc_a = Computer.from_config(
config={
"type": "computer",
"hostname": "pc_a",
"ip_address": "192.168.0.2",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.0.1",
"start_up_duration": 0,
}
)
pc_a.power_on()
network.add_node(pc_a)
# Configure Router 1
router_1 = WirelessRouter(hostname="router_1", start_up_duration=0, airspace=network.airspace)
router_1 = WirelessRouter.from_config(
config={"type": "wireless_router", "hostname": "router_1", "start_up_duration": 0, "airspace": network.airspace}
)
router_1.power_on()
network.add_node(router_1)
@@ -43,18 +48,23 @@ def wireless_wan_network():
router_1.acl.add_rule(action=ACLAction.PERMIT, protocol=PROTOCOL_LOOKUP["ICMP"], position=23)
# Configure PC B
pc_b = Computer(
hostname="pc_b",
ip_address="192.168.2.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.2.1",
start_up_duration=0,
pc_b: Computer = Computer.from_config(
config={
"type": "computer",
"hostname": "pc_b",
"ip_address": "192.168.2.2",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.2.1",
"start_up_duration": 0,
}
)
pc_b.power_on()
network.add_node(pc_b)
# Configure Router 2
router_2 = WirelessRouter(hostname="router_2", start_up_duration=0, airspace=network.airspace)
router_2: WirelessRouter = WirelessRouter.from_config(
config={"type": "wireless_router", "hostname": "router_2", "start_up_duration": 0, "airspace": network.airspace}
)
router_2.power_on()
network.add_node(router_2)
@@ -98,8 +108,8 @@ def wireless_wan_network_from_config_yaml():
def test_cross_wireless_wan_connectivity(wireless_wan_network):
pc_a, pc_b, router_1, router_2 = wireless_wan_network
# Ensure that PCs can ping across routers before any frequency change
assert pc_a.ping(pc_a.default_gateway), "PC A should ping its default gateway successfully."
assert pc_b.ping(pc_b.default_gateway), "PC B should ping its default gateway successfully."
assert pc_a.ping(pc_a.config.default_gateway), "PC A should ping its default gateway successfully."
assert pc_b.ping(pc_b.config.default_gateway), "PC B should ping its default gateway successfully."
assert pc_a.ping(pc_b.network_interface[1].ip_address), "PC A should ping PC B across routers successfully."
assert pc_b.ping(pc_a.network_interface[1].ip_address), "PC B should ping PC A across routers successfully."
@@ -109,8 +119,8 @@ def test_cross_wireless_wan_connectivity_from_yaml(wireless_wan_network_from_con
pc_a = wireless_wan_network_from_config_yaml.get_node_by_hostname("pc_a")
pc_b = wireless_wan_network_from_config_yaml.get_node_by_hostname("pc_b")
assert pc_a.ping(pc_a.default_gateway), "PC A should ping its default gateway successfully."
assert pc_b.ping(pc_b.default_gateway), "PC B should ping its default gateway successfully."
assert pc_a.ping(pc_a.config.default_gateway), "PC A should ping its default gateway successfully."
assert pc_b.ping(pc_b.config.default_gateway), "PC B should ping its default gateway successfully."
assert pc_a.ping(pc_b.network_interface[1].ip_address), "PC A should ping PC B across routers successfully."
assert pc_b.ping(pc_a.network_interface[1].ip_address), "PC B should ping PC A across routers successfully."

View File

@@ -10,13 +10,16 @@ from primaite.simulator.system.applications.application import Application, Appl
@pytest.fixture(scope="function")
def populated_node(application_class) -> Tuple[Application, Computer]:
computer: Computer = Computer(
hostname="test_computer",
ip_address="192.168.1.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
start_up_duration=0,
shut_down_duration=0,
computer: Computer = Computer.from_config(
config={
"type": "computer",
"hostname": "test_computer",
"ip_address": "192.168.1.2",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
"shut_down_duration": 0,
}
)
computer.power_on()
computer.software_manager.install(application_class)
@@ -29,13 +32,16 @@ def populated_node(application_class) -> Tuple[Application, Computer]:
def test_application_on_offline_node(application_class):
"""Test to check that the application cannot be interacted with when node it is on is off."""
computer: Computer = Computer(
hostname="test_computer",
ip_address="192.168.1.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
start_up_duration=0,
shut_down_duration=0,
computer: Computer = Computer.from_config(
config={
"type": "computer",
"hostname": "test_computer",
"ip_address": "192.168.1.2",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
"shut_down_duration": 0,
}
)
computer.software_manager.install(application_class)

View File

@@ -20,11 +20,27 @@ from primaite.simulator.system.software import SoftwareHealthState
@pytest.fixture(scope="function")
def peer_to_peer() -> Tuple[Computer, Computer]:
network = Network()
node_a: Computer = Computer.from_config(config={"type":"computer", "hostname":"node_a", "ip_address":"192.168.0.10", "subnet_mask":"255.255.255.0", "start_up_duration":0})
node_a: Computer = Computer.from_config(
config={
"type": "computer",
"hostname": "node_a",
"ip_address": "192.168.0.10",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
node_a.power_on()
node_a.software_manager.get_open_ports()
node_b: Computer = Computer.from_config(config={"type":"computer", "hostname":"node_b", "ip_address":"192.168.0.11", "subnet_mask":"255.255.255.0", "start_up_duration":0})
node_b: Computer = Computer.from_config(
config={
"type": "computer",
"hostname": "node_b",
"ip_address": "192.168.0.11",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
node_b.power_on()
network.connect(node_a.network_interface[1], node_b.network_interface[1])
@@ -412,8 +428,14 @@ def test_database_service_can_terminate_connection(peer_to_peer):
def test_client_connection_terminate_does_not_terminate_another_clients_connection():
network = Network()
db_server: Server = Server.from_config(config={"type":"server",
"hostname":"db_client", "ip_address":"192.168.0.11", "subnet_mask":"255.255.255.0", "start_up_duration":0}
db_server: Server = Server.from_config(
config={
"type": "server",
"hostname": "db_client",
"ip_address": "192.168.0.11",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
db_server.power_on()
@@ -421,8 +443,14 @@ def test_client_connection_terminate_does_not_terminate_another_clients_connecti
db_service: DatabaseService = db_server.software_manager.software["DatabaseService"] # noqa
db_service.start()
client_a = Computer(
hostname="client_a", ip_address="192.168.0.12", subnet_mask="255.255.255.0", start_up_duration=0
client_a = Computer.from_config(
config={
"type": "computer",
"hostname": "client_a",
"ip_address": "192.168.0.12",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
client_a.power_on()
@@ -430,8 +458,14 @@ def test_client_connection_terminate_does_not_terminate_another_clients_connecti
client_a.software_manager.software["DatabaseClient"].configure(server_ip_address=IPv4Address("192.168.0.11"))
client_a.software_manager.software["DatabaseClient"].run()
client_b = Computer(
hostname="client_b", ip_address="192.168.0.13", subnet_mask="255.255.255.0", start_up_duration=0
client_b = Computer.from_config(
config={
"type": "computer",
"hostname": "client_b",
"ip_address": "192.168.0.13",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
client_b.power_on()
@@ -439,7 +473,7 @@ def test_client_connection_terminate_does_not_terminate_another_clients_connecti
client_b.software_manager.software["DatabaseClient"].configure(server_ip_address=IPv4Address("192.168.0.11"))
client_b.software_manager.software["DatabaseClient"].run()
switch = Switch(hostname="switch", start_up_duration=0, num_ports=3)
switch = Switch.from_config(config={"type": "switch", "hostname": "switch", "start_up_duration": 0, "num_ports": 3})
switch.power_on()
network.connect(endpoint_a=switch.network_interface[1], endpoint_b=db_server.network_interface[1])
@@ -465,6 +499,14 @@ def test_client_connection_terminate_does_not_terminate_another_clients_connecti
def test_database_server_install_ftp_client():
server: Server = Server.from_config(config={"type":"server", "hostname":"db_server", "ip_address":"192.168.1.2", "subnet_mask":"255.255.255.0", "start_up_duration":0})
server: Server = Server.from_config(
config={
"type": "server",
"hostname": "db_server",
"ip_address": "192.168.1.2",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
server.software_manager.install(DatabaseService)
assert server.software_manager.software.get("FTPClient")

View File

@@ -14,21 +14,27 @@ from primaite.simulator.network.hardware.nodes.host.server import Server
def client_server_network() -> Tuple[Computer, Server, Network]:
network = Network()
client = Computer(
hostname="client",
ip_address="192.168.1.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
start_up_duration=0,
client = Computer.from_config(
config={
"type": "computer",
"hostname": "client",
"ip_address": "192.168.1.2",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
}
)
client.power_on()
server = Server(
hostname="server",
ip_address="192.168.1.3",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
start_up_duration=0,
server = Server.from_config(
config={
"type": "server",
"hostname": "server",
"ip_address": "192.168.1.3",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
}
)
server.power_on()

View File

@@ -111,7 +111,7 @@ def test_request_fails_if_node_off(example_network, node_request):
"""Test that requests succeed when the node is on, and fail if the node is off."""
net = example_network
client_1: HostNode = net.get_node_by_hostname("client_1")
client_1.shut_down_duration = 0
client_1.config.shut_down_duration = 0
assert client_1.operating_state == NodeOperatingState.ON
resp_1 = net.apply_request(node_request)

View File

@@ -17,11 +17,13 @@ def node() -> Node:
"hostname": "test",
"ip_address": "192.168.1.2",
"subnet_mask": "255.255.255.0",
"operating_state": "OFF",
}
computer = Computer.from_config(config=computer_cfg)
return computer
def test_node_startup(node):
assert node.operating_state == NodeOperatingState.OFF
node.apply_request(["startup"])

View File

@@ -36,6 +36,16 @@ def test_create_dns_client(dns_client):
def test_dns_client_add_domain_to_cache_when_not_running(dns_client):
dns_client_service: DNSClient = dns_client.software_manager.software.get("DNSClient")
# shutdown the dns_client
dns_client.power_off()
# wait for dns_client to turn off
idx = 0
while dns_client.operating_state == NodeOperatingState.SHUTTING_DOWN:
dns_client.apply_timestep(idx)
idx += 1
assert dns_client.operating_state is NodeOperatingState.OFF
assert dns_client_service.operating_state is ServiceOperatingState.STOPPED

View File

@@ -66,7 +66,7 @@ def test_dns_server_receive(dns_server):
}
client = Computer.from_config(config=client_cfg)
client.power_on()
client.dns_server = IPv4Address("192.168.1.10")
client.config.dns_server = IPv4Address("192.168.1.10")
network = Network()
network.connect(dns_server.network_interface[1], client.network_interface[1])
dns_client: DNSClient = client.software_manager.software["DNSClient"] # noqa

View File

@@ -12,6 +12,7 @@ from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.hardware.nodes.network.switch import Switch
from primaite.simulator.network.hardware.nodes.network.wireless_router import WirelessRouter
from primaite.simulator.network.networks import arcd_uc2_network
from primaite.simulator.network.protocols.ssh import (
SSHConnectionMessage,
SSHPacket,
@@ -29,8 +30,14 @@ from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")
def terminal_on_computer() -> Tuple[Terminal, Computer]:
computer: Computer = Computer.from_config(config={"type":"computer",
"hostname":"node_a", "ip_address":"192.168.0.10", "subnet_mask":"255.255.255.0", "start_up_duration":0}
computer: Computer = Computer.from_config(
config={
"type": "computer",
"hostname": "node_a",
"ip_address": "192.168.0.10",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
computer.power_on()
terminal: Terminal = computer.software_manager.software.get("Terminal")
@@ -41,19 +48,27 @@ def terminal_on_computer() -> Tuple[Terminal, Computer]:
@pytest.fixture(scope="function")
def basic_network() -> Network:
network = Network()
node_a = Computer.from_config(config={"type":"computer",
"hostname":"node_a",
"ip_address":"192.168.0.10",
"subnet_mask":"255.255.255.0",
"start_up_duration":0})
node_a = Computer.from_config(
config={
"type": "computer",
"hostname": "node_a",
"ip_address": "192.168.0.10",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
node_a.power_on()
node_a.software_manager.get_open_ports()
node_b = Computer.from_config(config={"type":"computer",
"hostname":"node_b",
"ip_address":"192.168.0.11",
"subnet_mask":"255.255.255.0",
"start_up_duration":0})
node_b = Computer.from_config(
config={
"type": "computer",
"hostname": "node_b",
"ip_address": "192.168.0.11",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
node_b.power_on()
network.connect(node_a.network_interface[1], node_b.network_interface[1])
@@ -65,20 +80,23 @@ def wireless_wan_network():
network = Network()
# Configure PC A
pc_a_cfg = {"type": "computer",
"hostname":"pc_a",
"ip_address":"192.168.0.2",
"subnet_mask":"255.255.255.0",
"default_gateway":"192.168.0.1",
"start_up_duration":0,
}
pc_a_cfg = {
"type": "computer",
"hostname": "pc_a",
"ip_address": "192.168.0.2",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.0.1",
"start_up_duration": 0,
}
pc_a = Computer.from_config(config=pc_a_cfg)
pc_a.power_on()
network.add_node(pc_a)
# Configure Router 1
router_1 = WirelessRouter.from_config(config={"type":"wireless_router", "hostname":"router_1", "start_up_duration":0, "airspace":network.airspace})
router_1 = WirelessRouter.from_config(
config={"type": "wireless_router", "hostname": "router_1", "start_up_duration": 0, "airspace": network.airspace}
)
router_1.power_on()
network.add_node(router_1)
@@ -99,43 +117,29 @@ def wireless_wan_network():
# Configure PC B
pc_b_cfg = {"type": "computer",
"hostname":"pc_b",
"ip_address":"192.168.2.2",
"subnet_mask":"255.255.255.0",
"default_gateway":"192.168.2.1",
"start_up_duration":0,
}
pc_b_cfg = {
"type": "computer",
"hostname": "pc_b",
"ip_address": "192.168.2.2",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.2.1",
"start_up_duration": 0,
}
pc_b = Computer.from_config(config=pc_b_cfg)
pc_b.power_on()
network.add_node(pc_b)
# Configure Router 2
router_2 = WirelessRouter.from_config(config={"type":"wireless_router", "hostname":"router_2", "start_up_duration":0, "airspace":network.airspace})
router_2.power_on()
network.add_node(router_2)
# Configure the connection between PC B and Router 2 port 2
router_2.configure_router_interface("192.168.2.1", "255.255.255.0")
network.connect(pc_b.network_interface[1], router_2.network_interface[2])
# Configure Router 2 ACLs
# Configure the wireless connection between Router 1 port 1 and Router 2 port 1
router_1.configure_wireless_access_point("192.168.1.1", "255.255.255.0")
router_2.configure_wireless_access_point("192.168.1.2", "255.255.255.0")
router_1.route_table.add_route(
address="192.168.2.0", subnet_mask="255.255.255.0", next_hop_ip_address="192.168.1.2"
)
# Configure Route from Router 2 to PC A subnet
router_2.route_table.add_route(
address="192.168.0.2", subnet_mask="255.255.255.0", next_hop_ip_address="192.168.1.1"
)
return pc_a, pc_b, router_1, router_2
return network
@pytest.fixture
@@ -156,11 +160,15 @@ def test_terminal_creation(terminal_on_computer):
def test_terminal_install_default():
"""Terminal should be auto installed onto Nodes"""
computer: Computer = Computer.from_config(config={"type":"computer",
"hostname":"node_a",
"ip_address":"192.168.0.10",
"subnet_mask":"255.255.255.0",
"start_up_duration":0})
computer: Computer = Computer.from_config(
config={
"type": "computer",
"hostname": "node_a",
"ip_address": "192.168.0.10",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
computer.power_on()
assert computer.software_manager.software.get("Terminal")
@@ -168,7 +176,7 @@ def test_terminal_install_default():
def test_terminal_not_on_switch():
"""Ensure terminal does not auto-install to switch"""
test_switch = Switch.from_config(config={"type":"switch", "hostname":"Test"})
test_switch = Switch.from_config(config={"type": "switch", "hostname": "Test"})
assert not test_switch.software_manager.software.get("Terminal")
@@ -291,7 +299,10 @@ def test_terminal_ignores_when_off(basic_network):
def test_computer_remote_login_to_router(wireless_wan_network):
"""Test to confirm that a computer can SSH into a router."""
pc_a, _, router_1, _ = wireless_wan_network
pc_a = wireless_wan_network.get_node_by_hostname("pc_a")
router_1 = wireless_wan_network.get_node_by_hostname("router_1")
pc_a_terminal: Terminal = pc_a.software_manager.software.get("Terminal")
@@ -310,7 +321,9 @@ def test_computer_remote_login_to_router(wireless_wan_network):
def test_router_remote_login_to_computer(wireless_wan_network):
"""Test to confirm that a router can ssh into a computer."""
pc_a, _, router_1, _ = wireless_wan_network
pc_a = wireless_wan_network.get_node_by_hostname("pc_a")
router_1 = wireless_wan_network.get_node_by_hostname("router_1")
router_1_terminal: Terminal = router_1.software_manager.software.get("Terminal")
@@ -329,7 +342,9 @@ def test_router_remote_login_to_computer(wireless_wan_network):
def test_router_blocks_SSH_traffic(wireless_wan_network):
"""Test to check that router will block SSH traffic if no ACL rule."""
pc_a, _, router_1, _ = wireless_wan_network
pc_a = wireless_wan_network.get_node_by_hostname("pc_a")
router_1 = wireless_wan_network.get_node_by_hostname("router_1")
# Remove rule that allows SSH traffic.
router_1.acl.remove_rule(position=21)
@@ -343,20 +358,22 @@ def test_router_blocks_SSH_traffic(wireless_wan_network):
assert len(pc_a_terminal._connections) == 0
def test_SSH_across_network(wireless_wan_network):
def test_SSH_across_network():
"""Test to show ability to SSH across a network."""
pc_a, pc_b, router_1, router_2 = wireless_wan_network
network: Network = arcd_uc2_network()
pc_a = network.get_node_by_hostname("client_1")
router_1 = network.get_node_by_hostname("router_1")
terminal_a: Terminal = pc_a.software_manager.software.get("Terminal")
terminal_b: Terminal = pc_b.software_manager.software.get("Terminal")
router_2.acl.add_rule(
router_1.acl.add_rule(
action=ACLAction.PERMIT, src_port=PORT_LOOKUP["SSH"], dst_port=PORT_LOOKUP["SSH"], position=21
)
assert len(terminal_a._connections) == 0
terminal_b_on_terminal_a = terminal_b.login(username="admin", password="admin", ip_address="192.168.0.2")
# Login to the Domain Controller
terminal_a.login(username="admin", password="admin", ip_address="192.168.1.10")
assert len(terminal_a._connections) == 1