#2887 - Node class changes to address some test failures. Addressed some inconsistencies around operating_state, amended instantiation of some Nodes in test environments

This commit is contained in:
Charlie Crane
2025-01-27 16:35:40 +00:00
parent a7395c466e
commit 0570ab984d
46 changed files with 548 additions and 391 deletions

View File

@@ -46,7 +46,7 @@ class Router(NetworkNode, identifier="router"):
num_ports: int = 5
hostname: ClassVar[str] = "Router"
hostname: str = "Router"
ports: list = []
@@ -55,4 +55,4 @@ class Router(NetworkNode, identifier="router"):
Changes to YAML file.
=====================
Nodes defined within configuration YAML files for use with PrimAITE 3.X should still be compatible following these changes.
Nodes defined within configuration YAML files for use with PrimAITE 3.X should still be compatible following these changes.

View File

@@ -320,7 +320,7 @@ class WirelessNetworkInterface(NetworkInterface, ABC):
self.enabled = True
self._connected_node.sys_log.info(f"Network Interface {self} enabled")
self.pcap = PacketCapture(
hostname=self._connected_node.hostname, port_num=self.port_num, port_name=self.port_name
hostname=self._connected_node.config.hostname, port_num=self.port_num, port_name=self.port_name
)
self.airspace.add_wireless_interface(self)

View File

