Merge branch 'dev' into bugfix/2676_NMNE_var_access

This commit is contained in:
Nick Todd
2024-07-15 09:27:11 +01:00
81 changed files with 11491 additions and 645 deletions

View File

@@ -67,7 +67,7 @@ def test_dev_mode_config_sys_log_level():
# check defaults
assert PRIMAITE_CONFIG["developer_mode"]["sys_log_level"] == "DEBUG" # DEBUG by default
result = cli(["dev-mode", "config", "-level", "WARNING"])
result = cli(["dev-mode", "config", "--sys-log-level", "WARNING"])
assert "sys_log_level=WARNING" in result.output # should print correct value
@@ -78,10 +78,30 @@ def test_dev_mode_config_sys_log_level():
assert "sys_log_level=INFO" in result.output # should print correct value
# config should reflect that log level is WARNING
# config should reflect that log level is INFO
assert PRIMAITE_CONFIG["developer_mode"]["sys_log_level"] == "INFO"
def test_dev_mode_config_agent_log_level():
"""Check that the agent log level can be changed via CLI."""
# check defaults
assert PRIMAITE_CONFIG["developer_mode"]["agent_log_level"] == "DEBUG" # DEBUG by default
result = cli(["dev-mode", "config", "-level", "WARNING"])
assert "agent_log_level=WARNING" in result.output # should print correct value
# config should reflect that log level is WARNING
assert PRIMAITE_CONFIG["developer_mode"]["agent_log_level"] == "WARNING"
result = cli(["dev-mode", "config", "--agent-log-level", "INFO"])
assert "agent_log_level=INFO" in result.output # should print correct value
# config should reflect that log level is INFO
assert PRIMAITE_CONFIG["developer_mode"]["agent_log_level"] == "INFO"
def test_dev_mode_config_sys_logs_enable_disable():
"""Test that the system logs output can be enabled or disabled."""
# check defaults
@@ -112,6 +132,36 @@ def test_dev_mode_config_sys_logs_enable_disable():
assert PRIMAITE_CONFIG["developer_mode"]["output_sys_logs"] is False
def test_dev_mode_config_agent_logs_enable_disable():
"""Test that the agent logs output can be enabled or disabled."""
# check defaults
assert PRIMAITE_CONFIG["developer_mode"]["output_agent_logs"] is False # False by default
result = cli(["dev-mode", "config", "--output-agent-logs"])
assert "output_agent_logs=True" in result.output # should print correct value
# config should reflect that output_agent_logs is True
assert PRIMAITE_CONFIG["developer_mode"]["output_agent_logs"]
result = cli(["dev-mode", "config", "--no-agent-logs"])
assert "output_agent_logs=False" in result.output # should print correct value
# config should reflect that output_agent_logs is True
assert PRIMAITE_CONFIG["developer_mode"]["output_agent_logs"] is False
result = cli(["dev-mode", "config", "-agent"])
assert "output_agent_logs=True" in result.output # should print correct value
# config should reflect that output_agent_logs is True
assert PRIMAITE_CONFIG["developer_mode"]["output_agent_logs"]
result = cli(["dev-mode", "config", "-nagent"])
assert "output_agent_logs=False" in result.output # should print correct value
# config should reflect that output_agent_logs is True
assert PRIMAITE_CONFIG["developer_mode"]["output_agent_logs"] is False
def test_dev_mode_config_pcap_logs_enable_disable():
"""Test that the pcap logs output can be enabled or disabled."""
# check defaults

View File

@@ -35,3 +35,7 @@ def test_io_settings():
assert env.io.settings.save_step_metadata is False
assert env.io.settings.write_sys_log_to_terminal is False # false by default
assert env.io.settings.save_agent_logs is True
assert env.io.settings.agent_log_level is LogLevel.INFO
assert env.io.settings.write_agent_log_to_terminal is True # Set to True by the config file.

View File

