Merge remote-tracking branch 'origin/dev' into 4.0.0-dev

This commit is contained in:
Marek Wolan
2025-02-10 14:39:28 +00:00
parent 0d1edf0362
commit 96549e68aa
71 changed files with 2700 additions and 367 deletions

View File

@@ -1,10 +1,11 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
import json
from typing import List
import pytest
import yaml
from primaite.game.agent.observations import ObservationManager
from primaite.game.agent.observations import ApplicationObservation, ObservationManager, ServiceObservation
from primaite.game.agent.observations.file_system_observations import FileObservation, FolderObservation
from primaite.game.agent.observations.host_observations import HostObservation
@@ -136,3 +137,227 @@ class TestFileSystemRequiresScan:
[], files=[], num_files=0, include_num_access=False, file_system_requires_scan=True
)
assert obs_requiring_scan.observe(folder_state)["health_status"] == 1
class TestServicesRequiresScan:
@pytest.mark.parametrize(
("yaml_option_string", "expected_val"),
(
("services_requires_scan: true", True),
("services_requires_scan: false", False),
(" ", True),
),
)
def test_obs_config(self, yaml_option_string, expected_val):
"""Check that the default behaviour is to set service_requires_scan to True."""
obs_cfg_yaml = f"""
type: custom
options:
components:
- type: nodes
label: NODES
options:
hosts:
- hostname: domain_controller
- hostname: web_server
services:
- service_name: web-server
- service_name: dns-client
- hostname: database_server
folders:
- folder_name: database
files:
- file_name: database.db
- hostname: backup_server
services:
- service_name: ftp-server
- hostname: security_suite
- hostname: client_1
- hostname: client_2
num_services: 3
num_applications: 0
num_folders: 1
num_files: 1
num_nics: 2
include_num_access: false
{yaml_option_string}
include_nmne: true
monitored_traffic:
icmp:
- NONE
tcp:
- DNS
routers:
- hostname: router_1
num_ports: 0
ip_list:
- 192.168.1.10
- 192.168.1.12
- 192.168.1.14
- 192.168.1.16
- 192.168.1.110
- 192.168.10.21
- 192.168.10.22
- 192.168.10.110
wildcard_list:
- 0.0.0.1
port_list:
- 80
- 5432
protocol_list:
- ICMP
- TCP
- UDP
num_rules: 10
- type: links
label: LINKS
options:
link_references:
- router_1:eth-1<->switch_1:eth-8
- router_1:eth-2<->switch_2:eth-8
- switch_1:eth-1<->domain_controller:eth-1
- switch_1:eth-2<->web_server:eth-1
- switch_1:eth-3<->database_server:eth-1
- switch_1:eth-4<->backup_server:eth-1
- switch_1:eth-7<->security_suite:eth-1
- switch_2:eth-1<->client_1:eth-1
- switch_2:eth-2<->client_2:eth-1
- switch_2:eth-7<->security_suite:eth-2
- type: none
label: ICS
options: {{}}
"""
cfg = yaml.safe_load(obs_cfg_yaml)
manager = ObservationManager.from_config(cfg)
hosts: List[HostObservation] = manager.obs.components["NODES"].hosts
for i, host in enumerate(hosts):
services: List[ServiceObservation] = host.services
for j, service in enumerate(services):
val = service.services_requires_scan
print(f"host {i} service {j} {val}")
assert val == expected_val # Make sure services require scan by default
def test_services_requires_scan(self):
state = {"health_state_actual": 3, "health_state_visible": 1, "operating_state": 1}
obs_requiring_scan = ServiceObservation([], services_requires_scan=True)
assert obs_requiring_scan.observe(state)["health_status"] == 1 # should be visible value
obs_not_requiring_scan = ServiceObservation([], services_requires_scan=False)
assert obs_not_requiring_scan.observe(state)["health_status"] == 3 # should be actual value
class TestApplicationsRequiresScan:
@pytest.mark.parametrize(
("yaml_option_string", "expected_val"),
(
("applications_requires_scan: true", True),
("applications_requires_scan: false", False),
(" ", True),
),
)
def test_obs_config(self, yaml_option_string, expected_val):
"""Check that the default behaviour is to set applications_requires_scan to True."""
obs_cfg_yaml = f"""
type: custom
options:
components:
- type: nodes
label: NODES
options:
hosts:
- hostname: domain_controller
- hostname: web_server
- hostname: database_server
folders:
- folder_name: database
files:
- file_name: database.db
- hostname: backup_server
- hostname: security_suite
- hostname: client_1
applications:
- application_name: web-browser
- hostname: client_2
applications:
- application_name: web-browser
- application_name: database-client
num_services: 0
num_applications: 3
num_folders: 1
num_files: 1
num_nics: 2
include_num_access: false
{yaml_option_string}
include_nmne: true
monitored_traffic:
icmp:
- NONE
tcp:
- DNS
routers:
- hostname: router_1
num_ports: 0
ip_list:
- 192.168.1.10
- 192.168.1.12
- 192.168.1.14
- 192.168.1.16
- 192.168.1.110
- 192.168.10.21
- 192.168.10.22
- 192.168.10.110
wildcard_list:
- 0.0.0.1
port_list:
- 80
- 5432
protocol_list:
- ICMP
- TCP
- UDP
num_rules: 10
- type: links
label: LINKS
options:
link_references:
- router_1:eth-1<->switch_1:eth-8
- router_1:eth-2<->switch_2:eth-8
- switch_1:eth-1<->domain_controller:eth-1
- switch_1:eth-2<->web_server:eth-1
- switch_1:eth-3<->database_server:eth-1
- switch_1:eth-4<->backup_server:eth-1
- switch_1:eth-7<->security_suite:eth-1
- switch_2:eth-1<->client_1:eth-1
- switch_2:eth-2<->client_2:eth-1
- switch_2:eth-7<->security_suite:eth-2
- type: none
label: ICS
options: {{}}
"""
cfg = yaml.safe_load(obs_cfg_yaml)
manager = ObservationManager.from_config(cfg)
hosts: List[HostObservation] = manager.obs.components["NODES"].hosts
for i, host in enumerate(hosts):
services: List[ServiceObservation] = host.services
for j, service in enumerate(services):
val = service.services_requires_scan
print(f"host {i} service {j} {val}")
assert val == expected_val # Make sure applications require scan by default
def test_applications_requires_scan(self):
state = {"health_state_actual": 3, "health_state_visible": 1, "operating_state": 1, "num_executions": 1}
obs_requiring_scan = ApplicationObservation([], applications_requires_scan=True)
assert obs_requiring_scan.observe(state)["health_status"] == 1 # should be visible value
obs_not_requiring_scan = ApplicationObservation([], applications_requires_scan=False)
assert obs_not_requiring_scan.observe(state)["health_status"] == 3 # should be actual value

