Fix airspace and remaining port problems from refactor

This commit is contained in:
Marek Wolan
2024-09-19 15:06:29 +01:00
parent dd931d900b
commit 08f1cf1fbd
34 changed files with 227 additions and 177 deletions

View File

@@ -1,3 +1,4 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from typing import Dict
from prettytable import MARKDOWN, PrettyTable
@@ -27,7 +28,7 @@ class GigaSwitch(NetworkNode, identifier="gigaswitch"):
"A MAC address table mapping destination MAC addresses to corresponding SwitchPorts."
def __init__(self, **kwargs):
print('--- Extended Component: GigaSwitch ---')
print("--- Extended Component: GigaSwitch ---")
super().__init__(**kwargs)
for i in range(1, self.num_ports + 1):
self.connect_nic(SwitchPort())

View File

@@ -1,7 +1,7 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from typing import ClassVar, Dict
from primaite.simulator.network.hardware.nodes.host.host_node import NIC, HostNode
from primaite.simulator.network.hardware.nodes.host.host_node import HostNode, NIC
from primaite.simulator.system.services.ftp.ftp_client import FTPClient
from primaite.utils.validators import IPV4Address
@@ -37,7 +37,7 @@ class SuperComputer(HostNode, identifier="supercomputer"):
SYSTEM_SOFTWARE: ClassVar[Dict] = {**HostNode.SYSTEM_SOFTWARE, "FTPClient": FTPClient}
def __init__(self, ip_address: IPV4Address, subnet_mask: IPV4Address, **kwargs):
print('--- Extended Component: SuperComputer ---')
print("--- Extended Component: SuperComputer ---")
super().__init__(ip_address=ip_address, subnet_mask=subnet_mask, **kwargs)
pass

View File