@@ -1,35 +1,23 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
import copy
from ipaddress import IPv4Address
from pathlib import Path
from typing import Union
import yaml
from primaite.config.load import data_manipulation_config_path
from primaite.game.agent.interface import ProxyAgent
from primaite.game.agent.scripted_agents.data_manipulation_bot import DataManipulationAgent
from primaite.game.agent.scripted_agents.probabilistic_agent import ProbabilisticAgent
from primaite.game.game import APPLICATION_TYPES_MAPPING, PrimaiteGame, SERVICE_TYPES_MAPPING
from primaite.simulator.network.container import Network
from primaite.game.game import PrimaiteGame, SERVICE_TYPES_MAPPING
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.applications.red_applications.data_manipulation_bot import DataManipulationBot
from primaite.simulator.system.applications.red_applications.dos_bot import DoSBot
from primaite.simulator.system.applications.web_browser import WebBrowser
from primaite.simulator.system.services.database.database_service import DatabaseService
from primaite.simulator.system.services.dns.dns_client import DNSClient
from primaite.simulator.system.services.dns.dns_server import DNSServer
from primaite.simulator.system.services.ftp.ftp_client import FTPClient
from primaite.simulator.system.services.ftp.ftp_server import FTPServer
from primaite.simulator.system.services.ntp.ntp_client import NTPClient
from primaite.simulator.system.services.ntp.ntp_server import NTPServer
from primaite.simulator.system.services.web_server.web_server import WebServer
from tests import TEST_ASSETS_ROOT
TEST_CONFIG = TEST_ASSETS_ROOT / "configs/software_fix_duration.yaml"
ONE_ITEM_CONFIG = TEST_ASSETS_ROOT / "configs/fix_duration_one_item.yaml"
TestApplications = ["DummyApplication", "BroadcastTestClient"]
def load_config(config_path: Union[str, Path]) -> PrimaiteGame:
"""Returns a PrimaiteGame object which loads the contents of a given yaml path."""
@@ -62,9 +50,12 @@ def test_fix_duration_set_from_config():
assert client_1.software_manager.software.get(service).fixing_duration == 3
# in config - applications take 1 timestep to fix
for applications in APPLICATION_TYPES_MAPPING:
assert client_1.software_manager.software.get(applications) is not None
assert client_1.software_manager.software.get(applications).fixing_duration == 1
# remove test applications from list
applications = set(Application._application_registry) - set(TestApplications)
for application in applications:
assert client_1.software_manager.software.get(application) is not None
assert client_1.software_manager.software.get(application).fixing_duration == 1
def test_fix_duration_for_one_item():
@@ -80,8 +71,9 @@ def test_fix_duration_for_one_item():
assert client_1.software_manager.software.get(service).fixing_duration == 2
# in config - applications take 1 timestep to fix
applications = copy.copy(APPLICATION_TYPES_MAPPING)
applications.pop("DatabaseClient")
# remove test applications from list
applications = set(Application._application_registry) - set(TestApplications)
applications.remove("DatabaseClient")
for applications in applications:
assert client_1.software_manager.software.get(applications) is not None
assert client_1.software_manager.software.get(applications).fixing_duration == 2

View File