View File

@@ -73,7 +73,7 @@ def test_ftp_should_not_process_commands_if_service_not_running(ftp_client):
assert ftp_client_service._process_ftp_command(payload=payload).status_code is FTPStatusCode.ERROR
def test_ftp_tries_to_senf_file__that_does_not_exist(ftp_client):
def test_ftp_tries_to_send_file__that_does_not_exist(ftp_client):
"""Method send_file should return false if no file to send."""
assert ftp_client.file_system.get_file(folder_name="root", file_name="test.txt") is None

View File

@@ -6,6 +6,7 @@ import pytest
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.interface.request import RequestResponse
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
@@ -442,3 +443,59 @@ def test_terminal_connection_timeout(basic_network):
assert len(computer_b.user_session_manager.remote_sessions) == 0
assert not remote_connection.is_active
def test_terminal_last_response_updates(basic_network):
"""Test that the _last_response within Terminal correctly updates."""
network: Network = basic_network
computer_a: Computer = network.get_node_by_hostname("node_a")
terminal_a: Terminal = computer_a.software_manager.software.get("terminal")
computer_b: Computer = network.get_node_by_hostname("node_b")
assert terminal_a.last_response is None
remote_connection = terminal_a.login(username="admin", password="admin", ip_address="192.168.0.11")
# Last response should be a successful logon
assert terminal_a.last_response == RequestResponse(status="success", data={"reason": "Login Successful"})
remote_connection.execute(command=["software_manager", "application", "install", "ransomware-script"])
# Last response should now update following successful install
assert terminal_a.last_response == RequestResponse(status="success", data={})
remote_connection.execute(command=["software_manager", "application", "install", "ransomware-script"])
# Last response should now update to success, but with supplied reason.
assert terminal_a.last_response == RequestResponse(status="success", data={"reason": "already installed"})
remote_connection.execute(command=["file_system", "create", "file", "folder123", "doggo.pdf", False])
# Check file was created.
assert computer_b.file_system.access_file(folder_name="folder123", file_name="doggo.pdf")
# Last response should be confirmation of file creation.
assert terminal_a.last_response == RequestResponse(
status="success",
data={"file_name": "doggo.pdf", "folder_name": "folder123", "file_type": "PDF", "file_size": 102400},
)
remote_connection.execute(
command=[
"service",
"ftp-client",
"send",
{
"dest_ip_address": "192.168.0.2",
"src_folder": "folder123",
"src_file_name": "cat.pdf",
"dest_folder": "root",
"dest_file_name": "cat.pdf",
},
]
)
assert terminal_a.last_response == RequestResponse(
status="failure",
data={"reason": "Unable to locate given file on local file system. Perhaps given options are invalid?"},
)