@@ -17,7 +17,7 @@ from primaite.simulator.system.software import SoftwareHealthState
_LOGGER = getLogger(__name__)
class ExtendedService(Service, identifier='extendedservice'):
class ExtendedService(Service, identifier="extendedservice"):
"""
A copy of DatabaseService that uses the extension framework instead of being part of PrimAITE.
@@ -42,7 +42,7 @@ class ExtendedService(Service, identifier='extendedservice'):
kwargs["protocol"] = IPProtocol["TCP"]
super().__init__(**kwargs)
self._create_db_file()
if kwargs.get('options'):
if kwargs.get("options"):
opt = kwargs["options"]
self.password = opt.get("db_password", None)
if "backup_server_ip" in opt:
@@ -139,7 +139,9 @@ class ExtendedService(Service, identifier='extendedservice'):
old_visible_state = SoftwareHealthState.GOOD
# get db file regardless of whether or not it was deleted
db_file = self.file_system.get_file(folder_name="database", file_name="extended_service_database.db", include_deleted=True)
db_file = self.file_system.get_file(
folder_name="database", file_name="extended_service_database.db", include_deleted=True
)
if db_file is None:
self.sys_log.warning("Database file not initialised.")
@@ -153,7 +155,9 @@ class ExtendedService(Service, identifier='extendedservice'):
self.file_system.delete_file(folder_name="database", file_name="extended_service_database.db")
# replace db file
self.file_system.copy_file(src_folder_name="downloads", src_file_name="extended_service_database.db", dst_folder_name="database")
self.file_system.copy_file(
src_folder_name="downloads", src_file_name="extended_service_database.db", dst_folder_name="database"
)
if self.db_file is None:
self.sys_log.error("Copying database backup failed.")

View File

@@ -1,22 +1,22 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
import os
from primaite.config.load import get_extended_config_path
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from tests.integration_tests.configuration_file_parsing import BASIC_CONFIG, DMZ_NETWORK, load_config
import os
from tests.integration_tests.extensions.applications.extended_application import ExtendedApplication
from tests.integration_tests.extensions.nodes.giga_switch import GigaSwitch
# Import the extended components so that PrimAITE registers them
from tests.integration_tests.extensions.nodes.super_computer import SuperComputer
from tests.integration_tests.extensions.nodes.giga_switch import GigaSwitch
from tests.integration_tests.extensions.services.extended_service import ExtendedService
from tests.integration_tests.extensions.applications.extended_application import ExtendedApplication
def test_extended_example_config():
"""Test that the example config can be parsed properly."""
config_path = os.path.join( "tests", "assets", "configs", "extended_config.yaml")
config_path = os.path.join("tests", "assets", "configs", "extended_config.yaml")
game = load_config(config_path)
network: Network = game.simulation.network
@@ -25,8 +25,8 @@ def test_extended_example_config():
assert len(network.router_nodes) == 1 # 1 router in network
assert len(network.switch_nodes) == 1 # 1 switches in network
assert len(network.server_nodes) == 5 # 5 servers in network
assert len(network.extended_hostnodes) == 1 # One extended node based on HostNode
assert len(network.extended_networknodes) == 1 # One extended node based on NetworkNode
assert len(network.extended_hostnodes) == 1 # One extended node based on HostNode
assert len(network.extended_networknodes) == 1 # One extended node based on NetworkNode
assert 'ExtendedApplication' in network.extended_hostnodes[0].software_manager.software
assert 'ExtendedService' in network.extended_hostnodes[0].software_manager.software
assert "ExtendedApplication" in network.extended_hostnodes[0].software_manager.software
assert "ExtendedService" in network.extended_hostnodes[0].software_manager.software

View File

@@ -38,8 +38,8 @@ def test_acl_observations(simulation):
acl_obs = ACLObservation(
where=["network", "nodes", router.hostname, "acl", "acl"],
ip_list=[],
port_list=["NTP", "HTTP", "POSTGRES_SERVER"],
protocol_list=["TCP", "UDP", "ICMP"],
port_list=[123, 80, 5432],
protocol_list=["tcp", "udp", "icmp"],
num_rules=10,
wildcard_list=[],
)

View File

@@ -31,8 +31,8 @@ def test_firewall_observation():
num_rules=7,
ip_list=["10.0.0.1", "10.0.0.2"],
wildcard_list=["0.0.0.255", "0.0.0.1"],
port_list=["HTTP", "DNS"],
protocol_list=["TCP"],
port_list=[80, 53],
protocol_list=["tcp"],
include_users=False,
)

View File

@@ -152,7 +152,12 @@ def test_config_nic_categories(simulation):
def test_nic_monitored_traffic(simulation):
monitored_traffic = {"icmp": ["NONE"], "tcp": [53,]}
monitored_traffic = {
"icmp": ["NONE"],
"tcp": [
53,
],
}
pc: Computer = simulation.network.get_node_by_hostname("client_1")
pc2: Computer = simulation.network.get_node_by_hostname("client_2")

View File

@@ -24,8 +24,8 @@ def test_router_observation():
num_rules=7,
ip_list=["10.0.0.1", "10.0.0.2"],
wildcard_list=["0.0.0.255", "0.0.0.1"],
port_list=["HTTP", "DNS"],
protocol_list=["TCP"],
port_list=[80, 53],
protocol_list=["tcp"],
)
router_observation = RouterObservation(where=[], ports=ports, num_ports=8, acl=acl, include_users=False)

View File

@@ -65,7 +65,9 @@ def test_uc2_rewards(game_and_agent):
db_client.run()
router: Router = game.simulation.network.get_node_by_hostname("router")
router.acl.add_rule(ACLAction.PERMIT, src_port=Port["POSTGRES_SERVER"], dst_port=Port["POSTGRES_SERVER"], position=2)
router.acl.add_rule(
ACLAction.PERMIT, src_port=Port["POSTGRES_SERVER"], dst_port=Port["POSTGRES_SERVER"], position=2
)
comp = GreenAdminDatabaseUnreachablePenalty("client_1")

View File

@@ -2,7 +2,6 @@
import yaml
from primaite.game.game import PrimaiteGame
from primaite.simulator.network.airspace import AirSpaceFrequency
from tests import TEST_ASSETS_ROOT
@@ -13,8 +12,8 @@ def test_override_freq_max_capacity_mbps():
config_dict = yaml.safe_load(f)
network = PrimaiteGame.from_config(cfg=config_dict).simulation.network
assert network.airspace.get_frequency_max_capacity_mbps(AirSpaceFrequency["WIFI_2_4"]) == 123.45
assert network.airspace.get_frequency_max_capacity_mbps(AirSpaceFrequency["WIFI_5"]) == 0.0
assert network.airspace.get_frequency_max_capacity_mbps("WIFI_2_4") == 123.45
assert network.airspace.get_frequency_max_capacity_mbps("WIFI_5") == 0.0
pc_a = network.get_node_by_hostname("pc_a")
pc_b = network.get_node_by_hostname("pc_b")
@@ -32,8 +31,8 @@ def test_override_freq_max_capacity_mbps_blocked():
config_dict = yaml.safe_load(f)
network = PrimaiteGame.from_config(cfg=config_dict).simulation.network
assert network.airspace.get_frequency_max_capacity_mbps(AirSpaceFrequency["WIFI_2_4"]) == 0.0
assert network.airspace.get_frequency_max_capacity_mbps(AirSpaceFrequency["WIFI_5"]) == 0.0
assert network.airspace.get_frequency_max_capacity_mbps("WIFI_2_4") == 0.0
assert network.airspace.get_frequency_max_capacity_mbps("WIFI_5") == 0.0
pc_a = network.get_node_by_hostname("pc_a")
pc_b = network.get_node_by_hostname("pc_b")

View File

@@ -73,8 +73,12 @@ def dmz_external_internal_network() -> Network:
firewall_node.external_outbound_acl.add_rule(
action=ACLAction.PERMIT, src_port=Port["ARP"], dst_port=Port["ARP"], position=22
)
firewall_node.dmz_inbound_acl.add_rule(action=ACLAction.PERMIT, src_port=Port["ARP"], dst_port=Port["ARP"], position=22)
firewall_node.dmz_outbound_acl.add_rule(action=ACLAction.PERMIT, src_port=Port["ARP"], dst_port=Port["ARP"], position=22)
firewall_node.dmz_inbound_acl.add_rule(
action=ACLAction.PERMIT, src_port=Port["ARP"], dst_port=Port["ARP"], position=22
)
firewall_node.dmz_outbound_acl.add_rule(
action=ACLAction.PERMIT, src_port=Port["ARP"], dst_port=Port["ARP"], position=22
)
# external node
external_node = Computer(
@@ -262,8 +266,12 @@ def test_service_allowed_with_rule(dmz_external_internal_network):
assert not internal_ntp_client.time
firewall.internal_outbound_acl.add_rule(action=ACLAction.PERMIT, src_port=Port["NTP"], dst_port=Port["NTP"], position=1)
firewall.internal_inbound_acl.add_rule(action=ACLAction.PERMIT, src_port=Port["NTP"], dst_port=Port["NTP"], position=1)
firewall.internal_outbound_acl.add_rule(
action=ACLAction.PERMIT, src_port=Port["NTP"], dst_port=Port["NTP"], position=1
)
firewall.internal_inbound_acl.add_rule(
action=ACLAction.PERMIT, src_port=Port["NTP"], dst_port=Port["NTP"], position=1
)
internal_ntp_client.request_time()

View File

@@ -73,7 +73,9 @@ def test_port_scan_one_node_one_port(example_network):
client_2 = network.get_node_by_hostname("client_2")
actual_result = client_1_nmap.port_scan(
target_ip_address=client_2.network_interface[1].ip_address, target_port=Port["DNS"], target_protocol=IPProtocol["TCP"]
target_ip_address=client_2.network_interface[1].ip_address,
target_port=Port["DNS"],
target_protocol=IPProtocol["TCP"],
)
expected_result = {IPv4Address("192.168.10.22"): {IPProtocol["TCP"]: [Port["DNS"]]}}

View File

@@ -66,7 +66,9 @@ def test_nested_dicts():
The expected output should have string values of enums as keys at all levels.
"""
original_dict = {
IPProtocol["UDP"]: {Port["ARP"]: {"inbound": 0, "outbound": 1016.0, "details": {IPProtocol["TCP"]: {"latency": "low"}}}}
IPProtocol["UDP"]: {
Port["ARP"]: {"inbound": 0, "outbound": 1016.0, "details": {IPProtocol["TCP"]: {"latency": "low"}}}
}
}
expected_dict = {"udp": {219: {"inbound": 0, "outbound": 1016.0, "details": {"tcp": {"latency": "low"}}}}}
assert convert_dict_enum_keys_to_enum_values(original_dict) == expected_dict
@@ -79,6 +81,9 @@ def test_non_dict_values():
The original dictionary contains lists and tuples as values.
The expected output should preserve these non-dictionary values while converting enum keys to string values.
"""
original_dict = {IPProtocol["UDP"]: [Port["ARP"], Port["HTTP"]], "protocols": (IPProtocol["TCP"], IPProtocol["UDP"])}
original_dict = {
IPProtocol["UDP"]: [Port["ARP"], Port["HTTP"]],
"protocols": (IPProtocol["TCP"], IPProtocol["UDP"]),
}
expected_dict = {"udp": [Port["ARP"], Port["HTTP"]], "protocols": (IPProtocol["TCP"], IPProtocol["UDP"])}
assert convert_dict_enum_keys_to_enum_values(original_dict) == expected_dict