@@ -0,0 +1,54 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from typing import Tuple
import pytest
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.web_browser import WebBrowser
from primaite.simulator.system.services.service import ServiceOperatingState
@pytest.fixture
def game_and_agent_fixture(game_and_agent):
"""Create a game with a simple agent that can be controlled by the tests."""
game, agent = game_and_agent
client_1: Computer = game.simulation.network.get_node_by_hostname("client_1")
client_1.start_up_duration = 3
return (game, agent)
def test_application_cannot_perform_actions_unless_running(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test the the request permissions prevent any actions unless application is running."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
browser: WebBrowser = client_1.software_manager.software.get("WebBrowser")
browser.close()
assert browser.operating_state == ApplicationOperatingState.CLOSED
action = ("NODE_APPLICATION_SCAN", {"node_id": 0, "application_id": 0})
agent.store_action(action)
game.step()
assert browser.operating_state == ApplicationOperatingState.CLOSED
action = ("NODE_APPLICATION_CLOSE", {"node_id": 0, "application_id": 0})
agent.store_action(action)
game.step()
assert browser.operating_state == ApplicationOperatingState.CLOSED
action = ("NODE_APPLICATION_FIX", {"node_id": 0, "application_id": 0})
agent.store_action(action)
game.step()
assert browser.operating_state == ApplicationOperatingState.CLOSED
action = ("NODE_APPLICATION_EXECUTE", {"node_id": 0, "application_id": 0})
agent.store_action(action)
game.step()
assert browser.operating_state == ApplicationOperatingState.CLOSED

View File

@@ -99,7 +99,7 @@ class TestConfigureDatabaseAction:
game.step()
assert db_client.server_ip_address == old_ip
assert db_client.server_password is "admin123"
assert db_client.server_password == "admin123"
class TestConfigureRansomwareScriptAction:

View File

@@ -0,0 +1,159 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
import uuid
from typing import Tuple
import pytest
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.network.hardware.nodes.host.computer import Computer
@pytest.fixture
def game_and_agent_fixture(game_and_agent):
"""Create a game with a simple agent that can be controlled by the tests."""
game, agent = game_and_agent
client_1: Computer = game.simulation.network.get_node_by_hostname("client_1")
client_1.start_up_duration = 3
return (game, agent)
def test_create_file(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator allows a files to be created."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
random_folder = str(uuid.uuid4())
random_file = str(uuid.uuid4())
assert client_1.file_system.get_file(folder_name=random_folder, file_name=random_file) is None
action = (
"NODE_FILE_CREATE",
{"node_id": 0, "folder_name": random_folder, "file_name": random_file},
)
agent.store_action(action)
game.step()
assert client_1.file_system.get_file(folder_name=random_folder, file_name=random_file) is not None
def test_file_delete_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator allows a file to be deleted."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
file = client_1.file_system.get_file(folder_name="downloads", file_name="cat.png")
assert file.deleted is False
action = (
"NODE_FILE_DELETE",
{"node_id": 0, "folder_id": 0, "file_id": 0},
)
agent.store_action(action)
game.step()
assert file.deleted
def test_file_scan_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator allows a file to be scanned."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
file = client_1.file_system.get_file(folder_name="downloads", file_name="cat.png")
file.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.visible_health_status == FileSystemItemHealthStatus.GOOD
action = (
"NODE_FILE_SCAN",
{"node_id": 0, "folder_id": 0, "file_id": 0},
)
agent.store_action(action)
game.step()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.visible_health_status == FileSystemItemHealthStatus.CORRUPT
def test_file_repair_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator allows a folder to be created."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
file = client_1.file_system.get_file(folder_name="downloads", file_name="cat.png")
file.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
action = (
"NODE_FILE_REPAIR",
{"node_id": 0, "folder_id": 0, "file_id": 0},
)
agent.store_action(action)
game.step()
assert file.health_status == FileSystemItemHealthStatus.GOOD
def test_file_restore_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator allows a file to be restored."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
file = client_1.file_system.get_file(folder_name="downloads", file_name="cat.png")
file.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
action = (
"NODE_FILE_RESTORE",
{"node_id": 0, "folder_id": 0, "file_id": 0},
)
agent.store_action(action)
game.step()
assert file.health_status == FileSystemItemHealthStatus.GOOD
def test_file_corrupt_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator allows a file to be corrupted."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
file = client_1.file_system.get_file(folder_name="downloads", file_name="cat.png")
assert file.health_status == FileSystemItemHealthStatus.GOOD
action = (
"NODE_FILE_CORRUPT",
{"node_id": 0, "folder_id": 0, "file_id": 0},
)
agent.store_action(action)
game.step()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
def test_file_access_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator allows a file to be accessed."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
file = client_1.file_system.get_file(folder_name="downloads", file_name="cat.png")
assert file.num_access == 0
action = (
"NODE_FILE_ACCESS",
{"node_id": 0, "folder_name": file.folder_name, "file_name": file.name},
)
agent.store_action(action)
game.step()
assert file.num_access == 1

View File

@@ -0,0 +1,123 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
import uuid
from typing import Tuple
import pytest
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.network.hardware.nodes.host.computer import Computer
@pytest.fixture
def game_and_agent_fixture(game_and_agent):
"""Create a game with a simple agent that can be controlled by the tests."""
game, agent = game_and_agent
client_1: Computer = game.simulation.network.get_node_by_hostname("client_1")
client_1.start_up_duration = 3
return (game, agent)
def test_create_folder(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator allows a folder to be created."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
random_folder = str(uuid.uuid4())
assert client_1.file_system.get_folder(folder_name=random_folder) is None
action = (
"NODE_FOLDER_CREATE",
{
"node_id": 0,
"folder_name": random_folder,
},
)
agent.store_action(action)
game.step()
assert client_1.file_system.get_folder(folder_name=random_folder) is not None
def test_folder_scan_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test to make sure that the validator checks if the folder exists before scanning."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
folder = client_1.file_system.get_folder(folder_name="downloads")
assert folder.health_status == FileSystemItemHealthStatus.GOOD
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
folder.corrupt()
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
action = (
"NODE_FOLDER_SCAN",
{
"node_id": 0, # client_1,
"folder_id": 0, # downloads
},
)
agent.store_action(action)
game.step()
for i in range(folder.scan_duration + 1):
game.step()
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT
def test_folder_repair_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test to make sure that the validator checks if the folder exists before repairing."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
folder = client_1.file_system.get_folder(folder_name="downloads")
folder.corrupt()
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
action = (
"NODE_FOLDER_REPAIR",
{
"node_id": 0, # client_1,
"folder_id": 0, # downloads
},
)
agent.store_action(action)
game.step()
assert folder.health_status == FileSystemItemHealthStatus.GOOD
def test_folder_restore_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test to make sure that the validator checks if the folder exists before restoring."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
folder = client_1.file_system.get_folder(folder_name="downloads")
folder.corrupt()
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
action = (
"NODE_FOLDER_RESTORE",
{
"node_id": 0, # client_1,
"folder_id": 0, # downloads
},
)
agent.store_action(action)
game.step()
assert folder.health_status == FileSystemItemHealthStatus.RESTORING

View File

@@ -0,0 +1,95 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from typing import Tuple
import pytest
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.simulator.network.hardware.nodes.host.computer import Computer
@pytest.fixture
def game_and_agent_fixture(game_and_agent):
"""Create a game with a simple agent that can be controlled by the tests."""
game, agent = game_and_agent
client_1: Computer = game.simulation.network.get_node_by_hostname("client_1")
client_1.start_up_duration = 3
return (game, agent)
def test_nic_cannot_be_turned_off_if_not_on(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that a NIC cannot be disabled if it is not enabled."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
nic = client_1.network_interface[1]
nic.disable()
assert nic.enabled is False
action = (
"HOST_NIC_DISABLE",
{
"node_id": 0, # client_1
"nic_id": 0, # the only nic (eth-1)
},
)
agent.store_action(action)
game.step()
assert nic.enabled is False
def test_nic_cannot_be_turned_on_if_already_on(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that a NIC cannot be enabled if it is already enabled."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
nic = client_1.network_interface[1]
assert nic.enabled
action = (
"HOST_NIC_ENABLE",
{
"node_id": 0, # client_1
"nic_id": 0, # the only nic (eth-1)
},
)
agent.store_action(action)
game.step()
assert nic.enabled
def test_that_a_nic_can_be_enabled_and_disabled(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Tests that a NIC can be enabled and disabled."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
nic = client_1.network_interface[1]
assert nic.enabled
action = (
"HOST_NIC_DISABLE",
{
"node_id": 0, # client_1
"nic_id": 0, # the only nic (eth-1)
},
)
agent.store_action(action)
game.step()
assert nic.enabled is False
action = (
"HOST_NIC_ENABLE",
{
"node_id": 0, # client_1
"nic_id": 0, # the only nic (eth-1)
},
)
agent.store_action(action)
game.step()
assert nic.enabled

View File

@@ -0,0 +1,94 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from typing import Tuple
import pytest
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.host.computer import Computer
@pytest.fixture
def game_and_agent_fixture(game_and_agent):
"""Create a game with a simple agent that can be controlled by the tests."""
game, agent = game_and_agent
client_1: Computer = game.simulation.network.get_node_by_hostname("client_1")
client_1.start_up_duration = 3
return (game, agent)
def test_node_startup_shutdown(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the node can be shut down and started up."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
assert client_1.operating_state == NodeOperatingState.ON
# turn it off
action = ("NODE_SHUTDOWN", {"node_id": 0})
agent.store_action(action)
game.step()
assert client_1.operating_state == NodeOperatingState.SHUTTING_DOWN
for i in range(client_1.shut_down_duration + 1):
action = ("DONOTHING", {"node_id": 0})
agent.store_action(action)
game.step()
assert client_1.operating_state == NodeOperatingState.OFF
# turn it on
action = ("NODE_STARTUP", {"node_id": 0})
agent.store_action(action)
game.step()
assert client_1.operating_state == NodeOperatingState.BOOTING
for i in range(client_1.start_up_duration + 1):
action = ("DONOTHING", {"node_id": 0})
agent.store_action(action)
game.step()
assert client_1.operating_state == NodeOperatingState.ON
def test_node_cannot_be_started_up_if_node_is_already_on(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that a node cannot be started up if it is already on."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
assert client_1.operating_state == NodeOperatingState.ON
# turn it on
action = ("NODE_STARTUP", {"node_id": 0})
agent.store_action(action)
game.step()
assert client_1.operating_state == NodeOperatingState.ON
def test_node_cannot_be_shut_down_if_node_is_already_off(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that a node cannot be shut down if it is already off."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
client_1.power_off()
for i in range(client_1.shut_down_duration + 1):
action = ("DONOTHING", {"node_id": 0})
agent.store_action(action)
game.step()
assert client_1.operating_state == NodeOperatingState.OFF
# turn it ff
action = ("NODE_SHUTDOWN", {"node_id": 0})
agent.store_action(action)
game.step()
assert client_1.operating_state == NodeOperatingState.OFF

View File

@@ -0,0 +1,106 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from typing import Tuple
import pytest
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.system.services.service import ServiceOperatingState
@pytest.fixture
def game_and_agent_fixture(game_and_agent):
"""Create a game with a simple agent that can be controlled by the tests."""
game, agent = game_and_agent
client_1: Computer = game.simulation.network.get_node_by_hostname("client_1")
client_1.start_up_duration = 3
return (game, agent)
def test_service_start(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator makes sure that the service is stopped before starting the service."""
game, agent = game_and_agent_fixture
server_1: Server = game.simulation.network.get_node_by_hostname("server_1")
dns_server = server_1.software_manager.software.get("DNSServer")
dns_server.pause()
assert dns_server.operating_state == ServiceOperatingState.PAUSED
action = ("NODE_SERVICE_START", {"node_id": 1, "service_id": 0})
agent.store_action(action)
game.step()
assert dns_server.operating_state == ServiceOperatingState.PAUSED
dns_server.stop()
assert dns_server.operating_state == ServiceOperatingState.STOPPED
action = ("NODE_SERVICE_START", {"node_id": 1, "service_id": 0})
agent.store_action(action)
game.step()
assert dns_server.operating_state == ServiceOperatingState.RUNNING
def test_service_resume(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator checks if the service is paused before resuming."""
game, agent = game_and_agent_fixture
server_1: Server = game.simulation.network.get_node_by_hostname("server_1")
dns_server = server_1.software_manager.software.get("DNSServer")
action = ("NODE_SERVICE_RESUME", {"node_id": 1, "service_id": 0})
agent.store_action(action)
game.step()
assert dns_server.operating_state == ServiceOperatingState.RUNNING
dns_server.pause()
assert dns_server.operating_state == ServiceOperatingState.PAUSED
action = ("NODE_SERVICE_RESUME", {"node_id": 1, "service_id": 0})
agent.store_action(action)
game.step()
assert dns_server.operating_state == ServiceOperatingState.RUNNING
def test_service_cannot_perform_actions_unless_running(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test to make sure that the service cannot perform certain actions while not running."""
game, agent = game_and_agent_fixture
server_1: Server = game.simulation.network.get_node_by_hostname("server_1")
dns_server = server_1.software_manager.software.get("DNSServer")
dns_server.stop()
assert dns_server.operating_state == ServiceOperatingState.STOPPED
action = ("NODE_SERVICE_SCAN", {"node_id": 1, "service_id": 0})
agent.store_action(action)
game.step()
assert dns_server.operating_state == ServiceOperatingState.STOPPED
action = ("NODE_SERVICE_PAUSE", {"node_id": 1, "service_id": 0})
agent.store_action(action)
game.step()
assert dns_server.operating_state == ServiceOperatingState.STOPPED
action = ("NODE_SERVICE_RESUME", {"node_id": 1, "service_id": 0})
agent.store_action(action)
game.step()
assert dns_server.operating_state == ServiceOperatingState.STOPPED
action = ("NODE_SERVICE_RESTART", {"node_id": 1, "service_id": 0})
agent.store_action(action)
game.step()
assert dns_server.operating_state == ServiceOperatingState.STOPPED
action = ("NODE_SERVICE_FIX", {"node_id": 1, "service_id": 0})
agent.store_action(action)
game.step()
assert dns_server.operating_state == ServiceOperatingState.STOPPED

View File

@@ -155,7 +155,7 @@ def test_nic_monitored_traffic(simulation):
assert traffic_obs["icmp"]["outbound"] == 0
# send a ping
pc.ping(target_ip_address=pc2.network_interface[1].ip_address)
assert pc.ping(target_ip_address=pc2.network_interface[1].ip_address)
traffic_obs = nic_obs.observe(simulation.describe_state()).get("TRAFFIC")
assert traffic_obs["icmp"]["inbound"] == 1
@@ -178,7 +178,7 @@ def test_nic_monitored_traffic(simulation):
traffic_obs = nic_obs.observe(simulation.describe_state()).get("TRAFFIC")
assert traffic_obs["icmp"]["inbound"] == 0
assert traffic_obs["icmp"]["outbound"] == 0
assert traffic_obs["tcp"][53]["inbound"] == 0
assert traffic_obs["tcp"][53]["inbound"] == 1
assert traffic_obs["tcp"][53]["outbound"] == 1 # getting a webpage sent a dns request out
simulation.pre_timestep(2) # apply timestep to whole sim

View File

@@ -0,0 +1,161 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from primaite.session.environment import PrimaiteGymEnv
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.host.host_node import HostNode
from primaite.simulator.system.services.service import ServiceOperatingState
from tests.conftest import TEST_ASSETS_ROOT
CFG_PATH = TEST_ASSETS_ROOT / "configs/test_primaite_session.yaml"
def test_mask_contents_correct():
env = PrimaiteGymEnv(CFG_PATH)
game = env.game
sim = game.simulation
net = sim.network
mask = game.action_mask("defender")
agent = env.agent
node_list = agent.action_manager.node_names
action_map = agent.action_manager.action_map
# CHECK NIC ENABLE/DISABLE ACTIONS
for action_num, action in action_map.items():
mask = game.action_mask("defender")
act_type, act_params = action
if act_type == "NODE_NIC_ENABLE":
node_name = node_list[act_params["node_id"]]
node_obj = net.get_node_by_hostname(node_name)
nic_obj = node_obj.network_interface[act_params["nic_id"] + 1]
assert nic_obj.enabled
assert not mask[action_num]
nic_obj.disable()
mask = game.action_mask("defender")
assert mask[action_num]
nic_obj.enable()
if act_type == "NODE_NIC_DISABLE":
node_name = node_list[act_params["node_id"]]
node_obj = net.get_node_by_hostname(node_name)
nic_obj = node_obj.network_interface[act_params["nic_id"] + 1]
assert nic_obj.enabled
assert mask[action_num]
nic_obj.disable()
mask = game.action_mask("defender")
assert not mask[action_num]
nic_obj.enable()
if act_type == "ROUTER_ACL_ADDRULE":
assert mask[action_num]
if act_type == "ROUTER_ACL_REMOVERULE":
assert mask[action_num]
if act_type == "NODE_RESET":
node_name = node_list[act_params["node_id"]]
node_obj = net.get_node_by_hostname(node_name)
assert node_obj.operating_state is NodeOperatingState.ON
assert mask[action_num]
node_obj.operating_state = NodeOperatingState.OFF
mask = game.action_mask("defender")
assert not mask[action_num]
node_obj.operating_state = NodeOperatingState.ON
if act_type == "NODE_SHUTDOWN":
node_name = node_list[act_params["node_id"]]
node_obj = net.get_node_by_hostname(node_name)
assert node_obj.operating_state is NodeOperatingState.ON
assert mask[action_num]
node_obj.operating_state = NodeOperatingState.OFF
mask = game.action_mask("defender")
assert not mask[action_num]
node_obj.operating_state = NodeOperatingState.ON
if act_type == "NODE_OS_SCAN":
node_name = node_list[act_params["node_id"]]
node_obj = net.get_node_by_hostname(node_name)
assert node_obj.operating_state is NodeOperatingState.ON
assert mask[action_num]
node_obj.operating_state = NodeOperatingState.OFF
mask = game.action_mask("defender")
assert not mask[action_num]
node_obj.operating_state = NodeOperatingState.ON
if act_type == "NODE_STARTUP":
node_name = node_list[act_params["node_id"]]
node_obj = net.get_node_by_hostname(node_name)
assert node_obj.operating_state is NodeOperatingState.ON
assert not mask[action_num]
node_obj.operating_state = NodeOperatingState.OFF
mask = game.action_mask("defender")
assert mask[action_num]
node_obj.operating_state = NodeOperatingState.ON
if act_type == "DONOTHING":
assert mask[action_num]
if act_type == "NODE_SERVICE_DISABLE":
assert mask[action_num]
if act_type in ["NODE_SERVICE_SCAN", "NODE_SERVICE_STOP", "NODE_SERVICE_PAUSE"]:
node_name = node_list[act_params["node_id"]]
service_name = agent.action_manager.service_names[act_params["node_id"]][act_params["service_id"]]
node_obj = net.get_node_by_hostname(node_name)
service_obj = node_obj.software_manager.software.get(service_name)
assert service_obj.operating_state is ServiceOperatingState.RUNNING
assert mask[action_num]
service_obj.operating_state = ServiceOperatingState.DISABLED
mask = game.action_mask("defender")
assert not mask[action_num]
service_obj.operating_state = ServiceOperatingState.RUNNING
if act_type == "NODE_SERVICE_RESUME":
node_name = node_list[act_params["node_id"]]
service_name = agent.action_manager.service_names[act_params["node_id"]][act_params["service_id"]]
node_obj = net.get_node_by_hostname(node_name)
service_obj = node_obj.software_manager.software.get(service_name)
assert service_obj.operating_state is ServiceOperatingState.RUNNING
assert not mask[action_num]
service_obj.operating_state = ServiceOperatingState.PAUSED
mask = game.action_mask("defender")
assert mask[action_num]
service_obj.operating_state = ServiceOperatingState.RUNNING
if act_type == "NODE_SERVICE_START":
node_name = node_list[act_params["node_id"]]
service_name = agent.action_manager.service_names[act_params["node_id"]][act_params["service_id"]]
node_obj = net.get_node_by_hostname(node_name)
service_obj = node_obj.software_manager.software.get(service_name)
assert service_obj.operating_state is ServiceOperatingState.RUNNING
assert not mask[action_num]
service_obj.operating_state = ServiceOperatingState.STOPPED
mask = game.action_mask("defender")
assert mask[action_num]
service_obj.operating_state = ServiceOperatingState.RUNNING
if act_type == "NODE_SERVICE_ENABLE":
node_name = node_list[act_params["node_id"]]
service_name = agent.action_manager.service_names[act_params["node_id"]][act_params["service_id"]]
node_obj = net.get_node_by_hostname(node_name)
service_obj = node_obj.software_manager.software.get(service_name)
assert service_obj.operating_state is ServiceOperatingState.RUNNING
assert not mask[action_num]
service_obj.operating_state = ServiceOperatingState.DISABLED
mask = game.action_mask("defender")
assert mask[action_num]
service_obj.operating_state = ServiceOperatingState.RUNNING
if act_type in ["NODE_FILE_SCAN", "NODE_FILE_CHECKHASH", "NODE_FILE_DELETE"]:
node_name = node_list[act_params["node_id"]]
folder_name = agent.action_manager.get_folder_name_by_idx(act_params["node_id"], act_params["folder_id"])
file_name = agent.action_manager.get_file_name_by_idx(
act_params["node_id"], act_params["folder_id"], act_params["file_id"]
)
node_obj = net.get_node_by_hostname(node_name)
file_obj = node_obj.file_system.get_file(folder_name, file_name, include_deleted=True)
assert not file_obj.deleted
assert mask[action_num]
service_obj.operating_state = ServiceOperatingState.DISABLED
mask = game.action_mask("defender")
assert mask[action_num]
service_obj.operating_state = ServiceOperatingState.RUNNING

View File

@@ -0,0 +1,44 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
import yaml
from primaite.game.game import PrimaiteGame
from primaite.simulator.network.airspace import AirSpaceFrequency
from tests import TEST_ASSETS_ROOT
def test_override_freq_max_capacity_mbps():
config_path = TEST_ASSETS_ROOT / "configs" / "wireless_wan_network_config_freq_max_override.yaml"
with open(config_path, "r") as f:
config_dict = yaml.safe_load(f)
network = PrimaiteGame.from_config(cfg=config_dict).simulation.network
assert network.airspace.get_frequency_max_capacity_mbps(AirSpaceFrequency.WIFI_2_4) == 123.45
assert network.airspace.get_frequency_max_capacity_mbps(AirSpaceFrequency.WIFI_5) == 0.0
pc_a = network.get_node_by_hostname("pc_a")
pc_b = network.get_node_by_hostname("pc_b")
assert pc_a.ping(pc_b.network_interface[1].ip_address), "PC A should be able to ping PC B"
assert pc_b.ping(pc_a.network_interface[1].ip_address), "PC B should be able to ping PC A"
network.airspace.show()
def test_override_freq_max_capacity_mbps_blocked():
config_path = TEST_ASSETS_ROOT / "configs" / "wireless_wan_network_config_freq_max_override_blocked.yaml"
with open(config_path, "r") as f:
config_dict = yaml.safe_load(f)
network = PrimaiteGame.from_config(cfg=config_dict).simulation.network
assert network.airspace.get_frequency_max_capacity_mbps(AirSpaceFrequency.WIFI_2_4) == 0.0
assert network.airspace.get_frequency_max_capacity_mbps(AirSpaceFrequency.WIFI_5) == 0.0
pc_a = network.get_node_by_hostname("pc_a")
pc_b = network.get_node_by_hostname("pc_b")
assert not pc_a.ping(pc_b.network_interface[1].ip_address), "PC A should not be able to ping PC B"
assert not pc_b.ping(pc_a.network_interface[1].ip_address), "PC B should not be able to ping PC A"
network.airspace.show()

View File

@@ -0,0 +1,114 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from primaite.simulator.file_system.file_type import FileType
from primaite.simulator.network.hardware.nodes.network.router import ACLAction
from primaite.simulator.system.services.ftp.ftp_client import FTPClient
from primaite.simulator.system.services.ftp.ftp_server import FTPServer
from tests.integration_tests.network.test_wireless_router import wireless_wan_network
from tests.integration_tests.system.test_ftp_client_server import ftp_client_and_ftp_server
def test_wireless_link_loading(wireless_wan_network):
client, server, router_1, router_2 = wireless_wan_network
# Configure Router 1 ACLs
router_1.acl.add_rule(action=ACLAction.PERMIT, position=1)
# Configure Router 2 ACLs
router_2.acl.add_rule(action=ACLAction.PERMIT, position=1)
airspace = router_1.airspace
client.software_manager.install(FTPClient)
ftp_client: FTPClient = client.software_manager.software.get("FTPClient")
ftp_client.start()
server.software_manager.install(FTPServer)
ftp_server: FTPServer = server.software_manager.software.get("FTPServer")
ftp_server.start()
client.file_system.create_file(file_name="mixtape", size=10 * 10**6, file_type=FileType.MP3, folder_name="music")
assert ftp_client.send_file(
src_file_name="mixtape.mp3",
src_folder_name="music",
dest_ip_address=server.network_interface[1].ip_address,
dest_file_name="mixtape.mp3",
dest_folder_name="music",
)
# Reset the physical links between the host nodes and the routers
client.network_interface[1]._connected_link.pre_timestep(1)
server.network_interface[1]._connected_link.pre_timestep(1)
assert not ftp_client.send_file(
src_file_name="mixtape.mp3",
src_folder_name="music",
dest_ip_address=server.network_interface[1].ip_address,
dest_file_name="mixtape3.mp3",
dest_folder_name="music",
)
# Reset the physical links between the host nodes and the routers
client.network_interface[1]._connected_link.pre_timestep(1)
server.network_interface[1]._connected_link.pre_timestep(1)
airspace.reset_bandwidth_load()
assert ftp_client.send_file(
src_file_name="mixtape.mp3",
src_folder_name="music",
dest_ip_address=server.network_interface[1].ip_address,
dest_file_name="mixtape3.mp3",
dest_folder_name="music",
)
def test_wired_link_loading(ftp_client_and_ftp_server):
ftp_client, computer, ftp_server, server = ftp_client_and_ftp_server
link = computer.network_interface[1]._connected_link # noqa
assert link.is_up
link.pre_timestep(1)
computer.file_system.create_file(
file_name="mixtape", size=10 * 10**6, file_type=FileType.MP3, folder_name="music"
)
link_load = link.current_load
assert link_load == 0.0
assert ftp_client.send_file(
src_file_name="mixtape.mp3",
src_folder_name="music",
dest_ip_address=server.network_interface[1].ip_address,
dest_file_name="mixtape.mp3",
dest_folder_name="music",
)
new_link_load = link.current_load
assert new_link_load > link_load
assert not ftp_client.send_file(
src_file_name="mixtape.mp3",
src_folder_name="music",
dest_ip_address=server.network_interface[1].ip_address,
dest_file_name="mixtape1.mp3",
dest_folder_name="music",
)
link.pre_timestep(2)
link_load = link.current_load
assert link_load == 0.0
assert ftp_client.send_file(
src_file_name="mixtape.mp3",
src_folder_name="music",
dest_ip_address=server.network_interface[1].ip_address,
dest_file_name="mixtape1.mp3",
dest_folder_name="music",
)
new_link_load = link.current_load
assert new_link_load > link_load

View File

@@ -14,7 +14,7 @@ from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.services.service import Service
class BroadcastService(Service):
class BroadcastTestService(Service):
"""A service for sending broadcast and unicast messages over a network."""
def __init__(self, **kwargs):
@@ -41,14 +41,14 @@ class BroadcastService(Service):
super().send(payload="broadcast", dest_ip_address=ip_network, dest_port=Port.HTTP, ip_protocol=self.protocol)
class BroadcastClient(Application, identifier="BroadcastClient"):
class BroadcastTestClient(Application, identifier="BroadcastTestClient"):
"""A client application to receive broadcast and unicast messages."""
payloads_received: List = []
def __init__(self, **kwargs):
# Set default client properties
kwargs["name"] = "BroadcastClient"
kwargs["name"] = "BroadcastTestClient"
kwargs["port"] = Port.HTTP
kwargs["protocol"] = IPProtocol.TCP
super().__init__(**kwargs)
@@ -75,8 +75,8 @@ def broadcast_network() -> Network:
start_up_duration=0,
)
client_1.power_on()
client_1.software_manager.install(BroadcastClient)
application_1 = client_1.software_manager.software["BroadcastClient"]
client_1.software_manager.install(BroadcastTestClient)
application_1 = client_1.software_manager.software["BroadcastTestClient"]
application_1.run()
client_2 = Computer(
@@ -87,8 +87,8 @@ def broadcast_network() -> Network:
start_up_duration=0,
)
client_2.power_on()
client_2.software_manager.install(BroadcastClient)
application_2 = client_2.software_manager.software["BroadcastClient"]
client_2.software_manager.install(BroadcastTestClient)
application_2 = client_2.software_manager.software["BroadcastTestClient"]
application_2.run()
server_1 = Server(
@@ -100,8 +100,8 @@ def broadcast_network() -> Network:
)
server_1.power_on()
server_1.software_manager.install(BroadcastService)
service: BroadcastService = server_1.software_manager.software["BroadcastService"]
server_1.software_manager.install(BroadcastTestService)
service: BroadcastTestService = server_1.software_manager.software["BroadcastService"]
service.start()
switch_1 = Switch(hostname="switch_1", num_ports=6, start_up_duration=0)
@@ -115,14 +115,16 @@ def broadcast_network() -> Network:
@pytest.fixture(scope="function")
def broadcast_service_and_clients(broadcast_network) -> Tuple[BroadcastService, BroadcastClient, BroadcastClient]:
client_1: BroadcastClient = broadcast_network.get_node_by_hostname("client_1").software_manager.software[
"BroadcastClient"
def broadcast_service_and_clients(
broadcast_network,
) -> Tuple[BroadcastTestService, BroadcastTestClient, BroadcastTestClient]:
client_1: BroadcastTestClient = broadcast_network.get_node_by_hostname("client_1").software_manager.software[
"BroadcastTestClient"
]
client_2: BroadcastClient = broadcast_network.get_node_by_hostname("client_2").software_manager.software[
"BroadcastClient"
client_2: BroadcastTestClient = broadcast_network.get_node_by_hostname("client_2").software_manager.software[
"BroadcastTestClient"
]
service: BroadcastService = broadcast_network.get_node_by_hostname("server_1").software_manager.software[
service: BroadcastTestService = broadcast_network.get_node_by_hostname("server_1").software_manager.software[
"BroadcastService"
]

View File

@@ -101,6 +101,7 @@ def test_port_scan_full_subnet_all_ports_and_protocols(example_network):
actual_result = client_1_nmap.port_scan(
target_ip_address=IPv4Network("192.168.10.0/24"),
target_port=[Port.ARP, Port.HTTP, Port.FTP, Port.DNS, Port.NTP],
)
expected_result = {