@@ -196,7 +196,13 @@ class Network(SimComponent):
if port.ip_address != IPv4Address("127.0.0.1"):
port_str = port.port_name if port.port_name else port.port_num
table.add_row(
[node.config.hostname, port_str, port.ip_address, port.subnet_mask, node.default_gateway]
[
node.config.hostname,
port_str,
port.ip_address,
port.subnet_mask,
node.default_gateway,
]
)
print(table)
@@ -288,7 +294,9 @@ class Network(SimComponent):
node.parent = self
self._nx_graph.add_node(node.config.hostname)
_LOGGER.debug(f"Added node {node.uuid} to Network {self.uuid}")
self._node_request_manager.add_request(name=node.config.hostname, request_type=RequestType(func=node._request_manager))
self._node_request_manager.add_request(
name=node.config.hostname, request_type=RequestType(func=node._request_manager)
)
def get_node_by_hostname(self, hostname: str) -> Optional[Node]:
"""

View File

@@ -153,7 +153,9 @@ class OfficeLANAdder(NetworkNodeAdder, identifier="office_lan"):
# Create a core switch if more than one edge switch is needed
if num_of_switches > 1:
core_switch = Switch.from_config(config = {"type":"switch","hostname":f"switch_core_{config.lan_name}", "start_up_duration": 0 })
core_switch = Switch.from_config(
config={"type": "switch", "hostname": f"switch_core_{config.lan_name}", "start_up_duration": 0}
)
core_switch.power_on()
network.add_node(core_switch)
core_switch_port = 1
@@ -165,7 +167,9 @@ class OfficeLANAdder(NetworkNodeAdder, identifier="office_lan"):
if config.include_router:
default_gateway = IPv4Address(f"192.168.{config.subnet_base}.1")
# router = Router(hostname=f"router_{config.lan_name}", start_up_duration=0)
router = Router.from_config(config={"hostname":f"router_{config.lan_name}", "type": "router", "start_up_duration": 0})
router = Router.from_config(
config={"hostname": f"router_{config.lan_name}", "type": "router", "start_up_duration": 0}
)
router.power_on()
router.acl.add_rule(
action=ACLAction.PERMIT, src_port=PORT_LOOKUP["ARP"], dst_port=PORT_LOOKUP["ARP"], position=22
@@ -178,7 +182,9 @@ class OfficeLANAdder(NetworkNodeAdder, identifier="office_lan"):
# Initialise the first edge switch and connect to the router or core switch
switch_port = 0
switch_n = 1
switch = Switch.from_config(config={"type": "switch","hostname":f"switch_edge_{switch_n}_{config.lan_name}", "start_up_duration":0})
switch = Switch.from_config(
config={"type": "switch", "hostname": f"switch_edge_{switch_n}_{config.lan_name}", "start_up_duration": 0}
)
switch.power_on()
network.add_node(switch)
if num_of_switches > 1:
@@ -196,7 +202,13 @@ class OfficeLANAdder(NetworkNodeAdder, identifier="office_lan"):
if switch_port == effective_network_interface:
switch_n += 1
switch_port = 0
switch = Switch.from_config(config={"type": "switch","hostname":f"switch_edge_{switch_n}_{config.lan_name}", "start_up_duration":0})
switch = Switch.from_config(
config={
"type": "switch",
"hostname": f"switch_edge_{switch_n}_{config.lan_name}",
"start_up_duration": 0,
}
)
switch.power_on()
network.add_node(switch)
# Connect the new switch to the router or core switch
@@ -213,13 +225,14 @@ class OfficeLANAdder(NetworkNodeAdder, identifier="office_lan"):
)
# Create and add a PC to the network
pc_cfg = {"type": "computer",
"hostname": f"pc_{i}_{config.lan_name}",
"ip_address": f"192.168.{config.subnet_base}.{i+config.pcs_ip_block_start-1}",
"default_gateway": "192.168.10.1",
"start_up_duration": 0,
}
pc = Computer.from_config(config = pc_cfg)
pc_cfg = {
"type": "computer",
"hostname": f"pc_{i}_{config.lan_name}",
"ip_address": f"192.168.{config.subnet_base}.{i+config.pcs_ip_block_start-1}",
"default_gateway": "192.168.10.1",
"start_up_duration": 0,
}
pc = Computer.from_config(config=pc_cfg)
pc.power_on()
network.add_node(pc)

View File

@@ -1529,22 +1529,21 @@ class Node(SimComponent, ABC):
_identifier: ClassVar[str] = "unknown"
"""Identifier for this particular class, used for printing and logging. Each subclass redefines this."""
config: Node.ConfigSchema
config: Node.ConfigSchema = Field(default_factory=lambda: Node.ConfigSchema())
"""Configuration items within Node"""
class ConfigSchema(BaseModel, ABC):
"""Configuration Schema for Node based classes."""
model_config = ConfigDict(arbitrary_types_allowed=True)
"""Configure pydantic to allow arbitrary types and to let the instance have attributes not present in the model."""
"""Configure pydantic to allow arbitrary types, let the instance have attributes not present in the model."""
hostname: str = "default"
"The node hostname on the network."
revealed_to_red: bool = False
"Informs whether the node has been revealed to a red agent."
start_up_duration: int = 0
start_up_duration: int = 3
"Time steps needed for the node to start up."
start_up_countdown: int = 0
@@ -1617,12 +1616,10 @@ class Node(SimComponent, ABC):
file_system=kwargs.get("file_system"),
dns_server=kwargs.get("dns_server"),
)
super().__init__(**kwargs)
self._install_system_software()
self.session_manager.node = self
self.session_manager.software_manager = self.software_manager
self.power_on()
@property
def user_manager(self) -> Optional[UserManager]:
@@ -1713,7 +1710,7 @@ class Node(SimComponent, ABC):
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return f"Cannot perform request on node '{self.node.hostname}' because it is not powered on."
return f"Cannot perform request on node '{self.node.config.hostname}' because it is not powered on."
class _NodeIsOffValidator(RequestPermissionValidator):
"""
@@ -1732,7 +1729,7 @@ class Node(SimComponent, ABC):
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return f"Cannot perform request on node '{self.node.hostname}' because it is not turned off."
return f"Cannot perform request on node '{self.node.config.hostname}' because it is not turned off."
def _init_request_manager(self) -> RequestManager:
"""
@@ -1900,7 +1897,7 @@ class Node(SimComponent, ABC):
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.hostname} Open Ports"
table.title = f"{self.config.hostname} Open Ports"
for port in self.software_manager.get_open_ports():
if port > 0:
table.add_row([port])
@@ -1927,7 +1924,7 @@ class Node(SimComponent, ABC):
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.hostname} Network Interface Cards"
table.title = f"{self.config.hostname} Network Interface Cards"
for port, network_interface in self.network_interface.items():
ip_address = ""
if hasattr(network_interface, "ip_address"):
@@ -1967,7 +1964,7 @@ class Node(SimComponent, ABC):
else:
if self.operating_state == NodeOperatingState.BOOTING:
self.operating_state = NodeOperatingState.ON
self.sys_log.info(f"{self.hostname}: Turned on")
self.sys_log.info(f"{self.config.hostname}: Turned on")
for network_interface in self.network_interfaces.values():
network_interface.enable()

View File

@@ -37,7 +37,7 @@ class Computer(HostNode, identifier="computer"):
SYSTEM_SOFTWARE: ClassVar[Dict] = {**HostNode.SYSTEM_SOFTWARE, "FTPClient": FTPClient}
config: "Computer.ConfigSchema" = Field(default_factory=lambda: Computer.ConfigSchema())
config: "Computer.ConfigSchema" = Field(default_factory=lambda: Computer.ConfigSchema())
class ConfigSchema(HostNode.ConfigSchema):
"""Configuration Schema for Computer class."""

View File

@@ -1,6 +1,7 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
from typing import ClassVar
from pydantic import Field
from primaite.simulator.network.hardware.nodes.host.host_node import HostNode
@@ -45,10 +46,9 @@ class Printer(HostNode, identifier="printer"):
# TODO: Implement printer-specific behaviour
config: "Printer.ConfigSchema" = Field(default_factory=lambda: Printer.ConfigSchema())
class ConfigSchema(HostNode.ConfigSchema):
"""Configuration Schema for Printer class."""
hostname: str = "printer"
hostname: str = "printer"

View File

@@ -108,7 +108,6 @@ class Firewall(Router, identifier="firewall"):
hostname: str = "firewall"
num_ports: int = 0
operating_state: NodeOperatingState = NodeOperatingState.ON
def __init__(self, **kwargs):
if not kwargs.get("sys_log"):
@@ -242,7 +241,7 @@ class Firewall(Router, identifier="firewall"):
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.hostname} Network Interfaces"
table.title = f"{self.config.hostname} Network Interfaces"
ports = {"External": self.external_port, "Internal": self.internal_port, "DMZ": self.dmz_port}
for port, network_interface in ports.items():
table.add_row(
@@ -572,7 +571,7 @@ class Firewall(Router, identifier="firewall"):
@classmethod
def from_config(cls, config: dict) -> "Firewall":
"""Create a firewall based on a config dict."""
firewall = Firewall(config = cls.ConfigSchema(**config))
firewall = Firewall(config=cls.ConfigSchema(**config))
if "ports" in config:
internal_port = config["ports"]["internal_port"]

View File

@@ -1218,7 +1218,7 @@ class Router(NetworkNode, identifier="router"):
route_table: RouteTable
config: "Router.ConfigSchema"
config: "Router.ConfigSchema" = Field(default_factory=lambda: Router.ConfigSchema())
class ConfigSchema(NetworkNode.ConfigSchema):
"""Configuration Schema for Router Objects."""
@@ -1230,12 +1230,13 @@ class Router(NetworkNode, identifier="router"):
ports: Dict[Union[int, str], Dict] = {}
def __init__(self, **kwargs):
if not kwargs.get("sys_log"):
kwargs["sys_log"] = SysLog(kwargs["config"].hostname)
if not kwargs.get("acl"):
kwargs["acl"] = AccessControlList(sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=kwargs["config"].hostname)
kwargs["acl"] = AccessControlList(
sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=kwargs["config"].hostname
)
if not kwargs.get("route_table"):
kwargs["route_table"] = RouteTable(sys_log=kwargs["sys_log"])
super().__init__(**kwargs)
@@ -1632,8 +1633,7 @@ class Router(NetworkNode, identifier="router"):
:return: Configured router.
:rtype: Router
"""
router = Router(config=Router.ConfigSchema(**config)
)
router = Router(config=Router.ConfigSchema(**config))
if "ports" in config:
for port_num, port_cfg in config["ports"].items():
router.configure_port(
@@ -1666,4 +1666,4 @@ class Router(NetworkNode, identifier="router"):
next_hop_ip_address = config["default_route"].get("next_hop_ip_address", None)
if next_hop_ip_address:
router.route_table.set_default_route_next_hop_ip_address(next_hop_ip_address)
return router
return router

View File

@@ -1,7 +1,7 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
from __future__ import annotations
from typing import ClassVar, Dict, Optional
from typing import Dict, Optional
from prettytable import MARKDOWN, PrettyTable
from pydantic import Field
@@ -89,11 +89,7 @@ class SwitchPort(WiredNetworkInterface):
class Switch(NetworkNode, identifier="switch"):
"""
A class representing a Layer 2 network switch.
:ivar num_ports: The number of ports on the switch. Default is 24.
"""
"""A class representing a Layer 2 network switch."""
network_interfaces: Dict[str, SwitchPort] = {}
"The SwitchPorts on the Switch."
@@ -102,7 +98,7 @@ class Switch(NetworkNode, identifier="switch"):
mac_address_table: Dict[str, SwitchPort] = {}
"A MAC address table mapping destination MAC addresses to corresponding SwitchPorts."
config: "Switch.ConfigSchema"
config: "Switch.ConfigSchema" = Field(default_factory=lambda: Switch.ConfigSchema())
class ConfigSchema(NetworkNode.ConfigSchema):
"""Configuration Schema for Switch nodes within PrimAITE."""

View File

@@ -122,7 +122,6 @@ class WirelessRouter(Router, identifier="wireless_router"):
network_interfaces: Dict[str, Union[RouterInterface, WirelessAccessPoint]] = {}
network_interface: Dict[int, Union[RouterInterface, WirelessAccessPoint]] = {}
airspace: AirSpace
config: "WirelessRouter.ConfigSchema" = Field(default_factory=lambda: WirelessRouter.ConfigSchema())
@@ -130,12 +129,15 @@ class WirelessRouter(Router, identifier="wireless_router"):
"""Configuration Schema for WirelessRouter nodes within PrimAITE."""
hostname: str = "WirelessRouter"
airspace: Optional[AirSpace] = None
def __init__(self, **kwargs):
super().__init__(hostname=kwargs["config"].hostname, num_ports=0, airspace=kwargs["config"].airspace, **kwargs)
super().__init__(**kwargs)
self.connect_nic(
WirelessAccessPoint(ip_address="127.0.0.1", subnet_mask="255.0.0.0", gateway="0.0.0.0", airspace=kwargs["config"].airspace)
WirelessAccessPoint(
ip_address="127.0.0.1", subnet_mask="255.0.0.0", gateway="0.0.0.0", airspace=kwargs["config"].airspace
)
)
self.connect_nic(RouterInterface(ip_address="127.0.0.1", subnet_mask="255.0.0.0", gateway="0.0.0.0"))
@@ -233,7 +235,7 @@ class WirelessRouter(Router, identifier="wireless_router"):
)
@classmethod
def from_config(cls, cfg: Dict, **kwargs) -> "WirelessRouter":
def from_config(cls, config: Dict, **kwargs) -> "WirelessRouter":
"""Generate the wireless router from config.
Schema:
@@ -261,21 +263,21 @@ class WirelessRouter(Router, identifier="wireless_router"):
:rtype: WirelessRouter
"""
operating_state = (
NodeOperatingState.ON if not (p := cfg.get("operating_state")) else NodeOperatingState[p.upper()]
NodeOperatingState.ON if not (p := config.get("operating_state")) else NodeOperatingState[p.upper()]
)
router = cls(hostname=cfg["hostname"], operating_state=operating_state, airspace=kwargs["airspace"])
if "router_interface" in cfg:
ip_address = cfg["router_interface"]["ip_address"]
subnet_mask = cfg["router_interface"]["subnet_mask"]
router = cls(config=cls.ConfigSchema(**config))
if "router_interface" in config:
ip_address = config["router_interface"]["ip_address"]
subnet_mask = config["router_interface"]["subnet_mask"]
router.configure_router_interface(ip_address=ip_address, subnet_mask=subnet_mask)
if "wireless_access_point" in cfg:
ip_address = cfg["wireless_access_point"]["ip_address"]
subnet_mask = cfg["wireless_access_point"]["subnet_mask"]
frequency = AirSpaceFrequency._registry[cfg["wireless_access_point"]["frequency"]]
if "wireless_access_point" in config:
ip_address = config["wireless_access_point"]["ip_address"]
subnet_mask = config["wireless_access_point"]["subnet_mask"]
frequency = AirSpaceFrequency._registry[config["wireless_access_point"]["frequency"]]
router.configure_wireless_access_point(ip_address=ip_address, subnet_mask=subnet_mask, frequency=frequency)
if "acl" in cfg:
for r_num, r_cfg in cfg["acl"].items():
if "acl" in config:
for r_num, r_cfg in config["acl"].items():
router.acl.add_rule(
action=ACLAction[r_cfg["action"]],
src_port=None if not (p := r_cfg.get("src_port")) else PORT_LOOKUP[p],
@@ -287,8 +289,8 @@ class WirelessRouter(Router, identifier="wireless_router"):
dst_wildcard_mask=r_cfg.get("dst_wildcard_mask"),
position=r_num,
)
if "routes" in cfg:
for route in cfg.get("routes"):
if "routes" in config:
for route in config.get("routes"):
router.route_table.add_route(
address=IPv4Address(route.get("address")),
subnet_mask=IPv4Address(route.get("subnet_mask", "255.255.255.0")),

View File

@@ -128,33 +128,40 @@ def arcd_uc2_network() -> Network:
network = Network()
# Router 1
router_1 = Router.from_config(config={"type":"router", "hostname":"router_1", "num_ports":5, "start_up_duration":0})
router_1 = Router.from_config(
config={"type": "router", "hostname": "router_1", "num_ports": 5, "start_up_duration": 0}
)
router_1.power_on()
router_1.configure_port(port=1, ip_address="192.168.1.1", subnet_mask="255.255.255.0")
router_1.configure_port(port=2, ip_address="192.168.10.1", subnet_mask="255.255.255.0")
# Switch 1
switch_1 = Switch.from_config(config={"type":"switch", "hostname":"switch_1", "num_ports":8, "start_up_duration":0})
switch_1 = Switch.from_config(
config={"type": "switch", "hostname": "switch_1", "num_ports": 8, "start_up_duration": 0}
)
switch_1.power_on()
network.connect(endpoint_a=router_1.network_interface[1], endpoint_b=switch_1.network_interface[8])
router_1.enable_port(1)
# Switch 2
switch_2 = Switch.from_config(config={"type":"switch", "hostname":"switch_2", "num_ports":8, "start_up_duration":0})
switch_2 = Switch.from_config(
config={"type": "switch", "hostname": "switch_2", "num_ports": 8, "start_up_duration": 0}
)
switch_2.power_on()
network.connect(endpoint_a=router_1.network_interface[2], endpoint_b=switch_2.network_interface[8])
router_1.enable_port(2)
# Client 1
client_1_cfg = {"type": "computer",
"hostname": "client_1",
"ip_address": "192.168.10.21",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.10.1",
"dns_server": IPv4Address("192.168.1.10"),
"start_up_duration": 0,
}
client_1: Computer = Computer.from_config(config = client_1_cfg)
client_1_cfg = {
"type": "computer",
"hostname": "client_1",
"ip_address": "192.168.10.21",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.10.1",
"dns_server": IPv4Address("192.168.1.10"),
"start_up_duration": 0,
}
client_1: Computer = Computer.from_config(config=client_1_cfg)
client_1.power_on()
network.connect(endpoint_b=client_1.network_interface[1], endpoint_a=switch_2.network_interface[1])
@@ -175,15 +182,16 @@ def arcd_uc2_network() -> Network:
# Client 2
client_2_cfg = {"type": "computer",
"hostname": "client_2",
"ip_address": "192.168.10.22",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.10.1",
"dns_server": IPv4Address("192.168.1.10"),
"start_up_duration": 0,
}
client_2: Computer = Computer.from_config(config = client_2_cfg)
client_2_cfg = {
"type": "computer",
"hostname": "client_2",
"ip_address": "192.168.10.22",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.10.1",
"dns_server": IPv4Address("192.168.1.10"),
"start_up_duration": 0,
}
client_2: Computer = Computer.from_config(config=client_2_cfg)
client_2.power_on()
client_2.software_manager.install(DatabaseClient)
@@ -199,13 +207,14 @@ def arcd_uc2_network() -> Network:
# Domain Controller
domain_controller_cfg = {"type": "server",
"hostname": "domain_controller",
"ip_address": "192.168.1.10",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0
}
domain_controller_cfg = {
"type": "server",
"hostname": "domain_controller",
"ip_address": "192.168.1.10",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
}
domain_controller = Server.from_config(config=domain_controller_cfg)
domain_controller.power_on()
@@ -215,14 +224,15 @@ def arcd_uc2_network() -> Network:
# Database Server
database_server_cfg = {"type": "server",
"hostname": "database_server",
"ip_address": "192.168.1.14",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"dns_server": IPv4Address("192.168.1.10"),
"start_up_duration": 0
}
database_server_cfg = {
"type": "server",
"hostname": "database_server",
"ip_address": "192.168.1.14",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"dns_server": IPv4Address("192.168.1.10"),
"start_up_duration": 0,
}
database_server = Server.from_config(config=database_server_cfg)
@@ -236,15 +246,15 @@ def arcd_uc2_network() -> Network:
# Web Server
web_server_cfg = {"type": "server",
"hostname": "web_server",
"ip_address": "192.168.1.11",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"dns_server": IPv4Address("192.168.1.10"),
"start_up_duration": 0
}
web_server_cfg = {
"type": "server",
"hostname": "web_server",
"ip_address": "192.168.1.11",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"dns_server": IPv4Address("192.168.1.10"),
"start_up_duration": 0,
}
web_server = Server.from_config(config=web_server_cfg)
web_server.power_on()
@@ -263,14 +273,15 @@ def arcd_uc2_network() -> Network:
dns_server_service.dns_register("arcd.com", web_server.network_interface[1].ip_address)
# Backup Server
backup_server_cfg = {"type": "server",
"hostname": "backup_server",
"ip_address": "192.168.1.16",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"dns_server": IPv4Address("192.168.1.10"),
"start_up_duration": 0
}
backup_server_cfg = {
"type": "server",
"hostname": "backup_server",
"ip_address": "192.168.1.16",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"dns_server": IPv4Address("192.168.1.10"),
"start_up_duration": 0,
}
backup_server: Server = Server.from_config(config=backup_server_cfg)
backup_server.power_on()
@@ -278,14 +289,15 @@ def arcd_uc2_network() -> Network:
network.connect(endpoint_b=backup_server.network_interface[1], endpoint_a=switch_1.network_interface[4])
# Security Suite
security_suite_cfg = {"type": "server",
"hostname": "backup_server",
"ip_address": "192.168.1.110",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"dns_server": IPv4Address("192.168.1.10"),
"start_up_duration": 0
}
security_suite_cfg = {
"type": "server",
"hostname": "backup_server",
"ip_address": "192.168.1.110",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"dns_server": IPv4Address("192.168.1.10"),
"start_up_duration": 0,
}
security_suite: Server = Server.from_config(config=security_suite_cfg)
security_suite.power_on()
network.connect(endpoint_b=security_suite.network_interface[1], endpoint_a=switch_1.network_interface[7])

View File

@@ -119,13 +119,13 @@ def application_class():
@pytest.fixture(scope="function")
def file_system() -> FileSystem:
# computer = Computer(hostname="fs_node", ip_address="192.168.1.2", subnet_mask="255.255.255.0", start_up_duration=0)
computer_cfg = {"type": "computer",
"hostname": "fs_node",
"ip_address": "192.168.1.2",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
computer_cfg = {
"type": "computer",
"hostname": "fs_node",
"ip_address": "192.168.1.2",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
computer = Computer.from_config(config=computer_cfg)
computer.power_on()
return computer.file_system
@@ -136,23 +136,29 @@ def client_server() -> Tuple[Computer, Server]:
network = Network()
# Create Computer
computer = Computer(
hostname="computer",
ip_address="192.168.1.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
start_up_duration=0,
)
computer_cfg = {
"type": "computer",
"hostname": "computer",
"ip_address": "192.168.1.2",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
}
computer: Computer = Computer.from_config(config=computer_cfg)
computer.power_on()
# Create Server
server = Server(
hostname="server",
ip_address="192.168.1.3",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
start_up_duration=0,
)
server_cfg = {
"type": "server",
"hostname": "server",
"ip_address": "192.168.1.3",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
}
server: Server = Server.from_config(config=server_cfg)
server.power_on()
# Connect Computer and Server
@@ -169,26 +175,33 @@ def client_switch_server() -> Tuple[Computer, Switch, Server]:
network = Network()
# Create Computer
computer = Computer(
hostname="computer",
ip_address="192.168.1.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
start_up_duration=0,
)
computer_cfg = {
"type": "computer",
"hostname": "computer",
"ip_address": "192.168.1.2",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
}
computer: Computer = Computer.from_config(config=computer_cfg)
computer.power_on()
# Create Server
server = Server(
hostname="server",
ip_address="192.168.1.3",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
start_up_duration=0,
)
server_cfg = {
"type": "server",
"hostname": "server",
"ip_address": "192.168.1.3",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
}
server: Server = Server.from_config(config=server_cfg)
server.power_on()
switch = Switch(hostname="switch", start_up_duration=0)
# Create Switch
switch: Switch = Switch.from_config(config={"type": "switch", "hostname": "switch", "start_up_duration": 0})
switch.power_on()
network.connect(endpoint_a=computer.network_interface[1], endpoint_b=switch.network_interface[1])
@@ -219,7 +232,7 @@ def example_network() -> Network:
# Router 1
router_1_cfg = {"hostname": "router_1", "type": "router"}
router_1_cfg = {"hostname": "router_1", "type": "router", "start_up_duration":0}
# router_1 = Router(hostname="router_1", start_up_duration=0)
router_1 = Router.from_config(config=router_1_cfg)
@@ -229,7 +242,7 @@ def example_network() -> Network:
# Switch 1
switch_1_cfg = {"hostname": "switch_1", "type": "switch"}
switch_1_cfg = {"hostname": "switch_1", "type": "switch", "start_up_duration": 0}
switch_1 = Switch.from_config(config=switch_1_cfg)
@@ -240,7 +253,7 @@ def example_network() -> Network:
router_1.enable_port(1)
# Switch 2
switch_2_config = {"hostname": "switch_2", "type": "switch", "num_ports": 8}
switch_2_config = {"hostname": "switch_2", "type": "switch", "num_ports": 8, "start_up_duration":0}
# switch_2 = Switch(hostname="switch_2", num_ports=8, start_up_duration=0)
switch_2 = Switch.from_config(config=switch_2_config)
switch_2.power_on()
@@ -348,28 +361,34 @@ def install_stuff_to_sim(sim: Simulation):
# 1: Set up network hardware
# 1.1: Configure the router
router = Router.from_config(config={"type":"router", "hostname":"router", "num_ports":3, "start_up_duration":0})
router = Router.from_config(config={"type": "router", "hostname": "router", "num_ports": 3, "start_up_duration": 0})
router.power_on()
router.configure_port(port=1, ip_address="10.0.1.1", subnet_mask="255.255.255.0")
router.configure_port(port=2, ip_address="10.0.2.1", subnet_mask="255.255.255.0")
# 1.2: Create and connect switches
switch_1 = Switch.from_config(config={"type":"switch", "hostname":"switch_1", "num_ports":6, "start_up_duration":0})
switch_1 = Switch.from_config(
config={"type": "switch", "hostname": "switch_1", "num_ports": 6, "start_up_duration": 0}
)
switch_1.power_on()
network.connect(endpoint_a=router.network_interface[1], endpoint_b=switch_1.network_interface[6])
router.enable_port(1)
switch_2 = Switch.from_config(config={"type":"switch", "hostname":"switch_2", "num_ports":6, "start_up_duration":0})
switch_2 = Switch.from_config(
config={"type": "switch", "hostname": "switch_2", "num_ports": 6, "start_up_duration": 0}
)
switch_2.power_on()
network.connect(endpoint_a=router.network_interface[2], endpoint_b=switch_2.network_interface[6])
router.enable_port(2)
# 1.3: Create and connect computer
client_1_cfg = {"type": "computer",
"hostname": "client_1",
"ip_address":"10.0.1.2",
"subnet_mask":"255.255.255.0",
"default_gateway": "10.0.1.1",
"start_up_duration":0}
client_1_cfg = {
"type": "computer",
"hostname": "client_1",
"ip_address": "10.0.1.2",
"subnet_mask": "255.255.255.0",
"default_gateway": "10.0.1.1",
"start_up_duration": 0,
}
client_1: Computer = Computer.from_config(config=client_1_cfg)
client_1.power_on()
network.connect(
@@ -378,24 +397,26 @@ def install_stuff_to_sim(sim: Simulation):
)
# 1.4: Create and connect servers
server_1_cfg = {"type": "server",
"hostname":"server_1",
"ip_address": "10.0.2.2",
"subnet_mask":"255.255.255.0",
"default_gateway":"10.0.2.1",
"start_up_duration": 0}
server_1_cfg = {
"type": "server",
"hostname": "server_1",
"ip_address": "10.0.2.2",
"subnet_mask": "255.255.255.0",
"default_gateway": "10.0.2.1",
"start_up_duration": 0,
}
server_1: Server = Server.from_config(config=server_1_cfg)
server_1.power_on()
network.connect(endpoint_a=server_1.network_interface[1], endpoint_b=switch_2.network_interface[1])
server_2_cfg = {"type": "server",
"hostname":"server_2",
"ip_address": "10.0.2.3",
"subnet_mask":"255.255.255.0",
"default_gateway":"10.0.2.1",
"start_up_duration": 0}
server_2_cfg = {
"type": "server",
"hostname": "server_2",
"ip_address": "10.0.2.3",
"subnet_mask": "255.255.255.0",
"default_gateway": "10.0.2.1",
"start_up_duration": 0,
}
server_2: Server = Server.from_config(config=server_2_cfg)
server_2.power_on()

View File

@@ -12,12 +12,18 @@ def test_passing_actions_down(monkeypatch) -> None:
sim = Simulation()
pc1 = Computer.from_config(config={"type":"computer", "hostname":"PC-1", "ip_address":"10.10.1.1", "subnet_mask":"255.255.255.0"})
pc1 = Computer.from_config(
config={"type": "computer", "hostname": "PC-1", "ip_address": "10.10.1.1", "subnet_mask": "255.255.255.0"}
)
pc1.start_up_duration = 0
pc1.power_on()
pc2 = Computer.from_config(config={"type":"computer", "hostname":"PC-2", "ip_address":"10.10.1.2", "subnet_mask":"255.255.255.0"})
srv = Server.from_config(config={"type":"server", "hostname":"WEBSERVER", "ip_address":"10.10.1.100", "subnet_mask":"255.255.255.0"})
s1 = Switch.from_config(config={"type":"switch", "hostname":"switch1"})
pc2 = Computer.from_config(
config={"type": "computer", "hostname": "PC-2", "ip_address": "10.10.1.2", "subnet_mask": "255.255.255.0"}
)
srv = Server.from_config(
config={"type": "server", "hostname": "WEBSERVER", "ip_address": "10.10.1.100", "subnet_mask": "255.255.255.0"}
)
s1 = Switch.from_config(config={"type": "switch", "hostname": "switch1"})
for n in [pc1, pc2, srv, s1]:
sim.network.add_node(n)

View File

@@ -5,8 +5,8 @@ from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.hardware.nodes.network.firewall import Firewall
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
from tests.integration_tests.configuration_file_parsing import DMZ_NETWORK, load_config

View File

@@ -3,6 +3,8 @@ from primaite.config.load import data_manipulation_config_path
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.network.wireless_router import WirelessRouter
from primaite.simulator.network.hardware.nodes.network.firewall import Firewall
from tests.integration_tests.configuration_file_parsing import BASIC_CONFIG, DMZ_NETWORK, load_config

View File

@@ -4,6 +4,7 @@ import yaml
from primaite.session.environment import PrimaiteGymEnv
from primaite.session.ray_envs import PrimaiteRayEnv, PrimaiteRayMARLEnv
from primaite.simulator.network.hardware.nodes.network.wireless_router import WirelessRouter
from tests.conftest import TEST_ASSETS_ROOT
folder_path = TEST_ASSETS_ROOT / "configs" / "scenario_with_placeholders"

View File

@@ -35,7 +35,7 @@ def test_node_startup_shutdown(game_and_agent_fixture: Tuple[PrimaiteGame, Proxy
assert client_1.operating_state == NodeOperatingState.SHUTTING_DOWN
for i in range(client_1.shut_down_duration + 1):
for i in range(client_1.config.shut_down_duration + 1):
action = ("do_nothing", {})
agent.store_action(action)
game.step()
@@ -49,7 +49,7 @@ def test_node_startup_shutdown(game_and_agent_fixture: Tuple[PrimaiteGame, Proxy
assert client_1.operating_state == NodeOperatingState.BOOTING
for i in range(client_1.start_up_duration + 1):
for i in range(client_1.config.start_up_duration + 1):
action = ("do_nothing", {})
agent.store_action(action)
game.step()
@@ -79,7 +79,7 @@ def test_node_cannot_be_shut_down_if_node_is_already_off(game_and_agent_fixture:
client_1 = game.simulation.network.get_node_by_hostname("client_1")
client_1.power_off()
for i in range(client_1.shut_down_duration + 1):
for i in range(client_1.config.shut_down_duration + 1):
action = ("do_nothing", {})
agent.store_action(action)
game.step()

View File

@@ -117,7 +117,9 @@ def test_firewall_observation():
assert all(observation["PORTS"][i]["operating_status"] == 2 for i in range(1, 4))
# connect a switch to the firewall and check that only the correct port is updated
switch: Switch = Switch.from_config(config={"type": "switch", "hostname":"switch", "num_ports":1, "operating_state":NodeOperatingState.ON})
switch: Switch = Switch.from_config(
config={"type": "switch", "hostname": "switch", "num_ports": 1, "operating_state": NodeOperatingState.ON}
)
link = net.connect(firewall.network_interface[1], switch.network_interface[1])
assert firewall.network_interface[1].enabled
observation = firewall_observation.observe(firewall.describe_state())

View File

@@ -56,12 +56,26 @@ def test_link_observation():
"""Check the shape and contents of the link observation."""
net = Network()
sim = Simulation(network=net)
switch: Switch = Switch.from_config(config={"type":"switch", "hostname":"switch", "num_ports":5, "operating_state":NodeOperatingState.ON})
computer_1: Computer = Computer.from_config(config={"type": "computer",
"hostname":"computer_1", "ip_address":"10.0.0.1", "subnet_mask":"255.255.255.0", "start_up_duration":0}
switch: Switch = Switch.from_config(
config={"type": "switch", "hostname": "switch", "num_ports": 5, "operating_state": NodeOperatingState.ON}
)
computer_2: Computer = Computer.from_config(config={"type":"computer",
"hostname":"computer_2", "ip_address":"10.0.0.2", "subnet_mask":"255.255.255.0", "start_up_duration":0}
computer_1: Computer = Computer.from_config(
config={
"type": "computer",
"hostname": "computer_1",
"ip_address": "10.0.0.1",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
computer_2: Computer = Computer.from_config(
config={
"type": "computer",
"hostname": "computer_2",
"ip_address": "10.0.0.2",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
computer_1.power_on()
computer_2.power_on()

View File

@@ -163,7 +163,9 @@ def test_nic_monitored_traffic(simulation):
pc2: Computer = simulation.network.get_node_by_hostname("client_2")
nic_obs = NICObservation(
where=["network", "nodes", pc.config.hostname, "NICs", 1], include_nmne=False, monitored_traffic=monitored_traffic
where=["network", "nodes", pc.config.hostname, "NICs", 1],
include_nmne=False,
monitored_traffic=monitored_traffic,
)
simulation.pre_timestep(0) # apply timestep to whole sim

View File

@@ -16,7 +16,9 @@ from primaite.utils.validation.port import PORT_LOOKUP
def test_router_observation():
"""Test adding/removing acl rules and enabling/disabling ports."""
net = Network()
router = Router.from_config(config={"type": "router", "hostname":"router", "num_ports":5, "operating_state":NodeOperatingState.ON})
router = Router.from_config(
config={"type": "router", "hostname": "router", "num_ports": 5, "operating_state": NodeOperatingState.ON}
)
ports = [PortObservation(where=["NICs", i]) for i in range(1, 6)]
acl = ACLObservation(
@@ -89,7 +91,9 @@ def test_router_observation():
assert all(observed_output["PORTS"][i]["operating_status"] == 2 for i in range(1, 6))
# connect a switch to the router and check that only the correct port is updated
switch: Switch = Switch.from_config(config={"type": "switch", "hostname":"switch", "num_ports":1, "operating_state":NodeOperatingState.ON})
switch: Switch = Switch.from_config(
config={"type": "switch", "hostname": "switch", "num_ports": 1, "operating_state": NodeOperatingState.ON}
)
link = net.connect(router.network_interface[1], switch.network_interface[1])
assert router.network_interface[1].enabled
observed_output = router_observation.observe(router.describe_state())

View File

@@ -2,8 +2,8 @@
from primaite.session.environment import PrimaiteGymEnv
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.host.host_node import HostNode
from primaite.simulator.system.services.service import ServiceOperatingState
from primaite.simulator.network.hardware.nodes.network.wireless_router import WirelessRouter
from primaite.simulator.system.services.service import ServiceOperatingState
from tests.conftest import TEST_ASSETS_ROOT
CFG_PATH = TEST_ASSETS_ROOT / "configs/test_primaite_session.yaml"

View File

@@ -17,11 +17,11 @@ from typing import Tuple
import pytest
import yaml
from primaite.simulator.network.hardware.nodes.network.firewall import Firewall
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.session.environment import PrimaiteGymEnv
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.network.hardware.nodes.network.firewall import Firewall
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.web_browser import WebBrowser
from primaite.simulator.system.software import SoftwareHealthState

View File

@@ -8,14 +8,17 @@ from primaite.simulator.sim_container import Simulation
def test_file_observation():
sim = Simulation()
pc = Computer(hostname="beep", ip_address="123.123.123.123", subnet_mask="255.255.255.0")
pc: Computer = Computer.from_config(config={"type":"computer",
"hostname":"beep",
"ip_address":"123.123.123.123",
"subnet_mask":"255.255.255.0"})
sim.network.add_node(pc)
f = pc.file_system.create_file(file_name="dog.png")
state = sim.describe_state()
dog_file_obs = FileObservation(
where=["network", "nodes", pc.hostname, "file_system", "folders", "root", "files", "dog.png"],
where=["network", "nodes", pc.config.hostname, "file_system", "folders", "root", "files", "dog.png"],
include_num_access=False,
file_system_requires_scan=False,
)

View File

@@ -84,24 +84,28 @@ class BroadcastTestClient(Application, identifier="BroadcastTestClient"):
def broadcast_network() -> Network:
network = Network()
client_1_cfg = {"type": "computer",
"hostname": "client_1",
"ip_address":"192.168.1.2",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration":0}
client_1_cfg = {
"type": "computer",
"hostname": "client_1",
"ip_address": "192.168.1.2",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
}
client_1: Computer = Computer.from_config(config=client_1_cfg)
client_1.power_on()
client_1.software_manager.install(BroadcastTestClient)
application_1 = client_1.software_manager.software["BroadcastTestClient"]
application_1.run()
client_2_cfg = {"type": "computer",
"hostname": "client_2",
"ip_address":"192.168.1.3",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration":0}
client_2_cfg = {
"type": "computer",
"hostname": "client_2",
"ip_address": "192.168.1.3",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
}
client_2: Computer = Computer.from_config(config=client_2_cfg)
client_2.power_on()
@@ -109,14 +113,16 @@ def broadcast_network() -> Network:
application_2 = client_2.software_manager.software["BroadcastTestClient"]
application_2.run()
server_1_cfg = {"type": "server",
"hostname": "server_1",
"ip_address":"192.168.1.1",
"subnet_mask": "255.255.255.0",
"default_gateway":"192.168.1.1",
"start_up_duration": 0}
server_1_cfg = {
"type": "server",
"hostname": "server_1",
"ip_address": "192.168.1.1",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
}
server_1 :Server = Server.from_config(config=server_1_cfg)
server_1: Server = Server.from_config(config=server_1_cfg)
server_1.power_on()
@@ -124,7 +130,9 @@ def broadcast_network() -> Network:
service: BroadcastTestService = server_1.software_manager.software["BroadcastService"]
service.start()
switch_1: Switch = Switch.from_config(config={"type": "switch", "hostname":"switch_1", "num_ports":6, "start_up_duration":0})
switch_1: Switch = Switch.from_config(
config={"type": "switch", "hostname": "switch_1", "num_ports": 6, "start_up_duration": 0}
)
switch_1.power_on()
network.connect(endpoint_a=client_1.network_interface[1], endpoint_b=switch_1.network_interface[1])

View File

@@ -41,7 +41,9 @@ def dmz_external_internal_network() -> Network:
"""
network = Network()
firewall_node: Firewall = Firewall(hostname="firewall_1", start_up_duration=0)
firewall_node: Firewall = Firewall.from_config(
config={"type": "firewall", "hostname": "firewall_1", "start_up_duration": 0}
)
firewall_node.power_on()
# configure firewall ports
firewall_node.configure_external_port(

View File

@@ -34,54 +34,64 @@ def basic_network() -> Network:
# Creating two generic nodes for the C2 Server and the C2 Beacon.
node_a_cfg = {"type": "computer",
"hostname": "node_a",
"ip_address": "192.168.0.2",
"subnet_mask": "255.255.255.252",
"default_gateway": "192.168.0.1",
"start_up_duration": 0}
node_a_cfg = {
"type": "computer",
"hostname": "node_a",
"ip_address": "192.168.0.2",
"subnet_mask": "255.255.255.252",
"default_gateway": "192.168.0.1",
"start_up_duration": 0,
}
node_a: Computer = Computer.from_config(config=node_a_cfg)
node_a.power_on()
node_a.software_manager.get_open_ports()
node_a.software_manager.install(software_class=C2Server)
node_b_cfg = {"type": "computer",
"hostname": "node_b",
"ip_address": "192.168.255.2",
"subnet_mask": "255.255.255.248",
"default_gateway": "192.168.255.1",
"start_up_duration": 0}
node_b_cfg = {
"type": "computer",
"hostname": "node_b",
"ip_address": "192.168.255.2",
"subnet_mask": "255.255.255.248",
"default_gateway": "192.168.255.1",
"start_up_duration": 0,
}
node_b: Computer = Computer.from_config(config=node_b_cfg)
node_b.power_on()
node_b.software_manager.install(software_class=C2Beacon)
# Creating a generic computer for testing remote terminal connections.
node_c_cfg = {"type": "computer",
"hostname": "node_c",
"ip_address": "192.168.255.3",
"subnet_mask": "255.255.255.248",
"default_gateway": "192.168.255.1",
"start_up_duration": 0}
node_c_cfg = {
"type": "computer",
"hostname": "node_c",
"ip_address": "192.168.255.3",
"subnet_mask": "255.255.255.248",
"default_gateway": "192.168.255.1",
"start_up_duration": 0,
}
node_c: Computer = Computer.from_config(config=node_c_cfg)
node_c.power_on()
# Creating a router to sit between node 1 and node 2.
router = Router.from_config(config={"type":"router", "hostname":"router", "num_ports":3, "start_up_duration":0})
router = Router.from_config(config={"type": "router", "hostname": "router", "num_ports": 3, "start_up_duration": 0})
# Default allow all.
router.acl.add_rule(action=ACLAction.PERMIT)
router.power_on()
# Creating switches for each client.
switch_1 = Switch.from_config(config={"type":"switch", "hostname":"switch_1", "num_ports":6, "start_up_duration":0})
switch_1 = Switch.from_config(
config={"type": "switch", "hostname": "switch_1", "num_ports": 6, "start_up_duration": 0}
)
switch_1.power_on()
# Connecting the switches to the router.
router.configure_port(port=1, ip_address="192.168.0.1", subnet_mask="255.255.255.252")
network.connect(endpoint_a=router.network_interface[1], endpoint_b=switch_1.network_interface[6])
switch_2 = Switch.from_config(config={"type":"switch", "hostname":"switch_2", "num_ports":6, "start_up_duration":0})
switch_2 = Switch.from_config(
config={"type": "switch", "hostname": "switch_2", "num_ports": 6, "start_up_duration": 0}
)
switch_2.power_on()
network.connect(endpoint_a=router.network_interface[2], endpoint_b=switch_2.network_interface[6])

View File

@@ -72,7 +72,7 @@ def test_dns_client_requests_offline_dns_server(dns_client_and_dns_server):
server.power_off()
for i in range(server.shut_down_duration + 1):
for i in range(server.config.shut_down_duration + 1):
server.apply_timestep(timestep=i)
assert server.operating_state == NodeOperatingState.OFF

View File

@@ -13,13 +13,15 @@ from primaite.simulator.system.services.service import Service, ServiceOperating
def populated_node(
service_class,
) -> Tuple[Server, Service]:
server = Server(
hostname="server",
ip_address="192.168.0.1",
subnet_mask="255.255.255.0",
start_up_duration=0,
shut_down_duration=0,
)
server_cfg = {
"type": "server",
"hostname": "server",
"ip_address": "192.168.0.1",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
"shut_down_duration": 0,
}
server: Server = Server.from_config(config=server_cfg)
server.power_on()
server.software_manager.install(service_class)
@@ -31,14 +33,16 @@ def populated_node(
def test_service_on_offline_node(service_class):
"""Test to check that the service cannot be interacted with when node it is on is off."""
computer: Computer = Computer(
hostname="test_computer",
ip_address="192.168.1.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
start_up_duration=0,
shut_down_duration=0,
)
computer_cfg = {
"type": "computer",
"hostname": "test_computer",
"ip_address": "192.168.1.2",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
"shut_down_duration": 0,
}
computer: Computer = Computer.from_config(config=computer_cfg)
computer.power_on()
computer.software_manager.install(service_class)

View File

@@ -94,7 +94,7 @@ def test_web_page_request_from_shut_down_server(web_client_and_web_server):
server.power_off()
for i in range(server.shut_down_duration + 1):
for i in range(server.config.shut_down_duration + 1):
server.apply_timestep(timestep=i)
# node should be off

View File

@@ -7,10 +7,7 @@ from primaite.simulator.network.hardware.nodes.network.switch import Switch
@pytest.fixture(scope="function")
def switch() -> Switch:
switch_cfg = {"type": "switch",
"hostname": "switch_1",
"num_ports": 8,
"start_up_duration": 0}
switch_cfg = {"type": "switch", "hostname": "switch_1", "num_ports": 8, "start_up_duration": 0}
switch: Switch = Switch.from_config(config=switch_cfg)
switch.power_on()
switch.show()

View File

@@ -7,10 +7,7 @@ from primaite.simulator.network.hardware.nodes.host.computer import Computer
@pytest.fixture
def node() -> Node:
computer_cfg = {"type": "computer",
"hostname": "test",
"ip_address": "192.168.1.2",
"subnet_mask": "255.255.255.0"}
computer_cfg = {"type": "computer", "hostname": "test", "ip_address": "192.168.1.2", "subnet_mask": "255.255.255.0"}
computer = Computer.from_config(config=computer_cfg)
return computer

View File

@@ -12,13 +12,12 @@ from tests.conftest import DummyApplication, DummyService
@pytest.fixture
def node() -> Node:
computer_cfg = {"type": "computer",
"hostname": "test",
"ip_address": "192.168.1.2",
"subnet_mask": "255.255.255.0",
"shut_down_duration": 3,
"operating_state": "OFF",
}
computer_cfg = {
"type": "computer",
"hostname": "test",
"ip_address": "192.168.1.2",
"subnet_mask": "255.255.255.0",
}
computer = Computer.from_config(config=computer_cfg)
return computer

View File

@@ -74,7 +74,16 @@ def test_removing_node_that_does_not_exist(network):
"""Node that does not exist on network should not affect existing nodes."""
assert len(network.nodes) is 7
network.remove_node(Computer.from_config(config = {"type":"computer","hostname":"new_node", "ip_address":"192.168.1.2", "subnet_mask":"255.255.255.0"}))
network.remove_node(
Computer.from_config(
config={
"type": "computer",
"hostname": "new_node",
"ip_address": "192.168.1.2",
"subnet_mask": "255.255.255.0",
}
)
)
assert len(network.nodes) is 7

View File

@@ -16,23 +16,25 @@ def basic_c2_network() -> Network:
network = Network()
# Creating two generic nodes for the C2 Server and the C2 Beacon.
computer_a_cfg = {"type": "computer",
"hostname": "computer_a",
"ip_address": "192.168.0.1",
"subnet_mask": "255.255.255.252",
"start_up_duration": 0}
computer_a = Computer.from_config(config = computer_a_cfg)
computer_a_cfg = {
"type": "computer",
"hostname": "computer_a",
"ip_address": "192.168.0.1",
"subnet_mask": "255.255.255.252",
"start_up_duration": 0,
}
computer_a = Computer.from_config(config=computer_a_cfg)
computer_a.power_on()
computer_a.software_manager.install(software_class=C2Server)
computer_b_cfg = {"type": "computer",
"hostname": "computer_b",
"ip_address": "192.168.0.2",
"subnet_mask": "255.255.255.252",
"start_up_duration": 0,
}
computer_b_cfg = {
"type": "computer",
"hostname": "computer_b",
"ip_address": "192.168.0.2",
"subnet_mask": "255.255.255.252",
"start_up_duration": 0,
}
computer_b = Computer.from_config(config=computer_b_cfg)

View File

@@ -12,12 +12,13 @@ from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")
def dos_bot() -> DoSBot:
computer_cfg = {"type":"computer",
"hostname": "compromised_pc",
"ip_address": "192.168.0.1",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
computer_cfg = {
"type": "computer",
"hostname": "compromised_pc",
"ip_address": "192.168.0.1",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
computer: Computer = Computer.from_config(config=computer_cfg)
computer.power_on()
@@ -39,7 +40,7 @@ def test_dos_bot_cannot_run_when_node_offline(dos_bot):
dos_bot_node.power_off()
for i in range(dos_bot_node.shut_down_duration + 1):
for i in range(dos_bot_node.config.shut_down_duration + 1):
dos_bot_node.apply_timestep(timestep=i)
assert dos_bot_node.operating_state is NodeOperatingState.OFF

View File

@@ -17,14 +17,28 @@ from primaite.simulator.system.services.database.database_service import Databas
def database_client_on_computer() -> Tuple[DatabaseClient, Computer]:
network = Network()
db_server: Server = Server.from_config(config={"type": "server", "hostname":"db_server", "ip_address":"192.168.0.1", "subnet_mask":"255.255.255.0", "start_up_duration":0})
db_server: Server = Server.from_config(
config={
"type": "server",
"hostname": "db_server",
"ip_address": "192.168.0.1",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
db_server.power_on()
db_server.software_manager.install(DatabaseService)
db_server.software_manager.software["DatabaseService"].start()
db_client: Computer = Computer.from_config(config = {"type":"computer",
"hostname":"db_client", "ip_address":"192.168.0.2", "subnet_mask":"255.255.255.0", "start_up_duration":0
})
db_client: Computer = Computer.from_config(
config={
"type": "computer",
"hostname": "db_client",
"ip_address": "192.168.0.2",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
)
db_client.power_on()
db_client.software_manager.install(DatabaseClient)

View File

@@ -12,8 +12,15 @@ from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")
def web_browser() -> WebBrowser:
computer_cfg = {"type": "computer", "hostname": "web_client", "ip_address": "192.168.1.11", "subnet_mask": "255.255.255.0", "default_gateway": "192.168.1.1", "start_up_duration": 0}
computer_cfg = {
"type": "computer",
"hostname": "web_client",
"ip_address": "192.168.1.11",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
}
computer: Computer = Computer.from_config(config=computer_cfg)
computer.power_on()
@@ -25,8 +32,15 @@ def web_browser() -> WebBrowser:
def test_create_web_client():
computer_cfg = {"type": "computer", "hostname": "web_client", "ip_address": "192.168.1.11", "subnet_mask": "255.255.255.0", "default_gateway": "192.168.1.1", "start_up_duration": 0}
computer_cfg = {
"type": "computer",
"hostname": "web_client",
"ip_address": "192.168.1.11",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
}
computer: Computer = Computer.from_config(config=computer_cfg)
computer.power_on()

View File

@@ -8,12 +8,13 @@ from primaite.simulator.system.services.database.database_service import Databas
@pytest.fixture(scope="function")
def database_server() -> Node:
node_cfg = {"type": "computer",
"hostname": "db_node",
"ip_address": "192.168.1.2",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
node_cfg = {
"type": "computer",
"hostname": "db_node",
"ip_address": "192.168.1.2",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
node = Computer.from_config(config=node_cfg)
node.power_on()

View File

@@ -14,21 +14,15 @@ from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")
def dns_client() -> Computer:
node_cfg = {"type": "computer",
"hostname": "dns_client",
"ip_address": "192.168.1.11",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"dns_server": IPv4Address("192.168.1.10")}
node_cfg = {
"type": "computer",
"hostname": "dns_client",
"ip_address": "192.168.1.11",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"dns_server": IPv4Address("192.168.1.10"),
}
node = Computer.from_config(config=node_cfg)
# node = Computer(
# hostname="dns_client",
# ip_address="192.168.1.11",
# subnet_mask="255.255.255.0",
# default_gateway="192.168.1.1",
# dns_server=IPv4Address("192.168.1.10"),
# )
return node
@@ -69,7 +63,7 @@ def test_dns_client_check_domain_exists_when_not_running(dns_client):
dns_client.power_off()
for i in range(dns_client.shut_down_duration + 1):
for i in range(dns_client.config.shut_down_duration + 1):
dns_client.apply_timestep(timestep=i)
assert dns_client.operating_state is NodeOperatingState.OFF

View File

@@ -16,12 +16,14 @@ from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")
def dns_server() -> Node:
node_cfg = {"type": "server",
"hostname": "dns_server",
"ip_address": "192.168.1.10",
"subnet_mask":"255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration":0}
node_cfg = {
"type": "server",
"hostname": "dns_server",
"ip_address": "192.168.1.10",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
}
node = Server.from_config(config=node_cfg)
node.power_on()
node.software_manager.install(software_class=DNSServer)
@@ -55,12 +57,13 @@ def test_dns_server_receive(dns_server):
# register the web server in the domain controller
dns_server_service.dns_register(domain_name="real-domain.com", domain_ip_address=IPv4Address("192.168.1.12"))
client_cfg = {"type": "computer",
"hostname": "client",
"ip_address": "192.168.1.11",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
client_cfg = {
"type": "computer",
"hostname": "client",
"ip_address": "192.168.1.11",
"subnet_mask": "255.255.255.0",
"start_up_duration": 0,
}
client = Computer.from_config(config=client_cfg)
client.power_on()
client.dns_server = IPv4Address("192.168.1.10")

View File

@@ -16,13 +16,14 @@ from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")
def ftp_client() -> Node:
node_cfg = {"type": "computer",
"hostname": "ftp_client",
"ip_address":"192.168.1.11",
"subnet_mask":"255.255.255.0",
"default_gateway":"192.168.1.1",
"start_up_duration": 0,
}
node_cfg = {
"type": "computer",
"hostname": "ftp_client",
"ip_address": "192.168.1.11",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
}
node = Computer.from_config(config=node_cfg)
node.power_on()
return node

View File

@@ -14,12 +14,14 @@ from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")
def ftp_server() -> Node:
node_cfg = {"type": "server",
"hostname":"ftp_server",
"ip_address":"192.168.1.10",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration":0}
node_cfg = {
"type": "server",
"hostname": "ftp_server",
"ip_address": "192.168.1.10",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
}
node = Server.from_config(config=node_cfg)
node.power_on()
node.software_manager.install(software_class=FTPServer)

View File

@@ -29,8 +29,8 @@ from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")
def terminal_on_computer() -> Tuple[Terminal, Computer]:
computer: Computer = Computer(
hostname="node_a", ip_address="192.168.0.10", subnet_mask="255.255.255.0", start_up_duration=0
computer: Computer = Computer.from_config(config={"type":"computer",
"hostname":"node_a", "ip_address":"192.168.0.10", "subnet_mask":"255.255.255.0", "start_up_duration":0}
)
computer.power_on()
terminal: Terminal = computer.software_manager.software.get("Terminal")
@@ -41,11 +41,19 @@ def terminal_on_computer() -> Tuple[Terminal, Computer]:
@pytest.fixture(scope="function")
def basic_network() -> Network:
network = Network()
node_a = Computer(hostname="node_a", ip_address="192.168.0.10", subnet_mask="255.255.255.0", start_up_duration=0)
node_a = Computer.from_config(config={"type":"computer",
"hostname":"node_a",
"ip_address":"192.168.0.10",
"subnet_mask":"255.255.255.0",
"start_up_duration":0})
node_a.power_on()
node_a.software_manager.get_open_ports()
node_b = Computer(hostname="node_b", ip_address="192.168.0.11", subnet_mask="255.255.255.0", start_up_duration=0)
node_b = Computer.from_config(config={"type":"computer",
"hostname":"node_b",
"ip_address":"192.168.0.11",
"subnet_mask":"255.255.255.0",
"start_up_duration":0})
node_b.power_on()
network.connect(node_a.network_interface[1], node_b.network_interface[1])
@@ -57,18 +65,20 @@ def wireless_wan_network():
network = Network()
# Configure PC A
pc_a = Computer(
hostname="pc_a",
ip_address="192.168.0.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.0.1",
start_up_duration=0,
)
pc_a_cfg = {"type": "computer",
"hostname":"pc_a",
"ip_address":"192.168.0.2",
"subnet_mask":"255.255.255.0",
"default_gateway":"192.168.0.1",
"start_up_duration":0,
}
pc_a = Computer.from_config(config=pc_a_cfg)
pc_a.power_on()
network.add_node(pc_a)
# Configure Router 1
router_1 = WirelessRouter(hostname="router_1", start_up_duration=0, airspace=network.airspace)
router_1 = WirelessRouter.from_config(config={"type":"wireless_router", "hostname":"router_1", "start_up_duration":0, "airspace":network.airspace})
router_1.power_on()
network.add_node(router_1)
@@ -88,18 +98,21 @@ def wireless_wan_network():
)
# Configure PC B
pc_b = Computer(
hostname="pc_b",
ip_address="192.168.2.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.2.1",
start_up_duration=0,
)
pc_b_cfg = {"type": "computer",
"hostname":"pc_b",
"ip_address":"192.168.2.2",
"subnet_mask":"255.255.255.0",
"default_gateway":"192.168.2.1",
"start_up_duration":0,
}
pc_b = Computer.from_config(config=pc_b_cfg)
pc_b.power_on()
network.add_node(pc_b)
# Configure Router 2
router_2 = WirelessRouter(hostname="router_2", start_up_duration=0, airspace=network.airspace)
router_2 = WirelessRouter.from_config(config={"type":"wireless_router", "hostname":"router_2", "start_up_duration":0, "airspace":network.airspace})
router_2.power_on()
network.add_node(router_2)
@@ -131,7 +144,7 @@ def game_and_agent_fixture(game_and_agent):
game, agent = game_and_agent
client_1: Computer = game.simulation.network.get_node_by_hostname("client_1")
client_1.start_up_duration = 3
client_1.config.start_up_duration = 3
return game, agent
@@ -143,7 +156,11 @@ def test_terminal_creation(terminal_on_computer):
def test_terminal_install_default():
"""Terminal should be auto installed onto Nodes"""
computer = Computer(hostname="node_a", ip_address="192.168.0.10", subnet_mask="255.255.255.0", start_up_duration=0)
computer: Computer = Computer.from_config(config={"type":"computer",
"hostname":"node_a",
"ip_address":"192.168.0.10",
"subnet_mask":"255.255.255.0",
"start_up_duration":0})
computer.power_on()
assert computer.software_manager.software.get("Terminal")
@@ -151,7 +168,7 @@ def test_terminal_install_default():
def test_terminal_not_on_switch():
"""Ensure terminal does not auto-install to switch"""
test_switch = Switch(hostname="Test")
test_switch = Switch.from_config(config={"type":"switch", "hostname":"Test"})
assert not test_switch.software_manager.software.get("Terminal")
@@ -357,8 +374,6 @@ def test_multiple_remote_terminals_same_node(basic_network):
for attempt in range(3):
remote_connection = terminal_a.login(username="admin", password="admin", ip_address="192.168.0.11")
terminal_a.show()
assert len(terminal_a._connections) == 3

View File

@@ -16,12 +16,14 @@ from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")
def web_server() -> Server:
node_cfg = {"type": "server",
"hostname":"web_server",
"ip_address": "192.168.1.10",
"subnet_mask": "255.255.255.0",
"default_gateway":"192.168.1.1",
"start_up_duration":0 }
node_cfg = {
"type": "server",
"hostname": "web_server",
"ip_address": "192.168.1.10",
"subnet_mask": "255.255.255.0",
"default_gateway": "192.168.1.1",
"start_up_duration": 0,
}
node = Server.from_config(config=node_cfg)
node.power_on()
node.software_manager.install(WebServer)