Merge in updates from dev
This commit is contained in:
@@ -1,10 +1,11 @@
|
||||
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
|
||||
import json
|
||||
from typing import List
|
||||
|
||||
import pytest
|
||||
import yaml
|
||||
|
||||
from primaite.game.agent.observations import ObservationManager
|
||||
from primaite.game.agent.observations import ApplicationObservation, ObservationManager, ServiceObservation
|
||||
from primaite.game.agent.observations.file_system_observations import FileObservation, FolderObservation
|
||||
from primaite.game.agent.observations.host_observations import HostObservation
|
||||
|
||||
@@ -136,3 +137,227 @@ class TestFileSystemRequiresScan:
|
||||
[], files=[], num_files=0, include_num_access=False, file_system_requires_scan=True
|
||||
)
|
||||
assert obs_requiring_scan.observe(folder_state)["health_status"] == 1
|
||||
|
||||
|
||||
class TestServicesRequiresScan:
|
||||
@pytest.mark.parametrize(
|
||||
("yaml_option_string", "expected_val"),
|
||||
(
|
||||
("services_requires_scan: true", True),
|
||||
("services_requires_scan: false", False),
|
||||
(" ", True),
|
||||
),
|
||||
)
|
||||
def test_obs_config(self, yaml_option_string, expected_val):
|
||||
"""Check that the default behaviour is to set service_requires_scan to True."""
|
||||
obs_cfg_yaml = f"""
|
||||
type: custom
|
||||
options:
|
||||
components:
|
||||
- type: nodes
|
||||
label: NODES
|
||||
options:
|
||||
hosts:
|
||||
- hostname: domain_controller
|
||||
- hostname: web_server
|
||||
services:
|
||||
- service_name: web-server
|
||||
- service_name: dns-client
|
||||
- hostname: database_server
|
||||
folders:
|
||||
- folder_name: database
|
||||
files:
|
||||
- file_name: database.db
|
||||
- hostname: backup_server
|
||||
services:
|
||||
- service_name: ftp-server
|
||||
- hostname: security_suite
|
||||
- hostname: client_1
|
||||
- hostname: client_2
|
||||
num_services: 3
|
||||
num_applications: 0
|
||||
num_folders: 1
|
||||
num_files: 1
|
||||
num_nics: 2
|
||||
include_num_access: false
|
||||
{yaml_option_string}
|
||||
include_nmne: true
|
||||
monitored_traffic:
|
||||
icmp:
|
||||
- NONE
|
||||
tcp:
|
||||
- DNS
|
||||
routers:
|
||||
- hostname: router_1
|
||||
num_ports: 0
|
||||
ip_list:
|
||||
- 192.168.1.10
|
||||
- 192.168.1.12
|
||||
- 192.168.1.14
|
||||
- 192.168.1.16
|
||||
- 192.168.1.110
|
||||
- 192.168.10.21
|
||||
- 192.168.10.22
|
||||
- 192.168.10.110
|
||||
wildcard_list:
|
||||
- 0.0.0.1
|
||||
port_list:
|
||||
- 80
|
||||
- 5432
|
||||
protocol_list:
|
||||
- ICMP
|
||||
- TCP
|
||||
- UDP
|
||||
num_rules: 10
|
||||
|
||||
- type: links
|
||||
label: LINKS
|
||||
options:
|
||||
link_references:
|
||||
- router_1:eth-1<->switch_1:eth-8
|
||||
- router_1:eth-2<->switch_2:eth-8
|
||||
- switch_1:eth-1<->domain_controller:eth-1
|
||||
- switch_1:eth-2<->web_server:eth-1
|
||||
- switch_1:eth-3<->database_server:eth-1
|
||||
- switch_1:eth-4<->backup_server:eth-1
|
||||
- switch_1:eth-7<->security_suite:eth-1
|
||||
- switch_2:eth-1<->client_1:eth-1
|
||||
- switch_2:eth-2<->client_2:eth-1
|
||||
- switch_2:eth-7<->security_suite:eth-2
|
||||
- type: none
|
||||
label: ICS
|
||||
options: {{}}
|
||||
|
||||
"""
|
||||
|
||||
cfg = yaml.safe_load(obs_cfg_yaml)
|
||||
manager = ObservationManager.from_config(cfg)
|
||||
|
||||
hosts: List[HostObservation] = manager.obs.components["NODES"].hosts
|
||||
for i, host in enumerate(hosts):
|
||||
services: List[ServiceObservation] = host.services
|
||||
for j, service in enumerate(services):
|
||||
val = service.services_requires_scan
|
||||
print(f"host {i} service {j} {val}")
|
||||
assert val == expected_val # Make sure services require scan by default
|
||||
|
||||
def test_services_requires_scan(self):
|
||||
state = {"health_state_actual": 3, "health_state_visible": 1, "operating_state": 1}
|
||||
|
||||
obs_requiring_scan = ServiceObservation([], services_requires_scan=True)
|
||||
assert obs_requiring_scan.observe(state)["health_status"] == 1 # should be visible value
|
||||
|
||||
obs_not_requiring_scan = ServiceObservation([], services_requires_scan=False)
|
||||
assert obs_not_requiring_scan.observe(state)["health_status"] == 3 # should be actual value
|
||||
|
||||
|
||||
class TestApplicationsRequiresScan:
|
||||
@pytest.mark.parametrize(
|
||||
("yaml_option_string", "expected_val"),
|
||||
(
|
||||
("applications_requires_scan: true", True),
|
||||
("applications_requires_scan: false", False),
|
||||
(" ", True),
|
||||
),
|
||||
)
|
||||
def test_obs_config(self, yaml_option_string, expected_val):
|
||||
"""Check that the default behaviour is to set applications_requires_scan to True."""
|
||||
obs_cfg_yaml = f"""
|
||||
type: custom
|
||||
options:
|
||||
components:
|
||||
- type: nodes
|
||||
label: NODES
|
||||
options:
|
||||
hosts:
|
||||
- hostname: domain_controller
|
||||
- hostname: web_server
|
||||
- hostname: database_server
|
||||
folders:
|
||||
- folder_name: database
|
||||
files:
|
||||
- file_name: database.db
|
||||
- hostname: backup_server
|
||||
- hostname: security_suite
|
||||
- hostname: client_1
|
||||
applications:
|
||||
- application_name: web-browser
|
||||
- hostname: client_2
|
||||
applications:
|
||||
- application_name: web-browser
|
||||
- application_name: database-client
|
||||
num_services: 0
|
||||
num_applications: 3
|
||||
num_folders: 1
|
||||
num_files: 1
|
||||
num_nics: 2
|
||||
include_num_access: false
|
||||
{yaml_option_string}
|
||||
include_nmne: true
|
||||
monitored_traffic:
|
||||
icmp:
|
||||
- NONE
|
||||
tcp:
|
||||
- DNS
|
||||
routers:
|
||||
- hostname: router_1
|
||||
num_ports: 0
|
||||
ip_list:
|
||||
- 192.168.1.10
|
||||
- 192.168.1.12
|
||||
- 192.168.1.14
|
||||
- 192.168.1.16
|
||||
- 192.168.1.110
|
||||
- 192.168.10.21
|
||||
- 192.168.10.22
|
||||
- 192.168.10.110
|
||||
wildcard_list:
|
||||
- 0.0.0.1
|
||||
port_list:
|
||||
- 80
|
||||
- 5432
|
||||
protocol_list:
|
||||
- ICMP
|
||||
- TCP
|
||||
- UDP
|
||||
num_rules: 10
|
||||
|
||||
- type: links
|
||||
label: LINKS
|
||||
options:
|
||||
link_references:
|
||||
- router_1:eth-1<->switch_1:eth-8
|
||||
- router_1:eth-2<->switch_2:eth-8
|
||||
- switch_1:eth-1<->domain_controller:eth-1
|
||||
- switch_1:eth-2<->web_server:eth-1
|
||||
- switch_1:eth-3<->database_server:eth-1
|
||||
- switch_1:eth-4<->backup_server:eth-1
|
||||
- switch_1:eth-7<->security_suite:eth-1
|
||||
- switch_2:eth-1<->client_1:eth-1
|
||||
- switch_2:eth-2<->client_2:eth-1
|
||||
- switch_2:eth-7<->security_suite:eth-2
|
||||
- type: none
|
||||
label: ICS
|
||||
options: {{}}
|
||||
|
||||
"""
|
||||
|
||||
cfg = yaml.safe_load(obs_cfg_yaml)
|
||||
manager = ObservationManager.from_config(cfg)
|
||||
|
||||
hosts: List[HostObservation] = manager.obs.components["NODES"].hosts
|
||||
for i, host in enumerate(hosts):
|
||||
services: List[ServiceObservation] = host.services
|
||||
for j, service in enumerate(services):
|
||||
val = service.services_requires_scan
|
||||
print(f"host {i} service {j} {val}")
|
||||
assert val == expected_val # Make sure applications require scan by default
|
||||
|
||||
def test_applications_requires_scan(self):
|
||||
state = {"health_state_actual": 3, "health_state_visible": 1, "operating_state": 1, "num_executions": 1}
|
||||
|
||||
obs_requiring_scan = ApplicationObservation([], applications_requires_scan=True)
|
||||
assert obs_requiring_scan.observe(state)["health_status"] == 1 # should be visible value
|
||||
|
||||
obs_not_requiring_scan = ApplicationObservation([], applications_requires_scan=False)
|
||||
assert obs_not_requiring_scan.observe(state)["health_status"] == 3 # should be actual value
|
||||
|
||||
@@ -73,7 +73,7 @@ def test_ftp_should_not_process_commands_if_service_not_running(ftp_client):
|
||||
assert ftp_client_service._process_ftp_command(payload=payload).status_code is FTPStatusCode.ERROR
|
||||
|
||||
|
||||
def test_ftp_tries_to_senf_file__that_does_not_exist(ftp_client):
|
||||
def test_ftp_tries_to_send_file__that_does_not_exist(ftp_client):
|
||||
"""Method send_file should return false if no file to send."""
|
||||
assert ftp_client.file_system.get_file(folder_name="root", file_name="test.txt") is None
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ import pytest
|
||||
|
||||
from primaite.game.agent.interface import ProxyAgent
|
||||
from primaite.game.game import PrimaiteGame
|
||||
from primaite.interface.request import RequestResponse
|
||||
from primaite.simulator.network.container import Network
|
||||
from primaite.simulator.network.hardware.nodes.host.computer import Computer
|
||||
from primaite.simulator.network.hardware.nodes.host.server import Server
|
||||
@@ -442,3 +443,59 @@ def test_terminal_connection_timeout(basic_network):
|
||||
assert len(computer_b.user_session_manager.remote_sessions) == 0
|
||||
|
||||
assert not remote_connection.is_active
|
||||
|
||||
|
||||
def test_terminal_last_response_updates(basic_network):
|
||||
"""Test that the _last_response within Terminal correctly updates."""
|
||||
network: Network = basic_network
|
||||
computer_a: Computer = network.get_node_by_hostname("node_a")
|
||||
terminal_a: Terminal = computer_a.software_manager.software.get("terminal")
|
||||
computer_b: Computer = network.get_node_by_hostname("node_b")
|
||||
|
||||
assert terminal_a.last_response is None
|
||||
|
||||
remote_connection = terminal_a.login(username="admin", password="admin", ip_address="192.168.0.11")
|
||||
|
||||
# Last response should be a successful logon
|
||||
assert terminal_a.last_response == RequestResponse(status="success", data={"reason": "Login Successful"})
|
||||
|
||||
remote_connection.execute(command=["software_manager", "application", "install", "ransomware-script"])
|
||||
|
||||
# Last response should now update following successful install
|
||||
assert terminal_a.last_response == RequestResponse(status="success", data={})
|
||||
|
||||
remote_connection.execute(command=["software_manager", "application", "install", "ransomware-script"])
|
||||
|
||||
# Last response should now update to success, but with supplied reason.
|
||||
assert terminal_a.last_response == RequestResponse(status="success", data={"reason": "already installed"})
|
||||
|
||||
remote_connection.execute(command=["file_system", "create", "file", "folder123", "doggo.pdf", False])
|
||||
|
||||
# Check file was created.
|
||||
assert computer_b.file_system.access_file(folder_name="folder123", file_name="doggo.pdf")
|
||||
|
||||
# Last response should be confirmation of file creation.
|
||||
assert terminal_a.last_response == RequestResponse(
|
||||
status="success",
|
||||
data={"file_name": "doggo.pdf", "folder_name": "folder123", "file_type": "PDF", "file_size": 102400},
|
||||
)
|
||||
|
||||
remote_connection.execute(
|
||||
command=[
|
||||
"service",
|
||||
"ftp-client",
|
||||
"send",
|
||||
{
|
||||
"dest_ip_address": "192.168.0.2",
|
||||
"src_folder": "folder123",
|
||||
"src_file_name": "cat.pdf",
|
||||
"dest_folder": "root",
|
||||
"dest_file_name": "cat.pdf",
|
||||
},
|
||||
]
|
||||
)
|
||||
|
||||
assert terminal_a.last_response == RequestResponse(
|
||||
status="failure",
|
||||
data={"reason": "Unable to locate given file on local file system. Perhaps given options are invalid?"},
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user