3086 UC7 Migration - All YAMLS, tests and notebooks. A few lingering issues such as the OS-SCAN not working and agent logs not appearing.

This commit is contained in:
Marek Wolan
2025-02-10 14:39:28 +00:00
parent 0d1edf0362
commit d8c8aa40a4
118 changed files with 21789 additions and 368 deletions

View File

@@ -25,7 +25,19 @@ game:
- ICMP
- TCP
- UDP
thresholds:
nmne:
high: 100
medium: 25
low: 5
file_access:
high: 10
medium: 5
low: 2
app_executions:
high: 5
medium: 3
low: 2
agents:
- ref: client_2_green_user
team: GREEN
@@ -64,10 +76,16 @@ agents:
options:
hosts:
- hostname: client_1
applications:
- application_name: WebBrowser
folders:
- folder_name: root
files:
- file_name: "test.txt"
- hostname: client_2
- hostname: client_3
num_services: 1
num_applications: 0
num_applications: 1
num_folders: 1
num_files: 1
num_nics: 2
@@ -182,6 +200,10 @@ simulation:
options:
ntp_server_ip: 192.168.1.10
- type: ntp-server
folders:
- folder_name: root
files:
- file_name: test.txt
- hostname: client_2
type: computer
ip_address: 192.168.10.22

View File

@@ -0,0 +1,226 @@
# Basic Switched network
#
# -------------- -------------- --------------
# | client_1 |------| switch_1 |------| client_2 |
# -------------- -------------- --------------
#
io_settings:
save_step_metadata: false
save_pcap_logs: true
save_sys_logs: true
sys_log_level: WARNING
agent_log_level: INFO
save_agent_logs: true
write_agent_log_to_terminal: True
game:
max_episode_length: 256
ports:
- ARP
- DNS
- HTTP
- POSTGRES_SERVER
protocols:
- ICMP
- TCP
- UDP
agents:
- ref: client_2_green_user
team: GREEN
type: periodic-agent
action_space:
action_map:
0:
action: do-nothing
options: {}
1:
action: node-application-execute
options:
node_id: 0
application_id: 0
agent_settings:
possible_start_nodes: [client_2,]
target_application: web-browser
start_step: 5
frequency: 4
variance: 3
- ref: defender
team: BLUE
type: proxy-agent
observation_space:
type: custom
options:
components:
- type: nodes
label: NODES
options:
hosts:
- hostname: client_1
- hostname: client_2
- hostname: client_3
num_services: 1
num_applications: 0
num_folders: 1
num_files: 1
num_nics: 2
include_num_access: false
monitored_traffic:
icmp:
- NONE
tcp:
- DNS
include_nmne: false
routers:
- hostname: router_1
num_ports: 0
ip_list:
- 192.168.10.21
- 192.168.10.22
- 192.168.10.23
wildcard_list:
- 0.0.0.1
port_list:
- 80
- 5432
protocol_list:
- ICMP
- TCP
- UDP
num_rules: 10
- type: links
label: LINKS
options:
link_references:
- switch_1:eth-1<->client_1:eth-1
- switch_1:eth-2<->client_2:eth-1
- type: none
label: ICS
options: {}
action_space:
action_map:
0:
action: do-nothing
options: {}
reward_function:
reward_components:
- type: database-file-integrity
weight: 0.5
options:
node_hostname: database_server
folder_name: database
file_name: database.db
- type: web-server-404-penalty
weight: 0.5
options:
node_hostname: web_server
service_name: web_server_web_service
agent_settings:
flatten_obs: true
simulation:
network:
nodes:
- type: switch
hostname: switch_1
num_ports: 8
- hostname: client_1
type: computer
ip_address: 192.168.10.21
subnet_mask: 255.255.255.0
default_gateway: 192.168.10.1
dns_server: 192.168.1.10
applications:
- type: ransomware-script
- type: web-browser
options:
target_url: http://arcd.com/users/
- type: database-client
options:
db_server_ip: 192.168.1.10
server_password: arcd
- type: data-manipulation-bot
options:
port_scan_p_of_success: 0.8
data_manipulation_p_of_success: 0.8
payload: "DELETE"
server_ip: 192.168.1.21
server_password: arcd
- type: dos-bot
options:
target_ip_address: 192.168.10.21
payload: SPOOF DATA
port_scan_p_of_success: 0.8
services:
- type: dns-client
options:
dns_server: 192.168.1.10
- type: dns-server
options:
domain_mapping:
arcd.com: 192.168.1.10
- type: database-service
options:
backup_server_ip: 192.168.1.10
- type: web-server
- type: ftp-server
options:
server_password: arcd
- type: ntp-client
options:
ntp_server_ip: 192.168.1.10
- type: ntp-server
- hostname: client_2
type: computer
ip_address: 192.168.10.22
subnet_mask: 255.255.255.0
default_gateway: 192.168.10.1
dns_server: 192.168.1.10
folders:
- folder_name: empty_folder
- folder_name: downloads
files:
- file_name: "test.txt"
- file_name: "another_file.pwtwoti"
- folder_name: root
files:
- file_name: passwords
size: 663
type: TXT
# pre installed services and applications
- hostname: client_3
type: computer
ip_address: 192.168.10.23
subnet_mask: 255.255.255.0
default_gateway: 192.168.10.1
dns_server: 192.168.1.10
start_up_duration: 0
shut_down_duration: 0
operating_state: "OFF"
# pre installed services and applications
links:
- endpoint_a_hostname: switch_1
endpoint_a_port: 1
endpoint_b_hostname: client_1
endpoint_b_port: 1
bandwidth: 200
- endpoint_a_hostname: switch_1
endpoint_a_port: 2
endpoint_b_hostname: client_2
endpoint_b_port: 1
bandwidth: 200

View File

@@ -0,0 +1,173 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
import pytest
import yaml
from primaite.config.load import _EXAMPLE_CFG, load
from primaite.game.game import PrimaiteGame
from primaite.session.environment import PrimaiteGymEnv
from primaite.simulator.file_system.file import File
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.firewall import Firewall
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript
from primaite.simulator.system.applications.web_browser import WebBrowser
from primaite.simulator.system.services.database.database_service import DatabaseService
from primaite.simulator.system.services.dns.dns_client import DNSClient
from primaite.simulator.system.services.dns.dns_server import DNSServer
from primaite.simulator.system.services.ftp.ftp_client import FTPClient
from primaite.simulator.system.services.ftp.ftp_server import FTPServer
from primaite.simulator.system.services.ntp.ntp_client import NTPClient
from primaite.simulator.system.services.ntp.ntp_server import NTPServer
from primaite.simulator.system.services.service import ServiceOperatingState
from primaite.simulator.system.software import SoftwareHealthState
CONFIG_FILE = _EXAMPLE_CFG / "uc7_config.yaml"
@pytest.fixture(scope="function")
def uc7_environment() -> PrimaiteGymEnv:
with open(_EXAMPLE_CFG / "uc7_config.yaml", mode="r") as uc7_config:
cfg = yaml.safe_load(uc7_config)
env = PrimaiteGymEnv(env_config=cfg)
return env
def assert_agent_reward(env: PrimaiteGymEnv, agent_name: str, positive: bool):
"""Asserts that a given agent has a reward that is below/above or equal to 0 dependant on arguments."""
agent_reward = env.game.agents[agent_name].reward_function.total_reward
if agent_name == "defender":
return # ignore blue agent
if positive is True:
assert agent_reward >= 0 # Asserts that no agents are below a total reward of 0
elif positive is False:
assert agent_reward <= 0 # Asserts that no agents are above a total reward of 0
else:
print("Invalid 'positive' argument.")
def test_green_agent_positive_reward(uc7_environment):
"""Confirms that the UC7 Green Agents receive a positive reward (Default Behaviour)."""
env: PrimaiteGymEnv = uc7_environment
# Performing no changes to the environment. Default Behaviour
# Stepping 60 times in the environment
for _ in range(60):
env.step(0)
for agent in env.game.agents:
assert_agent_reward(env=env, agent_name=env.game.agents[agent].config.ref, positive=True)
def test_green_agent_negative_reward(uc7_environment):
"""Confirms that the UC7 Green Agents receive a negative reward. (Disabled web-server and database-service)"""
env: PrimaiteGymEnv = uc7_environment
# Purposefully disabling the following services:
# 1. Disabling the web-server
st_dmz_pub_srv_web: Server = env.game.simulation.network.get_node_by_hostname("ST-DMZ-PUB-SRV-WEB")
st_web_server = st_dmz_pub_srv_web.software_manager.software["web-server"]
st_web_server.operating_state = ServiceOperatingState.DISABLED
assert st_web_server.operating_state == ServiceOperatingState.DISABLED
# 2. Disabling the DatabaseServer
st_data_database_server: Server = env.game.simulation.network.get_node_by_hostname("ST-DATA-PRV-SRV-DB")
database_service: DatabaseService = st_data_database_server.software_manager.software["database-service"]
database_service.operating_state = ServiceOperatingState.DISABLED
assert database_service.operating_state == ServiceOperatingState.DISABLED
# Stepping 100 times in the environment
for _ in range(100):
env.step(0)
for agent in env.game.agents:
assert_agent_reward(env=env, agent_name=env.game.agents[agent].config.ref, positive=False)
def test_tap001_default_behaviour(uc7_environment):
"""Confirms that the TAP001 expected simulation impacts works as expected in the UC7 environment."""
env: PrimaiteGymEnv = uc7_environment
env.reset()
network = env.game.simulation.network
# Running for 128 episodes
for _ in range(128):
env.step(0)
some_tech_proj_a_pc_1: Computer = network.get_node_by_hostname("ST-PROJ-A-PRV-PC-1")
# Asserting that the `malware_dropper.ps1` was created.
malware_dropper_file: File = some_tech_proj_a_pc_1.file_system.get_file("downloads", "malware_dropper.ps1")
assert malware_dropper_file.health_status == FileSystemItemHealthStatus.GOOD
# Asserting that the `RansomwareScript` launched successfully.
ransomware_script: RansomwareScript = some_tech_proj_a_pc_1.software_manager.software["ransomware-script"]
assert ransomware_script.health_state_actual == SoftwareHealthState.GOOD
assert ransomware_script.operating_state == ApplicationOperatingState.RUNNING
# Asserting that the `C2Beacon` connected to the `C2Server`.
c2_beacon: C2Beacon = some_tech_proj_a_pc_1.software_manager.software["c2-beacon"]
assert c2_beacon.health_state_actual == SoftwareHealthState.GOOD
assert c2_beacon.operating_state == ApplicationOperatingState.RUNNING
assert c2_beacon.c2_connection_active == True
# Asserting that the target database was successfully corrupted.
some_tech_data_server_database: Server = network.get_node_by_hostname("ST-DATA-PRV-SRV-DB")
database_file: File = some_tech_data_server_database.file_system.get_file(
folder_name="database", file_name="database.db"
)
assert database_file.health_status == FileSystemItemHealthStatus.CORRUPT
def test_tap003_default_behaviour(uc7_environment):
"""Confirms that the TAP003 expected simulation impacts works as expected in the UC7 environment."""
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.transmission.network_layer import IPPacket, IPProtocol
from primaite.utils.validation.port import PORT_LOOKUP
def uc7_environment_tap003() -> PrimaiteGymEnv:
with open(_EXAMPLE_CFG / "uc7_config_tap003.yaml", mode="r") as uc7_config:
cfg = yaml.safe_load(uc7_config)
cfg["agents"][32]["agent_settings"]["starting_nodes"] = ["ST-PROJ-A-PRV-PC-1"]
cfg["agents"][32]["agent_settings"]["default_starting_node"] = "ST-PROJ-A-PRV-PC-1"
env = PrimaiteGymEnv(env_config=cfg)
return env
env: PrimaiteGymEnv = uc7_environment_tap003()
env.reset()
# Running for 128 episodes
for _ in range(128):
env.step(0)
network = env.game.simulation.network
# Asserting that a malicious ACL has been added to ST-INTRA-PRV-RT-DR-1
st_intra_prv_rt_dr_1: Router = network.get_node_by_hostname(hostname="ST-INTRA-PRV-RT-DR-1")
assert st_intra_prv_rt_dr_1.acl.acl[1].action == ACLAction.DENY
assert st_intra_prv_rt_dr_1.acl.acl[1].protocol == "tcp"
assert st_intra_prv_rt_dr_1.acl.acl[1].src_port == PORT_LOOKUP.get("POSTGRES_SERVER")
assert st_intra_prv_rt_dr_1.acl.acl[1].dst_port == PORT_LOOKUP.get("POSTGRES_SERVER")
# Asserting that a malicious ACL has been added to ST-INTRA-PRV-RT-CR
st_intra_prv_rt_cr: Router = network.get_node_by_hostname(hostname="ST-INTRA-PRV-RT-CR")
assert st_intra_prv_rt_cr.acl.acl[1].action == ACLAction.DENY
assert st_intra_prv_rt_cr.acl.acl[1].protocol == "tcp"
assert st_intra_prv_rt_cr.acl.acl[1].src_port == PORT_LOOKUP.get("HTTP")
assert st_intra_prv_rt_cr.acl.acl[1].dst_port == PORT_LOOKUP.get("HTTP")
# Asserting that a malicious ACL has been added to REM-PUB-RT-DR
rem_pub_rt_dr: Router = network.get_node_by_hostname(hostname="REM-PUB-RT-DR")
assert rem_pub_rt_dr.acl.acl[1].action == ACLAction.DENY
assert rem_pub_rt_dr.acl.acl[1].protocol == "tcp"
assert rem_pub_rt_dr.acl.acl[1].src_port == PORT_LOOKUP.get("DNS")
assert rem_pub_rt_dr.acl.acl[1].dst_port == PORT_LOOKUP.get("DNS")

View File

@@ -0,0 +1,237 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
import pytest
import yaml
from primaite.config.load import _EXAMPLE_CFG
from primaite.game.game import PrimaiteGame
from primaite.session.environment import PrimaiteGymEnv
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.firewall import Firewall
from primaite.simulator.network.hardware.nodes.network.router import Router
from primaite.simulator.network.hardware.nodes.network.switch import Switch
CONFIG_FILE = _EXAMPLE_CFG / "uc7_config.yaml"
@pytest.fixture(scope="function")
def uc7_network() -> Network:
with open(file=CONFIG_FILE, mode="r") as f:
cfg = yaml.safe_load(stream=f)
game = PrimaiteGame.from_config(cfg=cfg)
return game.simulation.network
def test_ping_home_office(uc7_network):
"""Asserts that all home_pub_* can ping each-other and the public dns (isp_pub_srv_dns)"""
network = uc7_network
home_pub_pc_1: Computer = network.get_node_by_hostname("HOME-PUB-PC-1")
home_pub_pc_2: Computer = network.get_node_by_hostname("HOME-PUB-PC-2")
home_pub_pc_srv: Server = network.get_node_by_hostname("HOME-PUB-SRV")
home_pub_rt_dr: Router = network.get_node_by_hostname("HOME-PUB-RT-DR")
isp_pub_srv_dns: Server = network.get_node_by_hostname("ISP-PUB-SRV-DNS")
assert home_pub_pc_1.ping(isp_pub_srv_dns.network_interface[1].ip_address)
def ping_all_home_office(host):
assert host.ping(home_pub_pc_1.network_interface[1].ip_address)
assert host.ping(home_pub_pc_2.network_interface[1].ip_address)
assert host.ping(home_pub_pc_srv.network_interface[1].ip_address)
assert host.ping(home_pub_rt_dr.network_interface[1].ip_address)
assert host.ping(isp_pub_srv_dns.network_interface[1].ip_address)
ping_all_home_office(home_pub_pc_1)
ping_all_home_office(home_pub_pc_2)
ping_all_home_office(home_pub_pc_srv)
ping_all_home_office(isp_pub_srv_dns)
def test_ping_remote_site(uc7_network):
"""Asserts that all remote_pub_* hosts can ping each-other and the public dns server (isp_pub_srv_dns)"""
network = uc7_network
rem_pub_fw: Firewall = network.get_node_by_hostname(hostname="REM-PUB-FW")
rem_pub_rt_dr: Router = network.get_node_by_hostname(hostname="REM-PUB-RT-DR")
rem_pub_pc_1: Computer = network.get_node_by_hostname(hostname="REM-PUB-PC-1")
rem_pub_pc_2: Computer = network.get_node_by_hostname(hostname="REM-PUB-PC-2")
rem_pub_srv: Computer = network.get_node_by_hostname(hostname="REM-PUB-SRV")
def ping_all_remote_site(host):
assert host.ping(rem_pub_fw.network_interface[1].ip_address)
assert host.ping(rem_pub_rt_dr.network_interface[1].ip_address)
assert host.ping(rem_pub_pc_1.network_interface[1].ip_address)
assert host.ping(rem_pub_pc_2.network_interface[1].ip_address)
assert host.ping(rem_pub_srv.network_interface[1].ip_address)
ping_all_remote_site(host=rem_pub_fw)
ping_all_remote_site(host=rem_pub_rt_dr)
ping_all_remote_site(host=rem_pub_pc_1)
ping_all_remote_site(host=rem_pub_pc_2)
ping_all_remote_site(host=rem_pub_srv)
def test_ping_some_tech_dmz(uc7_network):
"""Asserts that the st_dmz_pub_srv_web and the st_public_firewall can ping each other and remote site and home office."""
network = uc7_network
st_pub_fw: Firewall = network.get_node_by_hostname(hostname="ST-PUB-FW")
st_dmz_pub_srv_web: Server = network.get_node_by_hostname(hostname="ST-DMZ-PUB-SRV-WEB")
isp_pub_srv_dns: Server = network.get_node_by_hostname("ISP-PUB-SRV-DNS")
home_pub_pc_1: Computer = network.get_node_by_hostname("HOME-PUB-PC-1")
def ping_all_some_tech_dmz(host):
assert host.ping(st_dmz_pub_srv_web.network_interface[1].ip_address)
assert host.ping(isp_pub_srv_dns.network_interface[1].ip_address)
ping_all_some_tech_dmz(host=st_pub_fw)
ping_all_some_tech_dmz(host=isp_pub_srv_dns)
ping_all_some_tech_dmz(host=home_pub_pc_1)
def test_ping_some_tech_head_office(uc7_network):
"""Asserts that all the some_tech_* PCs can ping each other and the public dns"""
network = uc7_network
st_home_office_private_pc_1: Computer = network.get_node_by_hostname("ST-HO-PRV-PC-1")
st_home_office_private_pc_2: Computer = network.get_node_by_hostname("ST-HO-PRV-PC-2")
st_home_office_private_pc_3: Computer = network.get_node_by_hostname("ST-HO-PRV-PC-3")
isp_pub_srv_dns: Server = network.get_node_by_hostname("ISP-PUB-SRV-DNS")
def ping_all_some_tech_head_office(host):
assert host.ping(st_home_office_private_pc_1.network_interface[1].ip_address)
assert host.ping(st_home_office_private_pc_2.network_interface[1].ip_address)
assert host.ping(st_home_office_private_pc_3.network_interface[1].ip_address)
assert host.ping(isp_pub_srv_dns.network_interface[1].ip_address)
ping_all_some_tech_head_office(host=st_home_office_private_pc_1)
ping_all_some_tech_head_office(host=st_home_office_private_pc_2)
ping_all_some_tech_head_office(host=st_home_office_private_pc_3)
def test_ping_some_tech_hr(uc7_network):
"""Assert that all some_tech_hr_* PCs can ping each other and the public dns"""
network = uc7_network
some_tech_hr_pc_1: Computer = network.get_node_by_hostname("ST-HR-PRV-PC-1")
some_tech_hr_pc_2: Computer = network.get_node_by_hostname("ST-HR-PRV-PC-2")
some_tech_hr_pc_3: Computer = network.get_node_by_hostname("ST-HR-PRV-PC-3")
isp_pub_srv_dns: Server = network.get_node_by_hostname("ISP-PUB-SRV-DNS")
def ping_all_some_tech_hr(host):
assert host.ping(some_tech_hr_pc_1.network_interface[1].ip_address)
assert host.ping(some_tech_hr_pc_2.network_interface[1].ip_address)
assert host.ping(some_tech_hr_pc_3.network_interface[1].ip_address)
assert host.ping(isp_pub_srv_dns.network_interface[1].ip_address)
ping_all_some_tech_hr(some_tech_hr_pc_1)
ping_all_some_tech_hr(some_tech_hr_pc_2)
ping_all_some_tech_hr(some_tech_hr_pc_3)
def test_some_tech_data_hr(uc7_network):
"""Assert that all some_tech_data_* servers can ping each other and the public dns."""
network = uc7_network
some_tech_data_server_storage: Server = network.get_node_by_hostname("ST-DATA-PRV-SRV-STORAGE")
some_tech_data_server_database: Server = network.get_node_by_hostname("ST-DATA-PRV-SRV-DB")
isp_pub_srv_dns: Server = network.get_node_by_hostname("ISP-PUB-SRV-DNS")
def ping_all_some_tech_hr(host):
assert host.ping(some_tech_data_server_storage.network_interface[1].ip_address)
assert host.ping(some_tech_data_server_database.network_interface[1].ip_address)
assert host.ping(isp_pub_srv_dns.network_interface[1].ip_address)
ping_all_some_tech_hr(some_tech_data_server_storage)
ping_all_some_tech_hr(some_tech_data_server_database)
def test_some_tech_project_a(uc7_network):
"""Asserts that all some_tech project A's PCs can ping each other and the public dns."""
network = uc7_network
some_tech_proj_a_pc_1: Computer = network.get_node_by_hostname("ST-PROJ-A-PRV-PC-1")
some_tech_proj_a_pc_2: Computer = network.get_node_by_hostname("ST-PROJ-A-PRV-PC-2")
some_tech_proj_a_pc_3: Computer = network.get_node_by_hostname("ST-PROJ-A-PRV-PC-3")
isp_pub_srv_dns: Server = network.get_node_by_hostname("ISP-PUB-SRV-DNS")
def ping_all_some_tech_proj_a(host):
assert host.ping(some_tech_proj_a_pc_1.network_interface[1].ip_address)
assert host.ping(some_tech_proj_a_pc_2.network_interface[1].ip_address)
assert host.ping(some_tech_proj_a_pc_3.network_interface[1].ip_address)
assert host.ping(isp_pub_srv_dns.network_interface[1].ip_address)
ping_all_some_tech_proj_a(some_tech_proj_a_pc_1)
ping_all_some_tech_proj_a(some_tech_proj_a_pc_2)
ping_all_some_tech_proj_a(some_tech_proj_a_pc_3)
def test_some_tech_project_b(uc7_network):
"""Asserts that all some_tech_project_b PC's can ping each other and the public dps."""
network = uc7_network
some_tech_proj_b_pc_1: Computer = network.get_node_by_hostname("ST-PROJ-B-PRV-PC-1")
some_tech_proj_b_pc_2: Computer = network.get_node_by_hostname("ST-PROJ-B-PRV-PC-2")
some_tech_proj_b_pc_3: Computer = network.get_node_by_hostname("ST-PROJ-B-PRV-PC-3")
isp_pub_srv_dns: Server = network.get_node_by_hostname("ISP-PUB-SRV-DNS")
def ping_all_some_tech_proj_b(host):
assert host.ping(some_tech_proj_b_pc_1.network_interface[1].ip_address)
assert host.ping(some_tech_proj_b_pc_2.network_interface[1].ip_address)
assert host.ping(some_tech_proj_b_pc_3.network_interface[1].ip_address)
assert host.ping(isp_pub_srv_dns.network_interface[1].ip_address)
ping_all_some_tech_proj_b(some_tech_proj_b_pc_1)
ping_all_some_tech_proj_b(some_tech_proj_b_pc_2)
ping_all_some_tech_proj_b(some_tech_proj_b_pc_3)
def test_some_tech_project_a(uc7_network):
"""Asserts that all some_tech_project_c PC's can ping each other and the public dps."""
network = uc7_network
some_tech_proj_c_pc_1: Computer = network.get_node_by_hostname("ST-PROJ-C-PRV-PC-1")
some_tech_proj_c_pc_2: Computer = network.get_node_by_hostname("ST-PROJ-C-PRV-PC-2")
some_tech_proj_c_pc_3: Computer = network.get_node_by_hostname("ST-PROJ-C-PRV-PC-3")
isp_pub_srv_dns: Server = network.get_node_by_hostname("ISP-PUB-SRV-DNS")
def ping_all_some_tech_proj_c(host):
assert host.ping(some_tech_proj_c_pc_1.network_interface[1].ip_address)
assert host.ping(some_tech_proj_c_pc_2.network_interface[1].ip_address)
assert host.ping(some_tech_proj_c_pc_3.network_interface[1].ip_address)
assert host.ping(isp_pub_srv_dns.network_interface[1].ip_address)
ping_all_some_tech_proj_c(some_tech_proj_c_pc_1)
ping_all_some_tech_proj_c(some_tech_proj_c_pc_2)
ping_all_some_tech_proj_c(some_tech_proj_c_pc_3)
def test_ping_all_networks(uc7_network):
"""Asserts that one machine from each network is able to ping all others."""
network = uc7_network
home_office_pc_1: Computer = network.get_node_by_hostname("HOME-PUB-PC-1")
isp_pub_srv_dns: Server = network.get_node_by_hostname("ISP-PUB-SRV-DNS")
remote_office_pc_1: Computer = network.get_node_by_hostname("REM-PUB-PC-1")
st_head_office_pc_1: Computer = network.get_node_by_hostname("ST-HO-PRV-PC-1")
st_human_resources_pc_1: Computer = network.get_node_by_hostname("ST-HR-PRV-PC-1")
st_data_storage_server: Server = network.get_node_by_hostname("ST-DATA-PRV-SRV-STORAGE")
st_data_database_server: Server = network.get_node_by_hostname("ST-DATA-PRV-SRV-DB")
st_proj_a_pc_1: Computer = network.get_node_by_hostname("ST-PROJ-A-PRV-PC-1")
st_proj_b_pc_1: Computer = network.get_node_by_hostname("ST-PROJ-B-PRV-PC-1")
st_proj_c_pc_1: Computer = network.get_node_by_hostname("ST-PROJ-C-PRV-PC-1")
def ping_network_wide(host):
assert host.ping(home_office_pc_1.network_interface[1].ip_address)
assert host.ping(isp_pub_srv_dns.network_interface[1].ip_address)
assert host.ping(remote_office_pc_1.network_interface[1].ip_address)
assert host.ping(st_head_office_pc_1.network_interface[1].ip_address)
assert host.ping(st_human_resources_pc_1.network_interface[1].ip_address)
assert host.ping(st_data_storage_server.network_interface[1].ip_address)
assert host.ping(st_data_database_server.network_interface[1].ip_address)
assert host.ping(st_proj_a_pc_1.network_interface[1].ip_address)
assert host.ping(st_proj_b_pc_1.network_interface[1].ip_address)
assert host.ping(st_proj_c_pc_1.network_interface[1].ip_address)
ping_network_wide(host=home_office_pc_1)
ping_network_wide(host=isp_pub_srv_dns)
ping_network_wide(host=remote_office_pc_1)
ping_network_wide(host=st_head_office_pc_1)
ping_network_wide(host=st_human_resources_pc_1)
ping_network_wide(host=st_data_storage_server)
ping_network_wide(host=st_data_database_server)
ping_network_wide(host=st_proj_a_pc_1)
ping_network_wide(host=st_proj_b_pc_1)
ping_network_wide(host=st_proj_c_pc_1)

View File

@@ -0,0 +1,338 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
import pytest
import yaml
from primaite.config.load import _EXAMPLE_CFG
from primaite.game.game import PrimaiteGame
from primaite.session.environment import PrimaiteGymEnv
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.applications.web_browser import WebBrowser
from primaite.simulator.system.services.database.database_service import DatabaseService
from primaite.simulator.system.services.dns.dns_client import DNSClient
from primaite.simulator.system.services.dns.dns_server import DNSServer
from primaite.simulator.system.services.ftp.ftp_client import FTPClient
from primaite.simulator.system.services.ftp.ftp_server import FTPServer
from primaite.simulator.system.services.ntp.ntp_client import NTPClient
from primaite.simulator.system.services.ntp.ntp_server import NTPServer
from primaite.simulator.system.services.service import ServiceOperatingState
from primaite.simulator.system.software import SoftwareHealthState
CONFIG_FILE = _EXAMPLE_CFG / "uc7_config.yaml"
@pytest.fixture(scope="function")
def uc7_network() -> Network:
with open(file=CONFIG_FILE, mode="r") as f:
cfg = yaml.safe_load(stream=f)
game = PrimaiteGame.from_config(cfg=cfg)
return game.simulation.network
def assert_ntp_client(host):
"""Confirms that the ntp_client service is present and functioning."""
ntp_client: NTPClient = host.software_manager.software["ntp-client"]
assert ntp_client is not None
assert ntp_client.operating_state == ServiceOperatingState.RUNNING
assert ntp_client.health_state_actual == SoftwareHealthState.GOOD
def assert_dns_client(host):
"""Confirms that the dns_client service is present and functioning."""
dns_client: DNSClient = host.software_manager.software["dns-client"]
assert dns_client is not None
assert dns_client.operating_state == ServiceOperatingState.RUNNING
assert dns_client.health_state_actual == SoftwareHealthState.GOOD
def assert_web_browser(host: Computer):
"""Asserts that the web_browser application is present and functioning."""
web_browser: WebBrowser = host.software_manager.software["web-browser"]
assert web_browser is not None
assert web_browser.operating_state == ApplicationOperatingState.RUNNING
assert web_browser.health_state_actual == SoftwareHealthState.GOOD
def assert_database_client(host: Computer):
"""Asserts that the database_client application is present and functioning."""
database_client = host.software_manager.software["database-client"]
assert database_client is not None
assert database_client.operating_state == ApplicationOperatingState.RUNNING
assert database_client.health_state_actual == SoftwareHealthState.GOOD
def test_home_office_software(uc7_network):
"""Asserts that each host in the home_office network contains the expected software."""
network: Network = uc7_network
home_pub_pc_1: Computer = network.get_node_by_hostname("HOME-PUB-PC-1")
home_pub_pc_2: Computer = network.get_node_by_hostname("HOME-PUB-PC-1")
home_pub_srv: Server = network.get_node_by_hostname("HOME-PUB-SRV")
# Home Office PC 1
assert_web_browser(home_pub_pc_1)
assert_database_client(home_pub_pc_1)
assert_dns_client(home_pub_pc_1)
assert_ntp_client(home_pub_pc_1)
# Home Office PC 2
assert_web_browser(home_pub_pc_2)
assert_database_client(home_pub_pc_2)
assert_dns_client(home_pub_pc_2)
assert_ntp_client(home_pub_pc_2)
# Home Office Server
assert_dns_client(home_pub_srv)
assert_ntp_client(home_pub_srv)
def test_internet_dns_server(uc7_network):
"""Asserts that `ISP-PUB-SRV-DNS` host's DNSServer application is operating and functioning as expected."""
network: Network = uc7_network
isp_pub_srv_dns: Server = network.get_node_by_hostname("ISP-PUB-SRV-DNS")
# Confirming that the DNSServer is up and running:
dns_server: DNSServer = isp_pub_srv_dns.software_manager.software["dns-server"]
assert dns_server is not None
assert dns_server.operating_state == ServiceOperatingState.RUNNING
assert dns_server.health_state_actual == SoftwareHealthState.GOOD
# Confirming that the DNSServer is performing as expected by performing a request from a client
home_pub_pc_1: Computer = network.get_node_by_hostname("HOME-PUB-PC-1")
dns_client: DNSClient = home_pub_pc_1.software_manager.software["dns-client"]
assert dns_client.check_domain_exists(target_domain="some_tech.com")
assert dns_client.dns_cache.get("some_tech.com", None) is not None
assert len(dns_client.dns_cache) == 1
def test_remote_office_software(uc7_network):
"""Asserts that each host on the remote_office network has the expected services & applications which are operating as expected."""
network = uc7_network
rem_pub_pc_1: Computer = network.get_node_by_hostname(hostname="REM-PUB-PC-1")
rem_pub_pc_2: Computer = network.get_node_by_hostname(hostname="REM-PUB-PC-2")
rem_pub_srv: Server = network.get_node_by_hostname(hostname="REM-PUB-SRV")
# Remote Site PC 1
assert_web_browser(rem_pub_pc_1)
assert_database_client(rem_pub_pc_1)
assert_dns_client(rem_pub_pc_1)
assert_ntp_client(rem_pub_pc_1)
# Remote Site PC 2
assert_web_browser(rem_pub_pc_2)
assert_database_client(rem_pub_pc_2)
assert_dns_client(rem_pub_pc_2)
assert_ntp_client(rem_pub_pc_2)
# Remote Site Server
assert_dns_client(rem_pub_srv)
assert_ntp_client(rem_pub_srv)
def test_dmz_web_server(uc7_network):
"""Asserts that the DMZ WebServer functions as expected"""
network: Network = uc7_network
st_dmz_pub_srv_web: Server = network.get_node_by_hostname("ST-DMZ-PUB-SRV-WEB")
# Asserting the ST Web Server is working as expected
st_web_server = st_dmz_pub_srv_web.software_manager.software["web-server"]
assert st_web_server is not None
assert st_web_server.operating_state == ServiceOperatingState.RUNNING
assert st_web_server.health_state_actual == SoftwareHealthState.GOOD
# Asserting that WebBrowser can actually connect to the WebServer
# SOME TECH Human Resources --> DMZ Web Server
st_hr_pc_1: Computer = network.get_node_by_hostname("ST-HR-PRV-PC-1")
st_hr_pc_1_web_browser: WebBrowser = st_hr_pc_1.software_manager.software["web-browser"]
assert st_hr_pc_1_web_browser.get_webpage("http://some_tech.com")
# Remote Site --> DMZ Web Server
rem_pub_pc_1: Computer = network.get_node_by_hostname("REM-PUB-PC-1")
rem_pub_pc_1_web_browser: WebBrowser = rem_pub_pc_1.software_manager.software["web-browser"]
assert rem_pub_pc_1_web_browser.get_webpage("http://some_tech.com")
# Home Office --> DMZ Web Server
home_pub_pc_1: Computer = network.get_node_by_hostname("HOME-PUB-PC-1")
home_pub_pc_1_web_browser: WebBrowser = home_pub_pc_1.software_manager.software["web-browser"]
assert home_pub_pc_1_web_browser.get_webpage("http://some_tech.com")
def test_tech_head_office_software(uc7_network):
"""Asserts that each host on the some_tech_head_office network has the expected services & applications which are operating as expected."""
network: Network = uc7_network
st_head_office_private_pc_1: Computer = network.get_node_by_hostname("ST-HO-PRV-PC-1")
st_head_office_private_pc_2: Computer = network.get_node_by_hostname("ST-HO-PRV-PC-2")
st_head_office_private_pc_3: Computer = network.get_node_by_hostname("ST-HO-PRV-PC-3")
# ST Head Office One
assert_web_browser(st_head_office_private_pc_1)
assert_database_client(st_head_office_private_pc_1)
assert_dns_client(st_head_office_private_pc_1)
assert_ntp_client(st_head_office_private_pc_1)
# ST Head Office Two
assert_web_browser(st_head_office_private_pc_2)
assert_database_client(st_head_office_private_pc_2)
assert_dns_client(st_head_office_private_pc_2)
assert_ntp_client(st_head_office_private_pc_2)
# ST Head Office Three
assert_web_browser(st_head_office_private_pc_3)
assert_database_client(st_head_office_private_pc_3)
assert_dns_client(st_head_office_private_pc_3)
assert_ntp_client(st_head_office_private_pc_3)
def test_tech_human_resources_office_software(uc7_network):
"""Asserts that each host on the some_tech human_resources network has the expected services & applications which are operating as expected."""
network: Network = uc7_network
st_hr_pc_1: Computer = network.get_node_by_hostname("ST-HR-PRV-PC-1")
st_hr_pc_2: Computer = network.get_node_by_hostname("ST-HR-PRV-PC-2")
st_hr_pc_3: Computer = network.get_node_by_hostname("ST-HR-PRV-PC-3")
# ST Human Resource PC 1
assert_web_browser(st_hr_pc_1)
assert_database_client(st_hr_pc_1)
assert_dns_client(st_hr_pc_1)
assert_ntp_client(st_hr_pc_1)
# ST Human Resource PC 2
assert_web_browser(st_hr_pc_2)
assert_database_client(st_hr_pc_2)
assert_dns_client(st_hr_pc_2)
assert_ntp_client(st_hr_pc_2)
# ST Human Resource PC 3
assert_web_browser(st_hr_pc_3)
assert_database_client(st_hr_pc_3)
assert_dns_client(st_hr_pc_3)
assert_ntp_client(st_hr_pc_3)
def test_tech_data_software(uc7_network):
"""Asserts the database and database storage servers on the some_tech data network are operating as expected."""
network: Network = uc7_network
st_data_database_server: Server = network.get_node_by_hostname("ST-DATA-PRV-SRV-DB")
st_data_database_storage: Server = network.get_node_by_hostname("ST-DATA-PRV-SRV-STORAGE")
st_proj_a_pc_1: Computer = network.get_node_by_hostname("ST-PROJ-A-PRV-PC-1")
# Asserting that the database_service is working as expected
database_service: DatabaseService = st_data_database_server.software_manager.software["database-service"]
assert database_service is not None
assert database_service.operating_state == ServiceOperatingState.RUNNING
assert database_service.health_state_actual == SoftwareHealthState.GOOD
# Asserting that the database_client can connect to the database
database_client: DatabaseClient = st_proj_a_pc_1.software_manager.software["database-client"]
assert database_client.server_ip_address is not None
assert database_client.server_ip_address == st_data_database_server.network_interface[1].ip_address
assert database_client.connect()
# Asserting that the database storage works as expected.
assert database_service.backup_server_ip == st_data_database_storage.network_interface[1].ip_address
assert database_service.backup_database()
def test_tech_proj_a_software(uc7_network):
"""Asserts that each host on the some_tech project A network has the expected services & applications which are operating as expected."""
network: Network = uc7_network
st_proj_a_pc_1: Computer = network.get_node_by_hostname("ST-PROJ-A-PRV-PC-1")
st_proj_a_pc_2: Computer = network.get_node_by_hostname("ST-PROJ-A-PRV-PC-2")
st_proj_a_pc_3: Computer = network.get_node_by_hostname("ST-PROJ-A-PRV-PC-3")
# ST Project A - PC 1
assert_web_browser(st_proj_a_pc_1)
assert_database_client(st_proj_a_pc_1)
assert_dns_client(st_proj_a_pc_1)
assert_ntp_client(st_proj_a_pc_1)
# ST Project A - PC 2
assert_web_browser(st_proj_a_pc_2)
assert_database_client(st_proj_a_pc_2)
assert_dns_client(st_proj_a_pc_2)
assert_ntp_client(st_proj_a_pc_2)
# ST Project A - PC 3
assert_web_browser(st_proj_a_pc_3)
assert_database_client(st_proj_a_pc_3)
assert_dns_client(st_proj_a_pc_3)
assert_ntp_client(st_proj_a_pc_3)
def test_tech_proj_b_software(uc7_network):
"""Asserts that each host on the some_tech project A network has the expected services & applications which are operating as expected."""
network: Network = uc7_network
st_proj_b_pc_1: Computer = network.get_node_by_hostname("ST-PROJ-B-PRV-PC-1")
st_proj_b_pc_2: Computer = network.get_node_by_hostname("ST-PROJ-B-PRV-PC-2")
st_proj_b_pc_3: Computer = network.get_node_by_hostname("ST-PROJ-B-PRV-PC-3")
# ST Project B - PC 1
assert_web_browser(st_proj_b_pc_1)
assert_database_client(st_proj_b_pc_1)
assert_dns_client(st_proj_b_pc_1)
assert_ntp_client(st_proj_b_pc_1)
# ST Project B - PC2
assert_web_browser(st_proj_b_pc_2)
assert_database_client(st_proj_b_pc_2)
assert_dns_client(st_proj_b_pc_2)
assert_ntp_client(st_proj_b_pc_2)
# ST Project B - PC3
assert_web_browser(st_proj_b_pc_3)
assert_database_client(st_proj_b_pc_3)
assert_dns_client(st_proj_b_pc_3)
assert_ntp_client(st_proj_b_pc_3)
def test_tech_proj_c_software(uc7_network):
"""Asserts that each host on the some_tech project A network has the expected services & applications which are operating as expected."""
network: Network = uc7_network
st_proj_c_pc_1: Computer = network.get_node_by_hostname("ST-PROJ-C-PRV-PC-1")
st_proj_c_pc_2: Computer = network.get_node_by_hostname("ST-PROJ-C-PRV-PC-2")
st_proj_c_pc_3: Computer = network.get_node_by_hostname("ST-PROJ-C-PRV-PC-3")
# ST Project C - PC 1
assert_web_browser(st_proj_c_pc_1)
assert_database_client(st_proj_c_pc_1)
assert_dns_client(st_proj_c_pc_1)
assert_ntp_client(st_proj_c_pc_1)
# ST Project C - PC2
assert_web_browser(st_proj_c_pc_2)
assert_database_client(st_proj_c_pc_2)
assert_dns_client(st_proj_c_pc_2)
assert_ntp_client(st_proj_c_pc_2)
# ST Project C - PC3
assert_web_browser(st_proj_c_pc_3)
assert_database_client(st_proj_c_pc_3)
assert_dns_client(st_proj_c_pc_3)
assert_ntp_client(st_proj_c_pc_3)

View File

@@ -0,0 +1 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK

View File

@@ -0,0 +1,143 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
import pytest
import yaml
from primaite.config.load import _EXAMPLE_CFG
from primaite.game.agent.scripted_agents.abstract_tap import (
AbstractTAP,
BaseKillChain,
KillChainOptions,
KillChainStageOptions,
KillChainStageProgress,
)
from primaite.game.agent.scripted_agents.TAP001 import MobileMalwareKillChain
from primaite.game.agent.scripted_agents.TAP003 import InsiderKillChain
from primaite.session.environment import PrimaiteGymEnv
def uc7_tap001_env() -> PrimaiteGymEnv:
with open(_EXAMPLE_CFG / "uc7_config.yaml", mode="r") as uc7_config:
cfg = yaml.safe_load(uc7_config)
for a in cfg["agents"]:
if a["ref"] == "attacker":
tap_cfg = a
tap_cfg["agent_settings"]["start_step"] = 1
tap_cfg["agent_settings"]["frequency"] = 5
tap_cfg["agent_settings"]["variance"] = 0
env = PrimaiteGymEnv(env_config=cfg)
return env
def uc7_tap003_env(**kwargs) -> PrimaiteGymEnv:
"""Setups the UC7 TAP003 Game with the following settings:
start_step = Start on Step 1
frequency = Attack Every 5 Steps
Each PyTest will define the rest of the TAP & Kill Chain settings via **Kwargs
"""
with open(_EXAMPLE_CFG / "uc7_config_tap003.yaml", "r") as uc7_config:
cfg = yaml.safe_load(uc7_config)
for a in cfg["agents"]:
if a["ref"] == "attacker":
tap_cfg = a
tap_cfg["agent_settings"]["start_step"] = 1
tap_cfg["agent_settings"]["frequency"] = 5
tap_cfg["agent_settings"]["variance"] = 0
if "repeat_kill_chain" in kwargs:
tap_cfg["agent_settings"]["repeat_kill_chain"] = kwargs["repeat_kill_chain"]
if "repeat_kill_chain_stages" in kwargs:
tap_cfg["agent_settings"]["repeat_kill_chain_stages"] = kwargs["repeat_kill_chain_stages"]
if "planning_probability" in kwargs:
tap_cfg["agent_settings"]["kill_chain"]["PLANNING"]["probability"] = kwargs["planning_probability"]
if "custom_kill_chain" in kwargs:
tap_cfg["agent_settings"]["kill_chain"] = kwargs["custom_kill_chain"]
if "starting_nodes" in kwargs:
tap_cfg["agent_settings"]["starting_nodes"] = kwargs["starting_nodes"]
if "target_nodes" in kwargs:
tap_cfg["agent_settings"]["target_nodes"] = kwargs["target_nodes"]
env = PrimaiteGymEnv(env_config=cfg)
return env
def test_tap001_setup():
"""Tests abstract TAP's following methods:
1. _setup_kill_chain
2. _setup_agent_kill_chain
3. _setup_tap_applications
"""
env = uc7_tap001_env() # Using TAP001 for PyTests.
tap: TAP001 = env.game.agents["attacker"]
# check the kill chain loaded correctly
assert tap.selected_kill_chain is MobileMalwareKillChain
assert tap.selected_kill_chain.FAILED == BaseKillChain.FAILED
assert tap.selected_kill_chain.SUCCEEDED == BaseKillChain.SUCCEEDED
assert tap.selected_kill_chain.NOT_STARTED == BaseKillChain.NOT_STARTED
if sn := tap.config.agent_settings.default_starting_node:
assert tap.starting_node == sn
else:
assert tap.starting_node in tap.config.agent_settings.starting_nodes
if ti := tap.config.agent_settings.default_target_ip:
assert tap.target_ip == ti
else:
assert tap.target_ip in tap.config.agent_settings.target_ips
assert tap.next_execution_timestep == tap.config.agent_settings.start_step
assert tap.current_host == tap.starting_node
def test_abstract_tap_select_start_node():
"""Tests that Abstract TAP's _select_start_node"""
env = uc7_tap003_env(repeat_kill_chain=True, repeat_kill_chain_stages=True) # Using TAP003 for PyTests.
tap: TAP003 = env.game.agents["attacker"]
assert tap.starting_node == "ST-PROJ-A-PRV-PC-1"
assert tap.current_host == tap.starting_node
def test_outcome_handler():
"""Tests Abstract Tap's outcome handler concludes the episode when the kill chain fails."""
env = uc7_tap003_env(repeat_kill_chain=False, repeat_kill_chain_stages=False) # Using TAP003 for PyTests.
tap: TAP003 = env.game.agents["attacker"]
tap.current_kill_chain_stage = BaseKillChain.FAILED
for _ in range(6):
env.step(0)
assert tap.actions_concluded == True
def test_abstract_tap_kill_chain_repeat():
"""Tests that the kill chain repeats from the beginning upon failure."""
env = uc7_tap003_env(repeat_kill_chain=True, repeat_kill_chain_stages=False) # Using TAP003 for PyTests.
tap: TAP003 = env.game.agents["attacker"]
for _ in range(15):
env.step(0) # Steps directly to the Access Stage
assert tap.current_kill_chain_stage == InsiderKillChain.ACCESS
tap.current_kill_chain_stage = BaseKillChain.FAILED
for _ in range(5):
env.step(0) # Steps to manipulation - but failure causes the kill chain to restart.
assert tap.actions_concluded == False
assert tap.current_kill_chain_stage == InsiderKillChain.RECONNAISSANCE
"""Tests that kill chain stages repeat when expected"""
env = uc7_tap003_env(
repeat_kill_chain=True, repeat_kill_chain_stages=True, planning_probability=0
) # Using TAP003 for PyTests.
tap: TAP003 = env.game.agents["attacker"]
tap.current_kill_chain_stage = InsiderKillChain.PLANNING
for _ in range(15):
env.step(0) # Attempts to progress past the PLANNING stage multiple times.
assert tap.actions_concluded == False
assert tap.current_kill_chain_stage == InsiderKillChain.PLANNING

View File

@@ -0,0 +1,69 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
import pytest
import yaml
from primaite.config.load import _EXAMPLE_CFG
from primaite.game.agent.scripted_agents.abstract_tap import (
AbstractTAP,
BaseKillChain,
KillChainOptions,
KillChainStageOptions,
KillChainStageProgress,
)
from primaite.game.agent.scripted_agents.TAP001 import MobileMalwareKillChain
from primaite.game.agent.scripted_agents.TAP003 import InsiderKillChain
from primaite.session.environment import PrimaiteGymEnv
# Defining constants.
START_STEP = 1 # The starting step of the agent.
FREQUENCY = 2 # The frequency of kill chain stage progression (E.g it's next attempt at "attacking").
VARIANCE = 0 # The timestep variance between kill chain progression (E.g Next timestep = Frequency +/- variance)
def uc7_tap003_env() -> PrimaiteGymEnv:
"""Setups the UC7 TAP003 Game with the start_step & frequency set to 1 with probabilities set to 1 as well"""
with open(_EXAMPLE_CFG / "uc7_config_tap003.yaml", mode="r") as uc7_config:
cfg = yaml.safe_load(uc7_config)
cfg["io_settings"]["save_sys_logs"] = False
cfg["agents"][32]["agent_settings"]["start_step"] = START_STEP
cfg["agents"][32]["agent_settings"]["frequency"] = FREQUENCY
cfg["agents"][32]["agent_settings"]["variance"] = VARIANCE
env = PrimaiteGymEnv(env_config=cfg)
return env
def uc7_tap001_env() -> PrimaiteGymEnv:
"""Setup the UC7 TAP001 Game with the start_step & frequency set to 1 & 2 respectively. Probabilities are set to 1"""
with open(_EXAMPLE_CFG / "uc7_config.yaml", mode="r") as uc7_config:
cfg = yaml.safe_load(uc7_config)
cfg["io_settings"]["save_sys_logs"] = False
cfg["agents"][32]["agent_settings"]["start_step"] = START_STEP
cfg["agents"][32]["agent_settings"]["frequency"] = FREQUENCY
cfg["agents"][32]["agent_settings"]["variance"] = VARIANCE
env = PrimaiteGymEnv(env_config=cfg)
return env
def test_tap003_insider_kill_chain_load():
"""Tests that tap003's insider kill chain is successfully loaded into the tap.selected_kill_chain attribute."""
env = uc7_tap003_env() # Using TAP003 for PyTests.
tap: TAP003 = env.game.agents["attacker"]
# Asserting that the Base Kill Chain intEnum stages are loaded
assert BaseKillChain.FAILED in [enums for enums in tap.selected_kill_chain]
assert BaseKillChain.SUCCEEDED in [enums for enums in tap.selected_kill_chain]
assert BaseKillChain.NOT_STARTED in [enums for enums in tap.selected_kill_chain]
# Asserting that the Insider Kill Chain intenum stages are loaded.
assert len(InsiderKillChain.__members__) == len(tap.selected_kill_chain.__members__)
def test_tap001_mobile_malware_kill_chain_load():
"""Tests that tap001's mobile malware is successfully loaded into the tap.selected_kill_chain attribute."""
env = uc7_tap001_env() # Using TAP003 for PyTests.
tap: TAP001 = env.game.agents["attacker"]
# Asserting that the Base Kill Chain intEnum stages are loaded.
assert BaseKillChain.FAILED in [enums for enums in tap.selected_kill_chain]
assert BaseKillChain.SUCCEEDED in [enums for enums in tap.selected_kill_chain]
assert BaseKillChain.NOT_STARTED in [enums for enums in tap.selected_kill_chain]
# Asserting that the Insider Kill Chain intEnum stages are loaded.
assert len(MobileMalwareKillChain.__members__) == len(tap.selected_kill_chain.__members__)

View File

@@ -0,0 +1,109 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
import pytest
import yaml
from primaite.config.load import _EXAMPLE_CFG
from primaite.game.agent.scripted_agents.abstract_tap import (
AbstractTAP,
BaseKillChain,
KillChainOptions,
KillChainStageOptions,
KillChainStageProgress,
)
from primaite.game.agent.scripted_agents.TAP001 import MobileMalwareKillChain
from primaite.game.agent.scripted_agents.TAP003 import InsiderKillChain
from primaite.session.environment import PrimaiteGymEnv
# Defining constants.
START_STEP = 1 # The starting step of the agent.
FREQUENCY = 2 # The frequency of kill chain stage progression (E.g it's next attempt at "attacking").
VARIANCE = 0 # The timestep variance between kill chain progression (E.g Next timestep = Frequency +/- variance)
def uc7_tap001_env(**kwargs) -> PrimaiteGymEnv:
"""Setups the UC7 tap001 Game with the start_step & frequency set to 1 with probabilities set to 1 as well"""
with open(_EXAMPLE_CFG / "uc7_config.yaml", mode="r") as uc7_config:
cfg = yaml.safe_load(uc7_config)
cfg["io_settings"]["save_sys_logs"] = False
cfg["agents"][32]["agent_settings"]["start_step"] = START_STEP
cfg["agents"][32]["agent_settings"]["frequency"] = FREQUENCY
cfg["agents"][32]["agent_settings"]["variance"] = VARIANCE
cfg["agents"][32]["agent_settings"]["repeat_kill_chain"] = kwargs["repeat_kill_chain"]
cfg["agents"][32]["agent_settings"]["repeat_kill_chain_stages"] = kwargs["repeat_kill_chain_stages"]
cfg["agents"][32]["agent_settings"]["kill_chain"]["PROPAGATE"]["probability"] = kwargs["propagate_probability"]
cfg["agents"][32]["agent_settings"]["kill_chain"]["PAYLOAD"]["probability"] = kwargs["payload_probability"]
env = PrimaiteGymEnv(env_config=cfg)
return env
def test_tap001_repeating_kill_chain():
"""Tests to check that tap001 repeats it's kill chain after success"""
env = uc7_tap001_env(
repeat_kill_chain=True,
repeat_kill_chain_stages=True,
payload_probability=1,
propagate_probability=1,
)
tap001: TAP001 = env.game.agents["attacker"]
# Looping for 50 timesteps - As the agent is set to execute an action every 2 timesteps
# This is the equivalent of the agent taking 20 actions.
for _ in range(50): # This for loop should never actually fully complete.
if tap001.current_kill_chain_stage == BaseKillChain.SUCCEEDED:
break
env.step(0)
# Catches if the above for loop fully completes.
# This test uses a probability of 1 for all stages and a variance of 2 timesteps
# Thus the for loop above should never fail.
# If this occurs then there is an error somewhere in either:
# 1. The TAP Logic
# 2. Failing Agent Actions are causing the TAP to fail. (See tap_return_handler).
if tap001.current_kill_chain_stage != BaseKillChain.SUCCEEDED:
pytest.fail("Attacker Never Reached SUCCEEDED - Please evaluate current TAP Logic.")
# Stepping twice for the succeeded logic to kick in:
env.step(0)
env.step(0)
env.step(0)
env.step(0)
assert tap001.current_kill_chain_stage.name == MobileMalwareKillChain.DOWNLOAD.name
assert tap001.next_kill_chain_stage.name == MobileMalwareKillChain.INSTALL.name
def test_tap001_repeating_kill_chain_stages():
"""Tests to check that tap001 repeats it's kill chain after failing a kill chain stage."""
env = uc7_tap001_env(
repeat_kill_chain=True,
repeat_kill_chain_stages=True,
payload_probability=1,
propagate_probability=0,
# Probability 0 = Will never be able to perform the access stage and progress to Manipulation.
)
tap001: TAP001 = env.game.agents["attacker"]
env.step(0) # Skipping not started
env.step(0) # Successful on the first stage
assert tap001.current_kill_chain_stage.name == MobileMalwareKillChain.DOWNLOAD.name
assert tap001.next_kill_chain_stage.name == MobileMalwareKillChain.INSTALL.name
env.step(0) # Successful progression to the second stage
env.step(0)
env.step(0)
env.step(0)
assert tap001.current_kill_chain_stage.name == MobileMalwareKillChain.INSTALL.name
assert tap001.next_kill_chain_stage.name == MobileMalwareKillChain.ACTIVATE.name
env.step(0) # Successful progression to the third stage
env.step(0)
assert tap001.current_kill_chain_stage.name == MobileMalwareKillChain.ACTIVATE.name
assert tap001.next_kill_chain_stage.name == MobileMalwareKillChain.PROPAGATE.name
env.step(0) # Successful progression to the Fourth stage
env.step(0)
env.step(0)
env.step(0)
assert tap001.current_kill_chain_stage.name == MobileMalwareKillChain.PROPAGATE.name
assert tap001.next_kill_chain_stage.name == MobileMalwareKillChain.COMMAND_AND_CONTROL.name
env.step(0) # FAILURE -- Unsuccessful progression to the Fourth stage
env.step(0)
assert tap001.current_kill_chain_stage.name == MobileMalwareKillChain.PROPAGATE.name
assert tap001.next_kill_chain_stage.name == MobileMalwareKillChain.COMMAND_AND_CONTROL.name
assert tap001.current_stage_progress == KillChainStageProgress.PENDING

View File

@@ -0,0 +1,215 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
import pytest
import yaml
from primaite.config.load import _EXAMPLE_CFG
from primaite.game.agent.scripted_agents.abstract_tap import (
AbstractTAP,
BaseKillChain,
KillChainOptions,
KillChainStageOptions,
KillChainStageProgress,
)
from primaite.game.agent.scripted_agents.TAP001 import MobileMalwareKillChain
from primaite.game.agent.scripted_agents.TAP003 import InsiderKillChain
from primaite.session.environment import PrimaiteGymEnv
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon
from primaite.simulator.system.services.database.database_service import DatabaseService
# Defining constants.
START_STEP = 1 # The starting step of the agent.
FREQUENCY = 2 # The frequency of kill chain stage progression (E.g it's next attempt at "attacking").
VARIANCE = 0 # The timestep variance between kill chain progression (E.g Next timestep = Frequency +/- variance)
# Defining constants.
START_STEP = 1 # The starting step of the agent.
FREQUENCY = 2 # The frequency of kill chain stage progression (E.g it's next attempt at "attacking").
VARIANCE = 0 # The timestep variance between kill chain progression (E.g Next timestep = Frequency +/- variance)
REPEAT_KILL_CHAIN = False # Should the TAP repeat the kill chain after success/failure?
REPEAT_KILL_CHAIN_STAGES = False # Should the TAP restart from it's previous stage on failure?
KILL_CHAIN_PROBABILITY = 1 # Blank probability for agent 'success'
DATA_EXFIL = True # Data exfiltration on the payload stage is enabled.
def uc7_tap001_env() -> PrimaiteGymEnv:
"""Setups the UC7 tap001 Game with the start_step & frequency set to 1 with probabilities set to 1 as well"""
with open(_EXAMPLE_CFG / "uc7_config.yaml", mode="r") as uc7_config:
cfg = yaml.safe_load(uc7_config)
cfg["io_settings"]["save_sys_logs"] = False
cfg["agents"][32]["agent_settings"]["start_step"] = START_STEP
cfg["agents"][32]["agent_settings"]["frequency"] = FREQUENCY
cfg["agents"][32]["agent_settings"]["variance"] = VARIANCE
cfg["agents"][32]["agent_settings"]["repeat_kill_chain"] = REPEAT_KILL_CHAIN_STAGES
cfg["agents"][32]["agent_settings"]["repeat_kill_chain_stages"] = REPEAT_KILL_CHAIN_STAGES
cfg["agents"][32]["agent_settings"]["kill_chain"]["PAYLOAD"]["probability"] = KILL_CHAIN_PROBABILITY
cfg["agents"][32]["agent_settings"]["kill_chain"]["PROPAGATE"]["probability"] = KILL_CHAIN_PROBABILITY
cfg["agents"][32]["agent_settings"]["kill_chain"]["PAYLOAD"]["exfiltrate"] = DATA_EXFIL
env = PrimaiteGymEnv(env_config=cfg)
return env
def test_tap001_kill_chain_stage_DOWNLOAD():
"""Tests that the DOWNLOAD Mobile Malware step works as expected and the expected impacts are made in the simulation."""
# Instantiating the relevant simulation/game objects:
env = uc7_tap001_env()
tap001: TAP001 = env.game.agents["attacker"]
starting_host = env.game.simulation.network.get_node_by_hostname(tap001.starting_node)
assert tap001.current_kill_chain_stage == BaseKillChain.NOT_STARTED
# Frequency is set to two steps
env.step(0)
env.step(0)
env.step(0)
env.step(0)
assert tap001.current_kill_chain_stage.name == MobileMalwareKillChain.DOWNLOAD.name
assert tap001.next_kill_chain_stage.name == MobileMalwareKillChain.INSTALL.name
assert tap001.current_stage_progress == KillChainStageProgress.IN_PROGRESS
# Creating the "downloads" folder
env.step(0)
env.step(0)
assert starting_host.software_manager.file_system.get_folder(folder_name="downloads")
assert starting_host.software_manager.file_system.get_file(folder_name="downloads", file_name="malware_dropper.ps1")
# Testing that the num_file_increase works
assert starting_host.file_system.num_file_creations == 1
def test_tap001_kill_chain_stage_INSTALL():
"""Tests that the INSTALL Mobile Malware step works as expected and the expected impacts are made in the simulation."""
env = uc7_tap001_env()
tap001: TAP001 = env.game.agents["attacker"]
# The tap001's Starting Client:
starting_host = env.game.simulation.network.get_node_by_hostname(tap001.starting_node)
# Skipping directly to the activate stage
for _ in range(6):
env.step(0)
# Testing that tap001 Enters into the expected kill chain stages
assert tap001.current_kill_chain_stage.name == MobileMalwareKillChain.INSTALL.name
assert tap001.next_kill_chain_stage.name == MobileMalwareKillChain.ACTIVATE.name
env.step(0) # Allows the agent action to resolve.
env.step(0)
ransomware_dropper_file = starting_host.software_manager.file_system.get_file(
folder_name="downloads", file_name="malware_dropper.ps1"
)
assert ransomware_dropper_file.num_access == 1
def test_tap001_kill_chain_stage_ACTIVATE():
"""Tests that the ACTIVATE Mobile Malware step works as expected and the current impacts are made in the simulation."""
env = uc7_tap001_env()
tap001: TAP001 = env.game.agents["attacker"]
# The tap001's Starting Client:
starting_host = env.game.simulation.network.get_node_by_hostname(tap001.starting_node)
# Skipping directly to the activate stage
for _ in range(8):
env.step(0)
assert tap001.current_kill_chain_stage.name == MobileMalwareKillChain.ACTIVATE.name
assert tap001.next_kill_chain_stage.name == MobileMalwareKillChain.PROPAGATE.name
# Installing ransomware-script Application
env.step(0)
env.step(0)
# Installing NMAP Application
env.step(0)
env.step(0)
# These asserts will fail if the applications are not present in the software_manager
assert starting_host.software_manager.software["ransomware-script"]
assert starting_host.software_manager.software["nmap"]
def test_tap001_kill_chain_stage_PROPAGATE():
"""Tests that the ACTIVATE Mobile Malware step works as expected and the current impacts are made in the simulation."""
env = uc7_tap001_env()
tap001: TAP001 = env.game.agents["attacker"]
for _ in range(12):
env.step(0)
assert tap001.current_kill_chain_stage.name == MobileMalwareKillChain.PROPAGATE.name
assert tap001.next_kill_chain_stage.name == MobileMalwareKillChain.COMMAND_AND_CONTROL.name
# Specific Stage by Stage Propagate Testing is done in test_tap001_propagate.
fail_safe_var = 0
while tap001.current_kill_chain_stage.name == MobileMalwareKillChain.PROPAGATE:
env.step(0)
assert tap001.current_stage_progress == KillChainStageProgress.IN_PROGRESS
fail_safe_var += 1
if fail_safe_var == 100:
pytest.fail("Fail Safe Variable was hit! -- Propagate step is running indefinitely")
def test_tap001_kill_chain_stage_COMMAND_AND_CONTROL():
"""Tests that the Command And Control Mobile Malware step works as expected and the current impacts are made in the simulation."""
env = uc7_tap001_env()
tap001: TAP001 = env.game.agents["attacker"]
fail_safe_var = 0
for _ in range(28):
env.step(0)
assert tap001.current_kill_chain_stage.name == MobileMalwareKillChain.COMMAND_AND_CONTROL.name
assert tap001.next_kill_chain_stage.name == MobileMalwareKillChain.PAYLOAD.name
while tap001.current_kill_chain_stage == MobileMalwareKillChain.COMMAND_AND_CONTROL:
env.step(0)
fail_safe_var += 1
env.game.simulation.network.airspace.show()
if fail_safe_var == 100:
pytest.fail(reason="Fail Safe Variable was hit! -- Propagate step is running indefinitely")
starting_host = env.game.simulation.network.get_node_by_hostname(tap001.starting_node)
c2_beacon: C2Beacon = starting_host.software_manager.software["c2-beacon"]
assert c2_beacon.c2_connection_active is True
def test_tap001_kill_chain_stage_PAYLOAD():
"""Tests that the PAYLOAD Mobile Malware step works as expected and the current impacts are made in the simulation."""
env = uc7_tap001_env()
tap001: TAP001 = env.game.agents["attacker"]
# The tap001's Target Database
target_host = env.game.simulation.network.get_node_by_hostname("ST-DATA-PRV-SRV-DB")
db_server_service: DatabaseService = target_host.software_manager.software.get("database-service")
# Green agent status requests are tested within the ransomware application tests.
# See test_ransomware_disrupts_green_agent_connection for further reference.
assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.GOOD
fail_safe_var = 0
while tap001.current_kill_chain_stage != MobileMalwareKillChain.PAYLOAD:
env.step(0)
fail_safe_var += 1
if fail_safe_var == 100:
pytest.fail(reason="Fail Safe Variable was hit! -- a step is running indefinitely")
for _ in range(12):
env.step(0)
assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.CORRUPT
# Asserting we've managed to the database.db file onto the starting node & server
starting_host = env.game.simulation.network.get_node_by_hostname(tap001.starting_node)
c2_host = env.game.simulation.network.get_node_by_hostname(tap001.c2_settings["c2_server"])
assert starting_host.file_system.access_file(folder_name="exfiltration_folder", file_name="database.db")
assert c2_host.file_system.access_file(folder_name="exfiltration_folder", file_name="database.db")

View File

@@ -0,0 +1,140 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
import pytest
import yaml
from primaite.config.load import _EXAMPLE_CFG
from primaite.game.agent.scripted_agents.abstract_tap import (
AbstractTAP,
BaseKillChain,
KillChainOptions,
KillChainStageOptions,
KillChainStageProgress,
)
from primaite.game.agent.scripted_agents.TAP001 import MobileMalwareKillChain
from primaite.game.agent.scripted_agents.TAP003 import InsiderKillChain
from primaite.session.environment import PrimaiteGymEnv
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon
from primaite.simulator.system.services.database.database_service import DatabaseService
# Defining generic tap constants.
START_STEP = 1 # The starting step of the agent.
FREQUENCY = 2 # The frequency of kill chain stage progression (E.g it's next attempt at "attacking").
VARIANCE = 0 # The timestep variance between kill chain progression (E.g Next timestep = Frequency +/- variance)
REPEAT_KILL_CHAIN = False # Should the TAP repeat the kill chain after success/failure?
REPEAT_KILL_CHAIN_STAGES = False # Should the TAP restart from it's previous stage on failure?
KILL_CHAIN_PROBABILITY = 1 # Blank probability for agent 'success'
def uc7_tap001_env(**kwargs) -> PrimaiteGymEnv:
"""Setups the UC7 tap001 Game with the start_step & frequency set to 1 with probabilities set to 1 as well"""
with open(_EXAMPLE_CFG / "uc7_config.yaml", mode="r") as uc7_config:
cfg = yaml.safe_load(uc7_config)
cfg["io_settings"]["save_sys_logs"] = False
agent_cfg = cfg["agents"][32]["agent_settings"]
agent_cfg["start_step"] = START_STEP
agent_cfg["frequency"] = FREQUENCY
agent_cfg["variance"] = VARIANCE
agent_cfg["repeat_kill_chain"] = REPEAT_KILL_CHAIN_STAGES
agent_cfg["repeat_kill_chain_stages"] = REPEAT_KILL_CHAIN_STAGES
agent_cfg["kill_chain"]["PAYLOAD"]["probability"] = KILL_CHAIN_PROBABILITY
agent_cfg["kill_chain"]["PROPAGATE"]["probability"] = KILL_CHAIN_PROBABILITY
agent_cfg["kill_chain"]["PROPAGATE"]["scan_attempts"] = kwargs["scan_attempts"]
agent_cfg["kill_chain"]["PAYLOAD"]["payload"] = kwargs["payload"]
agent_cfg["kill_chain"]["PROPAGATE"]["network_addresses"] = kwargs["network_addresses"]
if "repeat_scan" in kwargs:
agent_cfg["kill_chain"]["PROPAGATE"]["repeat_scan"] = kwargs["repeat_scan"]
if "starting_nodes" in kwargs:
agent_cfg["starting_nodes"] = kwargs["starting_nodes"]
agent_cfg["default_starting_node"] = kwargs["starting_nodes"][0]
if "target_ips" in kwargs:
agent_cfg["target_ips"] = kwargs["target_ips"]
env = PrimaiteGymEnv(env_config=cfg)
return env
def test_tap001_kill_chain_stage_PROPAGATE_default():
"""Tests that the PROPAGATE Mobile Malware step works as expected and the current impacts are made in the simulation."""
payload = "ENCRYPT"
scan_attempts = 10
network_addresses = [
"192.168.230.0/29",
"192.168.10.0/26",
"192.168.20.0/30",
"192.168.240.0/29",
"192.168.220.0/29",
]
env = uc7_tap001_env(payload=payload, scan_attempts=scan_attempts, network_addresses=network_addresses)
tap001: TAP001 = env.game.agents["attacker"]
# First Kill Chain Stages
for _ in range(12):
env.step(0)
# Assert that we're about to enter into the propagate stage.
assert tap001.current_kill_chain_stage.name == MobileMalwareKillChain.PROPAGATE.name
assert tap001.next_kill_chain_stage.name == MobileMalwareKillChain.COMMAND_AND_CONTROL.name
# Move into the propagate stage.
while tap001.current_kill_chain_stage == MobileMalwareKillChain.PROPAGATE:
env.step(0)
# Assert that we've successfully moved into the command and control stage.
assert tap001.current_kill_chain_stage.name == MobileMalwareKillChain.COMMAND_AND_CONTROL.name
assert tap001.next_kill_chain_stage.name == MobileMalwareKillChain.PAYLOAD.name
def test_tap001_kill_chain_stage_PROPAGATE_different_starting_node():
"""Tests that the PROPAGATE Mobile Malware step works as expected and the current impacts are made in the simulation from a different starting node."""
payload = "ENCRYPT"
scan_attempts = 10
network_addresses = [
"192.168.230.0/29",
"192.168.10.0/26",
"192.168.20.0/30",
"192.168.240.0/29",
"192.168.220.0/29",
]
starting_nodes = ["ST-PROJ-B-PRV-PC-2", "ST-PROJ-C-PRV-PC-3"]
env = uc7_tap001_env(
payload=payload, scan_attempts=scan_attempts, network_addresses=network_addresses, starting_nodes=starting_nodes
)
tap001: TAP001 = env.game.agents["attacker"]
for _ in range(12):
env.step(0)
assert tap001.current_kill_chain_stage.name == MobileMalwareKillChain.PROPAGATE.name
assert tap001.next_kill_chain_stage.name == MobileMalwareKillChain.COMMAND_AND_CONTROL.name
# Specific Stage by Stage Propagate Testing is done in test_tap001_propagate.
while tap001.current_kill_chain_stage == MobileMalwareKillChain.PROPAGATE:
env.step(0)
assert tap001.current_kill_chain_stage.name == MobileMalwareKillChain.COMMAND_AND_CONTROL.name
assert tap001.next_kill_chain_stage.name == MobileMalwareKillChain.PAYLOAD.name
def test_tap001_kill_chain_stage_PROPAGATE_repeat_scan():
"""Tests that the PROPAGATE Mobile Malware step will fail when the target is unable to be located."""
payload = "ENCRYPT"
scan_attempts = 20
repeat_scan = True
network_addresses = ["192.168.1.0/24", "192.168.0.0/28", "100.64.0.0/30", "172.168.0.0/28"]
env = uc7_tap001_env(
payload=payload, scan_attempts=scan_attempts, network_addresses=network_addresses, repeat_scan=repeat_scan
)
for _ in range(12):
env.step(0)
tap001: TAP001 = env.game.agents["attacker"]
while tap001.current_kill_chain_stage == MobileMalwareKillChain.PROPAGATE:
env.step(0)
# As the given network_address does not contain the target, we should failed because the maximum amount of scan attempts has been reached
assert tap001.scans_complete == 20
assert tap001.current_kill_chain_stage == MobileMalwareKillChain.FAILED

View File

@@ -0,0 +1,101 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
import pytest
import yaml
from primaite.config.load import _EXAMPLE_CFG
from primaite.game.agent.scripted_agents.abstract_tap import (
AbstractTAP,
BaseKillChain,
KillChainOptions,
KillChainStageOptions,
KillChainStageProgress,
)
from primaite.game.agent.scripted_agents.TAP001 import MobileMalwareKillChain
from primaite.game.agent.scripted_agents.TAP003 import InsiderKillChain
from primaite.session.environment import PrimaiteGymEnv
# Defining constants.
START_STEP = 1 # The starting step of the agent.
FREQUENCY = 2 # The frequency of kill chain stage progression (E.g it's next attempt at "attacking").
VARIANCE = 0 # The timestep variance between kill chain progression (E.g Next timestep = Frequency +/- variance)
def uc7_tap003_env(**kwargs) -> PrimaiteGymEnv:
"""Setups the UC7 TAP003 Game with the start_step & frequency set to 1 with probabilities set to 1 as well"""
with open(_EXAMPLE_CFG / "uc7_config_tap003.yaml", mode="r") as uc7_config:
cfg = yaml.safe_load(uc7_config)
cfg["io_settings"]["save_sys_logs"] = False
cfg["agents"][32]["agent_settings"]["start_step"] = START_STEP
cfg["agents"][32]["agent_settings"]["frequency"] = FREQUENCY
cfg["agents"][32]["agent_settings"]["variance"] = VARIANCE
cfg["agents"][32]["agent_settings"]["repeat_kill_chain"] = kwargs["repeat_kill_chain"]
cfg["agents"][32]["agent_settings"]["repeat_kill_chain_stages"] = kwargs["repeat_kill_chain_stages"]
cfg["agents"][32]["agent_settings"]["kill_chain"]["MANIPULATION"]["probability"] = kwargs[
"manipulation_probability"
]
cfg["agents"][32]["agent_settings"]["kill_chain"]["ACCESS"]["probability"] = kwargs["access_probability"]
cfg["agents"][32]["agent_settings"]["kill_chain"]["PLANNING"]["probability"] = kwargs["planning_probability"]
env = PrimaiteGymEnv(env_config=cfg)
return env
def test_tap003_repeating_kill_chain():
"""Tests to check that TAP003 repeats it's kill chain after success"""
env = uc7_tap003_env(
repeat_kill_chain=True,
repeat_kill_chain_stages=True,
manipulation_probability=1,
access_probability=1,
planning_probability=1,
)
tap003: TAP003 = env.game.agents["attacker"]
for _ in range(40): # This for loop should never actually fully complete.
if tap003.current_kill_chain_stage == BaseKillChain.SUCCEEDED:
break
env.step(0)
# Catches if the above for loop fully completes.
# This test uses a probability of 1 for all stages and a variance of 2 timesteps
# Thus the for loop above should never fail.
# If this occurs then there is an error somewhere in either:
# 1. The TAP Logic
# 2. Failing Agent Actions are causing the TAP to fail. (See tap_return_handler).
if tap003.current_kill_chain_stage != BaseKillChain.SUCCEEDED:
pytest.fail("Attacker Never Reached SUCCEEDED - Please evaluate current TAP Logic.")
# Stepping twice for the succeeded logic to kick in:
env.step(0)
env.step(0)
assert tap003.current_kill_chain_stage.name == InsiderKillChain.RECONNAISSANCE.name
assert tap003.next_kill_chain_stage.name == InsiderKillChain.PLANNING.name
def test_tap003_repeating_kill_chain_stages():
"""Tests to check that TAP003 repeats it's kill chain after failing a kill chain stage."""
env = uc7_tap003_env(
repeat_kill_chain=True,
repeat_kill_chain_stages=True,
manipulation_probability=1,
# Probability 0 = Will never be able to perform the access stage and progress to Manipulation.
access_probability=0,
planning_probability=1,
)
tap003: TAP003 = env.game.agents["attacker"]
env.step(0) # Skipping not started
env.step(0) # Successful on the first stage
assert tap003.current_kill_chain_stage.name == InsiderKillChain.RECONNAISSANCE.name
assert tap003.next_kill_chain_stage.name == InsiderKillChain.PLANNING.name
env.step(0) # Successful progression to the second stage
env.step(0)
assert tap003.current_kill_chain_stage.name == InsiderKillChain.PLANNING.name
assert tap003.next_kill_chain_stage.name == InsiderKillChain.ACCESS.name
env.step(0) # Successfully moved onto access.
env.step(0)
assert tap003.current_kill_chain_stage.name == InsiderKillChain.ACCESS.name
assert tap003.next_kill_chain_stage.name == InsiderKillChain.MANIPULATION.name
env.step(0) # Failure to progress past the third stage.
env.step(0)
assert tap003.current_kill_chain_stage.name == InsiderKillChain.ACCESS.name
assert tap003.next_kill_chain_stage.name == InsiderKillChain.MANIPULATION.name

View File

@@ -0,0 +1,232 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
import pytest
import yaml
from primaite.config.load import _EXAMPLE_CFG
from primaite.game.agent.scripted_agents.abstract_tap import (
AbstractTAP,
BaseKillChain,
KillChainOptions,
KillChainStageOptions,
KillChainStageProgress,
)
from primaite.game.agent.scripted_agents.TAP001 import MobileMalwareKillChain
from primaite.game.agent.scripted_agents.TAP003 import InsiderKillChain
from primaite.session.environment import PrimaiteGymEnv
from primaite.simulator.network.hardware.nodes.network.router import ACLAction
# Defining constants.
START_STEP = 1 # The starting step of the agent.
FREQUENCY = 2 # The frequency of kill chain stage progression (E.g it's next attempt at "attacking").
VARIANCE = 0 # The timestep variance between kill chain progression (E.g Next timestep = Frequency +/- variance)
REPEAT_KILL_CHAIN = False # Should the TAP repeat the kill chain after success/failure?
REPEAT_KILL_CHAIN_STAGES = False # Should the TAP restart from it's previous stage on failure?
KILL_CHAIN_PROBABILITY = 1 # Blank probability for agent 'success'
def uc7_tap003_env() -> PrimaiteGymEnv:
"""Setups the UC7 TAP003 Game with the start_step & frequency set to 1 with probabilities set to 1 as well"""
with open(_EXAMPLE_CFG / "uc7_config_tap003.yaml", mode="r") as uc7_config:
cfg = yaml.safe_load(uc7_config)
cfg["io_settings"]["save_sys_logs"] = False
cfg["agents"][32]["agent_settings"]["start_step"] = START_STEP
cfg["agents"][32]["agent_settings"]["frequency"] = FREQUENCY
cfg["agents"][32]["agent_settings"]["variance"] = VARIANCE
cfg["agents"][32]["agent_settings"]["repeat_kill_chain"] = REPEAT_KILL_CHAIN_STAGES
cfg["agents"][32]["agent_settings"]["repeat_kill_chain_stages"] = REPEAT_KILL_CHAIN_STAGES
cfg["agents"][32]["agent_settings"]["kill_chain"]["MANIPULATION"]["probability"] = KILL_CHAIN_PROBABILITY
cfg["agents"][32]["agent_settings"]["kill_chain"]["ACCESS"]["probability"] = KILL_CHAIN_PROBABILITY
cfg["agents"][32]["agent_settings"]["kill_chain"]["PLANNING"]["probability"] = KILL_CHAIN_PROBABILITY
cfg["agents"][32]["agent_settings"]["kill_chain"]["EXPLOIT"]["probability"] = KILL_CHAIN_PROBABILITY
env = PrimaiteGymEnv(env_config=cfg)
return env
def environment_step(i: int, env: PrimaiteGymEnv) -> PrimaiteGymEnv:
"""Carries out i (given parameter) steps in the environment.."""
for x in range(i):
env.step(0)
return env
def test_tap003_kill_chain_stage_reconnaissance():
"""Tests the successful/failed handlers in the reconnaissance stage in the Insider Kill Chain InsiderKillChain"""
# Instantiating the relevant simulation/game objects:
env = uc7_tap003_env()
tap003: TAP003 = env.game.agents["attacker"]
assert tap003.current_kill_chain_stage == BaseKillChain.NOT_STARTED
# Frequency is set to two steps
env = environment_step(i=2, env=env)
# Testing that TAP003 Enters into the expected kill chain stages
assert tap003.current_kill_chain_stage.name == InsiderKillChain.RECONNAISSANCE.name
def test_tap003_kill_chain_stage_planning():
"""Tests the successful/failed handlers in the planning stage in the Insider Kill Chain (TAP003)"""
env = uc7_tap003_env()
tap003: TAP003 = env.game.agents["attacker"]
assert tap003.current_kill_chain_stage == BaseKillChain.NOT_STARTED
env = environment_step(i=2, env=env)
assert tap003.current_kill_chain_stage.name == InsiderKillChain.RECONNAISSANCE.name
assert tap003.next_kill_chain_stage.name == InsiderKillChain.PLANNING.name
env = environment_step(i=2, env=env)
# Testing that TAP003 Enters into the expected kill chain stages
assert tap003.current_kill_chain_stage.name == InsiderKillChain.PLANNING.name
assert tap003.next_kill_chain_stage.name == InsiderKillChain.ACCESS.name
env = environment_step(i=2, env=env)
# Testing that the stage successfully impacted the simulation - User is logged in
# TODO: Add an assert for this.
def test_tap003_kill_chain_stage_access():
"""Tests the successful/failed handlers in the access stage in the InsiderKillChain"""
env = uc7_tap003_env()
tap003: TAP003 = env.game.agents["attacker"]
assert tap003.current_kill_chain_stage == BaseKillChain.NOT_STARTED
env = environment_step(i=2, env=env)
assert tap003.current_kill_chain_stage.name == InsiderKillChain.RECONNAISSANCE.name
assert tap003.next_kill_chain_stage.name == InsiderKillChain.PLANNING.name
env = environment_step(i=2, env=env)
assert tap003.current_kill_chain_stage.name == InsiderKillChain.PLANNING.name
assert tap003.next_kill_chain_stage.name == InsiderKillChain.ACCESS.name
env = environment_step(i=2, env=env)
assert tap003.current_kill_chain_stage.name == InsiderKillChain.ACCESS.name
assert tap003.next_kill_chain_stage.name == InsiderKillChain.MANIPULATION.name
env = environment_step(i=2, env=env)
def test_tap003_kill_chain_stage_manipulation():
"""Tests the successful/failed handlers in the manipulation stage in the InsiderKillChain"""
env = uc7_tap003_env()
env.reset()
tap003: TAP003 = env.game.agents["attacker"]
assert tap003.current_kill_chain_stage == BaseKillChain.NOT_STARTED
env.step(0)
env.step(0)
assert tap003.current_kill_chain_stage.name == InsiderKillChain.RECONNAISSANCE.name
assert tap003.next_kill_chain_stage.name == InsiderKillChain.PLANNING.name
env.step(0)
env.step(0)
assert tap003.current_kill_chain_stage.name == InsiderKillChain.PLANNING.name
assert tap003.next_kill_chain_stage.name == InsiderKillChain.ACCESS.name
env.step(0)
env.step(0)
assert tap003.current_kill_chain_stage.name == InsiderKillChain.ACCESS.name
assert tap003.next_kill_chain_stage.name == InsiderKillChain.MANIPULATION.name
env.step(0)
env.step(0)
assert tap003.current_kill_chain_stage.name == InsiderKillChain.MANIPULATION.name
# Testing that the stage successfully impacted the simulation - Accounts Altered
env.step(0)
env.step(0)
env.step(0)
env.step(0)
env.step(0)
st_intra_prv_rt_dr_1: Router = env.game.simulation.network.get_node_by_hostname("ST-INTRA-PRV-RT-DR-1")
assert st_intra_prv_rt_dr_1.user_manager.admins["admin"].password == "red_pass"
env.step(0)
env.step(0)
env.step(0)
env.step(0)
env.step(0)
st_intra_prv_rt_cr: Router = env.game.simulation.network.get_node_by_hostname("ST-INTRA-PRV-RT-CR")
assert st_intra_prv_rt_cr.user_manager.admins["admin"].password == "red_pass"
env.step(0)
env.step(0)
env.step(0)
env.step(0)
env.step(0)
rem_pub_rt_dr: Router = env.game.simulation.network.get_node_by_hostname("REM-PUB-RT-DR")
assert rem_pub_rt_dr.user_manager.admins["admin"].password == "red_pass"
def test_tap003_kill_chain_stage_exploit():
"""Tests the successful/failed handlers in the exploit stage in the InsiderKillChain"""
env = uc7_tap003_env()
tap003: TAP003 = env.game.agents["attacker"]
# The TAP003's Target Router/Firewall
st_intra_prv_rt_dr_1: Router = env.game.simulation.network.get_node_by_hostname("ST-INTRA-PRV-RT-DR-1")
st_intra_prv_rt_cr: Router = env.game.simulation.network.get_node_by_hostname("ST-INTRA-PRV-RT-CR")
rem_pub_rt_dr: Router = env.game.simulation.network.get_node_by_hostname("REM-PUB-RT-DR")
assert tap003.current_kill_chain_stage == BaseKillChain.NOT_STARTED
env.step(0)
env.step(0)
assert tap003.current_kill_chain_stage.name == InsiderKillChain.RECONNAISSANCE.name
assert tap003.next_kill_chain_stage.name == InsiderKillChain.PLANNING.name
env.step(0)
env.step(0)
assert tap003.current_kill_chain_stage.name == InsiderKillChain.PLANNING.name
assert tap003.next_kill_chain_stage.name == InsiderKillChain.ACCESS.name
env.step(0)
env.step(0)
assert tap003.current_kill_chain_stage.name == InsiderKillChain.ACCESS.name
assert tap003.next_kill_chain_stage.name == InsiderKillChain.MANIPULATION.name
env.step(0)
env.step(0)
env.step(0)
env.step(0)
env.step(0)
env.step(0)
env.step(0)
env.step(0)
env.step(0)
env.step(0)
assert tap003.current_kill_chain_stage.name == InsiderKillChain.EXPLOIT.name
# Testing that the stage successfully impacted the simulation - Malicious ACL Added:
for _ in range(32):
env.step(0)
# Tests that the ACL has been added and that the action is deny.
st_intra_prv_rt_dr_1_acl_list = st_intra_prv_rt_dr_1.acl
assert st_intra_prv_rt_dr_1_acl_list.acl[1].action != None
assert st_intra_prv_rt_dr_1_acl_list.acl[1].action == ACLAction.DENY
st_intra_prv_rt_cr_acl_list = st_intra_prv_rt_cr.acl
assert st_intra_prv_rt_cr_acl_list.acl[1].action != None
assert st_intra_prv_rt_cr_acl_list.acl[1].action == ACLAction.DENY
rem_pub_rt_dr_acl_list = rem_pub_rt_dr.acl
assert rem_pub_rt_dr_acl_list.acl[1].action != None
assert rem_pub_rt_dr_acl_list.acl[1].action == ACLAction.DENY

View File

@@ -0,0 +1,201 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
from typing import Protocol
import pytest
import yaml
from primaite.config.load import _EXAMPLE_CFG
from primaite.game.agent.scripted_agents.abstract_tap import (
AbstractTAP,
BaseKillChain,
KillChainOptions,
KillChainStageOptions,
KillChainStageProgress,
)
from primaite.game.agent.scripted_agents.TAP001 import MobileMalwareKillChain
from primaite.game.agent.scripted_agents.TAP003 import InsiderKillChain
from primaite.session.environment import PrimaiteGymEnv
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.ipv4_address import IPV4Address
from primaite.utils.validation.port import PORT_LOOKUP
# Defining constants.
ATTACK_AGENT_INDEX = 32
START_STEP = 1 # The starting step of the agent.
FREQUENCY = 2 # The frequency of kill chain stage progression (E.g it's next attempt at "attacking").
VARIANCE = 0 # The timestep variance between kill chain progression (E.g Next timestep = Frequency +/- variance)
REPEAT_KILL_CHAIN = False # Should the TAP repeat the kill chain after success/failure?
REPEAT_KILL_CHAIN_STAGES = False # Should the TAP restart from it's previous stage on failure?
KILL_CHAIN_PROBABILITY = 1 # Blank probability for agent 'success'
RULES = [
{
"target_router": "ST-INTRA-PRV-RT-DR-1",
"position": 1,
"permission": "DENY",
"src_ip": "192.168.220.3",
"src_wildcard": "NONE",
"dst_ip": "192.168.220.3",
"dst_wildcard": "NONE",
"src_port": "ALL",
"dst_port": "ALL",
"protocol_name": "ALL",
},
{
"target_router": "ST-INTRA-PRV-RT-DR-2",
"position": 5,
"permission": "DENY",
"src_ip": "192.168.220.3",
"src_wildcard": "NONE",
"dst_ip": "ALL",
"dst_wildcard": "NONE",
"src_port": "ALL",
"dst_port": "ALL",
"protocol_name": "ALL",
},
{
"target_router": "ST-INTRA-PRV-RT-CR",
"position": 6,
"permission": "PERMIT",
"src_ip": "192.168.220.3",
"src_wildcard": "NONE",
"dst_ip": "ALL",
"dst_wildcard": "NONE",
"src_port": "ALL",
"dst_port": "ALL",
"protocol_name": "ALL",
},
{
"target_router": "REM-PUB-RT-DR",
"position": 3,
"permission": "PERMIT",
"src_ip": "192.168.220.3",
"src_wildcard": "0.0.0.1",
"dst_ip": "192.168.220.3",
"dst_wildcard": "0.0.0.1",
"src_port": "FTP",
"dst_port": "FTP",
"protocol_name": "TCP",
},
#
]
def uc7_tap003_env(**kwargs) -> PrimaiteGymEnv:
"""Setups the UC7 TAP003 Game with the start_step & frequency set to 1 with probabilities set to 1 as well"""
with open(_EXAMPLE_CFG / "uc7_config_tap003.yaml", mode="r") as uc7_config:
cfg = yaml.safe_load(uc7_config)
cfg["io_settings"]["save_sys_logs"] = False
cfg["agents"][ATTACK_AGENT_INDEX]["agent_settings"]["start_step"] = START_STEP
cfg["agents"][ATTACK_AGENT_INDEX]["agent_settings"]["frequency"] = FREQUENCY
cfg["agents"][ATTACK_AGENT_INDEX]["agent_settings"]["variance"] = VARIANCE
cfg["agents"][ATTACK_AGENT_INDEX]["agent_settings"]["repeat_kill_chain"] = kwargs["repeat_kill_chain"]
cfg["agents"][ATTACK_AGENT_INDEX]["agent_settings"]["repeat_kill_chain_stages"] = kwargs[
"repeat_kill_chain_stages"
]
cfg["agents"][ATTACK_AGENT_INDEX]["agent_settings"]["kill_chain"]["MANIPULATION"]["probability"] = kwargs[
"manipulation_probability"
]
cfg["agents"][ATTACK_AGENT_INDEX]["agent_settings"]["kill_chain"]["ACCESS"]["probability"] = kwargs[
"access_probability"
]
cfg["agents"][ATTACK_AGENT_INDEX]["agent_settings"]["kill_chain"]["PLANNING"]["probability"] = kwargs[
"planning_probability"
]
cfg["agents"][ATTACK_AGENT_INDEX]["agent_settings"]["kill_chain"]["EXPLOIT"]["malicious_acls"] = RULES
# Adding the new test target to TAP003's starting knowledge:
new_target_dict = {
"ST-INTRA-PRV-RT-DR-2": {
"ip_address": "192.168.170.2",
"username": "admin",
"password": "admin",
}
}
new_target_manipulation = {
"host": "ST-INTRA-PRV-RT-DR-2",
"ip_address": "192.168.170.2",
"action": "change_password",
"username": "admin",
"new_password": "red_pass",
}
cfg["agents"][ATTACK_AGENT_INDEX]["agent_settings"]["kill_chain"]["PLANNING"]["starting_network_knowledge"][
"credentials"
].update(new_target_dict)
cfg["agents"][ATTACK_AGENT_INDEX]["agent_settings"]["kill_chain"]["MANIPULATION"]["account_changes"].append(
new_target_manipulation
)
env = PrimaiteGymEnv(env_config=cfg)
return env
def test_tap003_cycling_rules():
"""Tests to check that TAP003 repeats it's kill chain after success"""
env = uc7_tap003_env(
repeat_kill_chain=True,
repeat_kill_chain_stages=True,
manipulation_probability=1,
access_probability=1,
planning_probability=1,
)
tap003: TAP003 = env.game.agents["attacker"]
def wait_until_attack():
for _ in range(120):
# check if the agent has executed and therefore moved onto the next rule index
env.step(0)
if tap003.history[-1].action == "node-send-remote-command":
if tap003.history[-1].parameters["command"][0] == "acl":
return
pytest.fail("While testing the cycling of TAP003 rules, the agent unexpectedly didn't execute its attack.")
wait_until_attack()
target_node: Router = env.game.simulation.network.get_node_by_hostname("ST-INTRA-PRV-RT-DR-1")
assert (rule_0 := target_node.acl.acl[1]) is not None
assert rule_0.action == ACLAction.DENY
assert rule_0.protocol == None
assert rule_0.src_ip_address == IPV4Address("192.168.220.3")
assert rule_0.src_wildcard_mask == None
assert rule_0.dst_ip_address == IPV4Address("192.168.220.3")
assert rule_0.dst_wildcard_mask == None
assert rule_0.src_port == None
assert rule_0.dst_port == None
target_node: Router = env.game.simulation.network.get_node_by_hostname("ST-INTRA-PRV-RT-DR-2")
wait_until_attack()
assert (rule_1 := target_node.acl.acl[5]) is not None
assert rule_1.action == ACLAction.DENY
assert rule_1.protocol == None
assert rule_1.src_ip_address == IPV4Address("192.168.220.3")
assert rule_1.src_wildcard_mask == None
assert rule_1.dst_ip_address == None
assert rule_1.dst_wildcard_mask == None
assert rule_1.src_port == None
assert rule_1.dst_port == None
wait_until_attack()
target_node: Router = env.game.simulation.network.get_node_by_hostname("ST-INTRA-PRV-RT-CR")
assert (rule_2 := target_node.acl.acl[6]) is not None
assert rule_2.action == ACLAction.PERMIT
assert rule_2.protocol == None
assert rule_2.src_ip_address == IPV4Address("192.168.220.3")
assert rule_2.src_wildcard_mask == None # default
assert rule_2.dst_ip_address == None
assert rule_2.dst_wildcard_mask == None # default
assert rule_2.src_port == None
assert rule_2.dst_port == None
wait_until_attack()
target_node: Router = env.game.simulation.network.get_node_by_hostname("REM-PUB-RT-DR")
assert (rule_3 := target_node.acl.acl[3]) is not None
assert rule_3.action == ACLAction.PERMIT
assert rule_3.protocol == PROTOCOL_LOOKUP["TCP"]
assert rule_3.src_ip_address == IPV4Address("192.168.220.3")
assert rule_3.src_wildcard_mask == IPV4Address("0.0.0.1")
assert rule_3.dst_ip_address == IPV4Address("192.168.220.3")
assert rule_3.dst_wildcard_mask == IPV4Address("0.0.0.1")
assert rule_3.src_port == PORT_LOOKUP["FTP"]
assert rule_3.dst_port == PORT_LOOKUP["FTP"]
# If we've gotten this fair then we can pass the test :)
pass

View File

@@ -8,7 +8,7 @@ from primaite.config.load import data_manipulation_config_path
from primaite.game.game import PrimaiteGame
from tests import TEST_ASSETS_ROOT
BASIC_CONFIG = TEST_ASSETS_ROOT / "configs/basic_switched_network.yaml"
BASIC_SWITCHED_NETWORK_CONFIG = TEST_ASSETS_ROOT / "configs/basic_switched_network.yaml"
def load_config(config_path: Union[str, Path]) -> PrimaiteGame:
@@ -24,3 +24,42 @@ def test_thresholds():
game = load_config(data_manipulation_config_path())
assert game.options.thresholds is not None
def test_nmne_threshold():
"""Test that the NMNE thresholds are properly loaded in by observation."""
game = load_config(BASIC_SWITCHED_NETWORK_CONFIG)
assert game.options.thresholds["nmne"] is not None
# get NIC observation
nic_obs = game.agents["defender"].observation_manager.obs.components["NODES"].hosts[0].nics[0]
assert nic_obs.low_nmne_threshold == 5
assert nic_obs.med_nmne_threshold == 25
assert nic_obs.high_nmne_threshold == 100
def test_file_access_threshold():
"""Test that the NMNE thresholds are properly loaded in by observation."""
game = load_config(BASIC_SWITCHED_NETWORK_CONFIG)
assert game.options.thresholds["file_access"] is not None
# get file observation
file_obs = game.agents["defender"].observation_manager.obs.components["NODES"].hosts[0].folders[0].files[0]
assert file_obs.low_file_access_threshold == 2
assert file_obs.med_file_access_threshold == 5
assert file_obs.high_file_access_threshold == 10
def test_app_executions_threshold():
"""Test that the NMNE thresholds are properly loaded in by observation."""
game = load_config(BASIC_SWITCHED_NETWORK_CONFIG)
assert game.options.thresholds["app_executions"] is not None
# get application observation
app_obs = game.agents["defender"].observation_manager.obs.components["NODES"].hosts[0].applications[0]
assert app_obs.low_app_execution_threshold == 2
assert app_obs.med_app_execution_threshold == 3
assert app_obs.high_app_execution_threshold == 5

View File

@@ -0,0 +1,64 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
from pathlib import Path
from typing import Union
import yaml
from primaite.game.game import PrimaiteGame
from primaite.simulator.file_system.file_type import FileType
from tests import TEST_ASSETS_ROOT
BASIC_CONFIG = TEST_ASSETS_ROOT / "configs/nodes_with_initial_files.yaml"
def load_config(config_path: Union[str, Path]) -> PrimaiteGame:
"""Returns a PrimaiteGame object which loads the contents of a given yaml path."""
with open(config_path, "r") as f:
cfg = yaml.safe_load(f)
return PrimaiteGame.from_config(cfg)
def test_node_file_system_from_config():
"""Test that the appropriate files are instantiated in nodes when loaded from config."""
game = load_config(BASIC_CONFIG)
client_1 = game.simulation.network.get_node_by_hostname("client_1")
assert client_1.software_manager.software.get("database-service") # database service should be installed
assert client_1.file_system.get_file(folder_name="database", file_name="database.db") # database files should exist
assert client_1.software_manager.software.get("web-server") # web server should be installed
assert client_1.file_system.get_file(folder_name="primaite", file_name="index.html") # web files should exist
client_2 = game.simulation.network.get_node_by_hostname("client_2")
# database service should not be installed
assert client_2.software_manager.software.get("database-service") is None
# database files should not exist
assert client_2.file_system.get_file(folder_name="database", file_name="database.db") is None
# web server should not be installed
assert client_2.software_manager.software.get("web-server") is None
# web files should not exist
assert client_2.file_system.get_file(folder_name="primaite", file_name="index.html") is None
empty_folder = client_2.file_system.get_folder(folder_name="empty_folder")
assert empty_folder
assert len(empty_folder.files) == 0 # should have no files
password_file = client_2.file_system.get_file(folder_name="root", file_name="passwords.txt")
assert password_file # should exist
assert password_file.file_type is FileType.TXT
assert password_file.size == 663
downloads_folder = client_2.file_system.get_folder(folder_name="downloads")
assert downloads_folder # downloads folder should exist
test_txt = downloads_folder.get_file(file_name="test.txt")
assert test_txt # test.txt should exist
assert test_txt.file_type is FileType.TXT
unknown_file_type = downloads_folder.get_file(file_name="another_file.pwtwoti")
assert unknown_file_type # unknown_file_type should exist
assert unknown_file_type.file_type is FileType.UNKNOWN

View File

@@ -0,0 +1 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK

View File

@@ -0,0 +1,23 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
from pathlib import Path
from typing import Union
import yaml
from primaite.config.load import _EXAMPLE_CFG
from primaite.game.agent.scripted_agents.TAP003 import TAP003
from primaite.game.game import PrimaiteGame
def test_tap003_kill_chain_settings_load_config():
with open(_EXAMPLE_CFG / "uc7_config_tap003.yaml", mode="r") as uc7_config:
cfg = yaml.safe_load(uc7_config)
cfg["agents"][32]["agent_settings"]["kill_chain"]["MANIPULATION"]["probability"] = 0.5
cfg["agents"][32]["agent_settings"]["kill_chain"]["ACCESS"]["probability"] = 0.5
cfg["agents"][32]["agent_settings"]["kill_chain"]["PLANNING"]["probability"] = 0.5
game = PrimaiteGame.from_config(cfg)
tap: TAP003 = game.agents["attacker"]
kill_chain = tap.config.agent_settings.kill_chain
assert kill_chain.MANIPULATION.probability == 0.5
assert kill_chain.ACCESS.probability == 0.5
assert kill_chain.PLANNING.probability == 0.5

View File

@@ -0,0 +1,35 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
from pathlib import Path
from typing import Union
import yaml
from primaite.config.load import _EXAMPLE_CFG
from primaite.game.agent.scripted_agents.TAP003 import TAP003
from primaite.game.game import PrimaiteGame
def test_threat_actor_profile_load_config():
"""Test to check that threat actor profiles are able to be loaded."""
with open(_EXAMPLE_CFG / "uc7_config_tap003.yaml", mode="r") as uc7_config:
cfg = yaml.safe_load(uc7_config)
game = PrimaiteGame.from_config(cfg)
# tap003 is found and loaded TODO: Once tuple digestion is implemented, change to hardcoded 'tap003' test.
assert "attacker" in game.agents
assert isinstance(game.agents["attacker"], TAP003)
agent: TAP003 = game.agents["attacker"]
assert agent.config.agent_settings.start_step == 1
assert agent.config.agent_settings.frequency == 3
assert agent.config.agent_settings.variance == 0
assert not agent.config.agent_settings.repeat_kill_chain
assert agent.config.agent_settings.repeat_kill_chain_stages
assert agent.config.agent_settings.default_starting_node == "ST-PROJ-A-PRV-PC-1"
assert not agent.config.agent_settings.starting_nodes
assert agent.config.agent_settings.kill_chain.PLANNING.probability == 1
assert len(agent.config.agent_settings.kill_chain.PLANNING.starting_network_knowledge["credentials"]) == 6
assert agent.config.agent_settings.kill_chain.ACCESS.probability == 1
assert agent.config.agent_settings.kill_chain.MANIPULATION.probability == 1
assert len(agent.config.agent_settings.kill_chain.MANIPULATION.account_changes) == 3
assert agent.config.agent_settings.kill_chain.EXPLOIT.probability == 1
assert len(agent.config.agent_settings.kill_chain.EXPLOIT.malicious_acls) == 3

View File

@@ -49,7 +49,7 @@ class GigaSwitch(NetworkNode, discriminator="gigaswitch"):
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.hostname} Switch Ports"
table.title = f"{self.config.hostname} Switch Ports"
for port_num, port in self.network_interface.items():
table.add_row([port_num, port.mac_address, port.speed, "Enabled" if port.enabled else "Disabled"])
print(table)

View File

@@ -106,7 +106,6 @@ def test_remote_login_change_password(game_and_agent_fixture: Tuple[PrimaiteGame
"username": "user123",
"current_password": "password",
"new_password": "different_password",
"remote_ip": str(server_1.network_interface[1].ip_address),
},
)
agent.store_action(action)
@@ -146,7 +145,6 @@ def test_change_password_logs_out_user(game_and_agent_fixture: Tuple[PrimaiteGam
"username": "user123",
"current_password": "password",
"new_password": "different_password",
"remote_ip": str(server_1.network_interface[1].ip_address),
},
)
agent.store_action(action)
@@ -166,3 +164,55 @@ def test_change_password_logs_out_user(game_and_agent_fixture: Tuple[PrimaiteGam
assert server_1.file_system.get_folder("folder123") is None
assert server_1.file_system.get_file("folder123", "doggo.pdf") is None
def test_local_terminal(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
# create a new user account on server_1 that will be logged into remotely
client_1_usm: UserManager = client_1.software_manager.software["user-manager"]
client_1_usm.add_user("user123", "password", is_admin=True)
action = (
"node-send-local-command",
{
"node_name": "client_1",
"username": "user123",
"password": "password",
"command": ["file_system", "create", "file", "folder123", "doggo.pdf", False],
},
)
agent.store_action(action)
game.step()
assert client_1.file_system.get_folder("folder123")
assert client_1.file_system.get_file("folder123", "doggo.pdf")
# Change password
action = (
"node-account-change-password",
{
"node_name": "client_1",
"username": "user123",
"current_password": "password",
"new_password": "different_password",
},
)
agent.store_action(action)
game.step()
action = (
"node-send-local-command",
{
"node_name": "client_1",
"username": "user123",
"password": "password",
"command": ["file_system", "create", "file", "folder123", "cat.pdf", False],
},
)
agent.store_action(action)
game.step()
assert client_1.file_system.get_file("folder123", "cat.pdf") is None
client_1.session_manager.show()

View File

@@ -0,0 +1,176 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
import pytest
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.network.router import ACLAction
from primaite.utils.validation.port import Port, PORT_LOOKUP
@pytest.fixture
def game_and_agent_fixture(game_and_agent):
"""Create a game with a simple agent that can be controlled by the tests."""
game, agent = game_and_agent
client_1: Computer = game.simulation.network.get_node_by_hostname("client_1")
client_1.start_up_duration = 3
return (game, agent)
def test_user_account_add_user_action(game_and_agent_fixture):
"""Tests the add user account action."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
assert len(client_1.user_manager.users) == 1 # admin is created by default
assert len(client_1.user_manager.admins) == 1
# add admin account
action = (
"node-account-add-user",
{"node_name": "client_1", "username": "admin_2", "password": "e-tronic-boogaloo", "is_admin": True},
)
agent.store_action(action)
game.step()
assert len(client_1.user_manager.users) == 2 # new user added
assert len(client_1.user_manager.admins) == 2
# add non admin account
action = (
"node-account-add-user",
{"node_name": "client_1", "username": "leeroy.jenkins", "password": "no_plan_needed", "is_admin": False},
)
agent.store_action(action)
game.step()
assert len(client_1.user_manager.users) == 3 # new user added
assert len(client_1.user_manager.admins) == 2
def test_user_account_disable_user_action(game_and_agent_fixture):
"""Tests the disable user account action."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
client_1.user_manager.add_user(username="test", password="password", is_admin=True)
assert len(client_1.user_manager.users) == 2 # new user added
assert len(client_1.user_manager.admins) == 2
test_user = client_1.user_manager.users.get("test")
assert test_user
assert test_user.disabled is not True
# disable test account
action = (
"node-account-disable-user",
{
"node_name": "client_1",
"username": "test",
},
)
agent.store_action(action)
game.step()
assert test_user.disabled
def test_user_account_change_password_action(game_and_agent_fixture):
"""Tests the change password user account action."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
client_1.user_manager.add_user(username="test", password="password", is_admin=True)
test_user = client_1.user_manager.users.get("test")
assert test_user.password == "password"
# change account password
action = (
"node-account-change-password",
{"node_name": "client_1", "username": "test", "current_password": "password", "new_password": "2Hard_2_Hack"},
)
agent.store_action(action)
game.step()
assert test_user.password == "2Hard_2_Hack"
def test_user_account_create_terminal_action(game_and_agent_fixture):
"""Tests that agents can use the terminal to create new users."""
game, agent = game_and_agent_fixture
router = game.simulation.network.get_node_by_hostname("router")
router.acl.add_rule(action=ACLAction.PERMIT, src_port=PORT_LOOKUP["SSH"], dst_port=PORT_LOOKUP["SSH"], position=4)
server_1 = game.simulation.network.get_node_by_hostname("server_1")
server_1_usm = server_1.software_manager.software["user-manager"]
server_1_usm.add_user("user123", "password", is_admin=True)
action = (
"node-session-remote-login",
{
"node_name": "client_1",
"username": "user123",
"password": "password",
"remote_ip": str(server_1.network_interface[1].ip_address),
},
)
agent.store_action(action)
game.step()
assert agent.history[-1].response.status == "success"
# Create a new user account via terminal.
action = (
"node-send-remote-command",
{
"node_name": "client_1",
"remote_ip": str(server_1.network_interface[1].ip_address),
"command": ["service", "user-manager", "add_user", "new_user", "new_pass", True],
},
)
agent.store_action(action)
game.step()
new_user = server_1.user_manager.users.get("new_user")
assert new_user
assert new_user.password == "new_pass"
assert new_user.disabled is not True
def test_user_account_disable_terminal_action(game_and_agent_fixture):
"""Tests that agents can use the terminal to disable users."""
game, agent = game_and_agent_fixture
router = game.simulation.network.get_node_by_hostname("router")
router.acl.add_rule(action=ACLAction.PERMIT, src_port=PORT_LOOKUP["SSH"], dst_port=PORT_LOOKUP["SSH"], position=4)
server_1 = game.simulation.network.get_node_by_hostname("server_1")
server_1_usm = server_1.software_manager.software["user-manager"]
server_1_usm.add_user("user123", "password", is_admin=True)
action = (
"node-session-remote-login",
{
"node_name": "client_1",
"username": "user123",
"password": "password",
"remote_ip": str(server_1.network_interface[1].ip_address),
},
)
agent.store_action(action)
game.step()
assert agent.history[-1].response.status == "success"
# Disable a user via terminal
action = (
"node-send-remote-command",
{
"node_name": "client_1",
"remote_ip": str(server_1.network_interface[1].ip_address),
"command": ["service", "user-manager", "disable_user", "user123"],
},
)
agent.store_action(action)
game.step()
new_user = server_1.user_manager.users.get("user123")
assert new_user
assert new_user.disabled is True

View File

@@ -44,6 +44,38 @@ def test_file_observation(simulation):
assert observation_state.get("health_status") == 3 # corrupted
def test_config_file_access_categories(simulation):
pc: Computer = simulation.network.get_node_by_hostname("client_1")
file_obs = FileObservation(
where=["network", "nodes", pc.config.hostname, "file_system", "folders", "root", "files", "dog.png"],
include_num_access=False,
file_system_requires_scan=True,
thresholds={"file_access": {"low": 3, "medium": 6, "high": 9}},
)
assert file_obs.high_file_access_threshold == 9
assert file_obs.med_file_access_threshold == 6
assert file_obs.low_file_access_threshold == 3
with pytest.raises(Exception):
# should throw an error
FileObservation(
where=["network", "nodes", pc.config.hostname, "file_system", "folders", "root", "files", "dog.png"],
include_num_access=False,
file_system_requires_scan=True,
thresholds={"file_access": {"low": 9, "medium": 6, "high": 9}},
)
with pytest.raises(Exception):
# should throw an error
FileObservation(
where=["network", "nodes", pc.config.hostname, "file_system", "folders", "root", "files", "dog.png"],
include_num_access=False,
file_system_requires_scan=True,
thresholds={"file_access": {"low": 3, "medium": 9, "high": 9}},
)
def test_folder_observation(simulation):
"""Test the folder observation."""
pc: Computer = simulation.network.get_node_by_hostname("client_1")

View File

@@ -77,6 +77,14 @@ def test_nic(simulation):
nic_obs = NICObservation(where=["network", "nodes", pc.config.hostname, "NICs", 1], include_nmne=True)
# The Simulation object created by the fixture also creates the
# NICObservation class with the NICObservation.capture_nmnme class variable
# set to False. Under normal (non-test) circumstances this class variable
# is set from a config file such as data_manipulation.yaml. So although
# capture_nmne is set to True in the NetworkInterface class it's still False
# in the NICObservation class so we set it now.
nic_obs.capture_nmne = True
# Set the NMNE configuration to capture DELETE/ENCRYPT queries as MNEs
nmne_config = {
"capture_nmne": True, # Enable the capture of MNEs
@@ -115,14 +123,11 @@ def test_nic_categories(simulation):
assert nic_obs.low_nmne_threshold == 0 # default
@pytest.mark.skip(reason="Feature not implemented yet")
def test_config_nic_categories(simulation):
pc: Computer = simulation.network.get_node_by_hostname("client_1")
nic_obs = NICObservation(
where=["network", "nodes", pc.hostname, "NICs", 1],
low_nmne_threshold=3,
med_nmne_threshold=6,
high_nmne_threshold=9,
where=["network", "nodes", pc.config.hostname, "NICs", 1],
thresholds={"nmne": {"low": 3, "medium": 6, "high": 9}},
include_nmne=True,
)
@@ -133,20 +138,16 @@ def test_config_nic_categories(simulation):
with pytest.raises(Exception):
# should throw an error
NICObservation(
where=["network", "nodes", pc.hostname, "NICs", 1],
low_nmne_threshold=9,
med_nmne_threshold=6,
high_nmne_threshold=9,
where=["network", "nodes", pc.config.hostname, "NICs", 1],
thresholds={"nmne": {"low": 9, "medium": 6, "high": 9}},
include_nmne=True,
)
with pytest.raises(Exception):
# should throw an error
NICObservation(
where=["network", "nodes", pc.hostname, "NICs", 1],
low_nmne_threshold=3,
med_nmne_threshold=9,
high_nmne_threshold=9,
where=["network", "nodes", pc.config.hostname, "NICs", 1],
thresholds={"nmne": {"low": 3, "medium": 9, "high": 9}},
include_nmne=True,
)

View File

@@ -39,6 +39,8 @@ def test_host_observation(simulation):
folders=[],
network_interfaces=[],
file_system_requires_scan=True,
services_requires_scan=True,
applications_requires_scan=True,
include_users=False,
)

View File

@@ -0,0 +1,28 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
import json
from primaite.session.environment import PrimaiteGymEnv
from primaite.session.io import PrimaiteIO
from tests import TEST_ASSETS_ROOT
DATA_MANIPULATION_CONFIG = TEST_ASSETS_ROOT / "configs" / "data_manipulation.yaml"
def test_obs_data_in_log_file():
"""Create a log file of AgentHistoryItems and check observation data is
included. Assumes that data_manipulation.yaml has an agent labelled
'defender' with a non-null observation space.
The log file will be in:
primaite/VERSION/sessions/YYYY-MM-DD/HH-MM-SS/agent_actions
"""
env = PrimaiteGymEnv(DATA_MANIPULATION_CONFIG)
env.reset()
for _ in range(10):
env.step(0)
env.reset()
io = PrimaiteIO()
path = io.generate_agent_actions_save_path(episode=1)
with open(path, "r") as f:
j = json.load(f)
assert type(j["0"]["defender"]["observation"]) == dict

View File

@@ -29,7 +29,9 @@ def test_service_observation(simulation):
ntp_server = pc.software_manager.software.get("ntp-server")
assert ntp_server
service_obs = ServiceObservation(where=["network", "nodes", pc.config.hostname, "services", "ntp-server"])
service_obs = ServiceObservation(
where=["network", "nodes", pc.config.hostname, "services", "ntp-server"], services_requires_scan=True
)
assert service_obs.space["operating_status"] == spaces.Discrete(7)
assert service_obs.space["health_status"] == spaces.Discrete(5)
@@ -54,7 +56,9 @@ def test_application_observation(simulation):
web_browser: WebBrowser = pc.software_manager.software.get("web-browser")
assert web_browser
app_obs = ApplicationObservation(where=["network", "nodes", pc.config.hostname, "applications", "web-browser"])
app_obs = ApplicationObservation(
where=["network", "nodes", pc.config.hostname, "applications", "web-browser"], applications_requires_scan=True
)
web_browser.close()
observation_state = app_obs.observe(simulation.describe_state())
@@ -69,3 +73,33 @@ def test_application_observation(simulation):
assert observation_state.get("health_status") == 1
assert observation_state.get("operating_status") == 1 # running
assert observation_state.get("num_executions") == 1
def test_application_executions_categories(simulation):
pc: Computer = simulation.network.get_node_by_hostname("client_1")
app_obs = ApplicationObservation(
where=["network", "nodes", pc.config.hostname, "applications", "WebBrowser"],
applications_requires_scan=False,
thresholds={"app_executions": {"low": 3, "medium": 6, "high": 9}},
)
assert app_obs.high_app_execution_threshold == 9
assert app_obs.med_app_execution_threshold == 6
assert app_obs.low_app_execution_threshold == 3
with pytest.raises(Exception):
# should throw an error
ApplicationObservation(
where=["network", "nodes", pc.config.hostname, "applications", "WebBrowser"],
applications_requires_scan=False,
thresholds={"app_executions": {"low": 9, "medium": 6, "high": 9}},
)
with pytest.raises(Exception):
# should throw an error
ApplicationObservation(
where=["network", "nodes", pc.config.hostname, "applications", "WebBrowser"],
applications_requires_scan=False,
thresholds={"app_executions": {"low": 3, "medium": 9, "high": 9}},
)

View File

@@ -7,6 +7,7 @@ import yaml
from primaite.config.load import data_manipulation_config_path
from primaite.game.agent.interface import AgentHistoryItem
from primaite.session.environment import PrimaiteGymEnv
from primaite.simulator import SIM_OUTPUT
@pytest.fixture()
@@ -33,6 +34,11 @@ def test_rng_seed_set(create_env):
assert a == b
# Check that seed log file was created.
path = SIM_OUTPUT.path / "seed.log"
with open(path, "r") as file:
assert file
def test_rng_seed_unset(create_env):
"""Test with no RNG seed."""
@@ -48,3 +54,19 @@ def test_rng_seed_unset(create_env):
b = [item.timestep for item in env.game.agents["client_2_green_user"].history if item.action != "do-nothing"]
assert a != b
def test_for_generated_seed():
"""
Show that setting generate_seed_value to true producess a valid seed.
"""
with open(data_manipulation_config_path(), "r") as f:
cfg = yaml.safe_load(f)
cfg["game"]["generate_seed_value"] = True
PrimaiteGymEnv(env_config=cfg)
path = SIM_OUTPUT.path / "seed.log"
with open(path, "r") as file:
data = file.read()
assert data.split(" ")[3] != None

View File

@@ -22,6 +22,7 @@ from primaite.game.game import PrimaiteGame
from primaite.session.environment import PrimaiteGymEnv
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.network.hardware.nodes.network.firewall import Firewall
from primaite.simulator.network.hardware.nodes.network.router import Router
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.web_browser import WebBrowser
from primaite.simulator.system.software import SoftwareHealthState
@@ -107,7 +108,7 @@ def test_router_acl_addrule_integration(game_and_agent: Tuple[PrimaiteGame, Prox
"""
Test that the RouterACLAddRuleAction can form a request and that it is accepted by the simulation.
The acl starts off with 4 rules, and we add a rule, and check that the acl now has 5 rules.
The ACL starts off with 4 rules, and we add a rule, and check that the ACL now has 5 rules.
"""
game, agent = game_and_agent
@@ -164,11 +165,9 @@ def test_router_acl_addrule_integration(game_and_agent: Tuple[PrimaiteGame, Prox
},
)
agent.store_action(action)
print(agent.most_recent_action)
game.step()
print(agent.most_recent_action)
# 5: Check that the ACL now has 6 rules, but that server_1 can still ping server_2
print(router.acl.show())
assert router.acl.num_rules == 6
assert server_1.ping("10.0.2.3") # Can ping server_2
@@ -180,7 +179,8 @@ def test_router_acl_removerule_integration(game_and_agent: Tuple[PrimaiteGame, P
# 1: Check that http traffic is going across the network nicely.
client_1 = game.simulation.network.get_node_by_hostname("client_1")
server_1 = game.simulation.network.get_node_by_hostname("server_1")
router = game.simulation.network.get_node_by_hostname("router")
router: Router = game.simulation.network.get_node_by_hostname("router")
assert router.acl.num_rules == 4
browser: WebBrowser = client_1.software_manager.software.get("web-browser")
browser.run()

View File

@@ -1,5 +1,11 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
from itertools import product
import yaml
from primaite.config.load import data_manipulation_config_path
from primaite.game.agent.observations.nic_observations import NICObservation
from primaite.session.environment import PrimaiteGymEnv
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.host_node import NIC
from primaite.simulator.network.hardware.nodes.host.server import Server
@@ -277,3 +283,19 @@ def test_capture_nmne_observations(uc2_network: Network):
assert web_nic_obs["outbound"] == expected_nmne
assert db_nic_obs["inbound"] == expected_nmne
uc2_network.apply_timestep(timestep=0)
def test_nmne_parameter_settings():
"""
Check that the four permutations of the values of capture_nmne and
include_nmne work as expected.
"""
with open(data_manipulation_config_path(), "r") as f:
cfg = yaml.safe_load(f)
DEFENDER = 3
for capture, include in product([True, False], [True, False]):
cfg["simulation"]["network"]["nmne_config"]["capture_nmne"] = capture
cfg["agents"][DEFENDER]["observation_space"]["options"]["components"][0]["options"]["include_nmne"] = include
PrimaiteGymEnv(env_config=cfg)

View File

@@ -1,6 +1,7 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
from primaite.simulator.network.hardware.nodes.network.router import RouterARP
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router, RouterARP
from primaite.simulator.system.services.arp.arp import ARP
from primaite.utils.validation.port import PORT_LOOKUP
from tests.integration_tests.network.test_routing import multi_hop_network
@@ -48,3 +49,19 @@ def test_arp_fails_for_network_address_between_routers(multi_hop_network):
actual_result = router_1_arp.get_arp_cache_mac_address(router_1.network_interface[1].ip_network.network_address)
assert actual_result == expected_result
def test_arp_not_affected_by_acl(multi_hop_network):
pc_a = multi_hop_network.get_node_by_hostname("pc_a")
router_1: Router = multi_hop_network.get_node_by_hostname("router_1")
# Add explicit rule to block ARP traffic. This shouldn't actually stop ARP traffic
# as it operates a different layer within the network.
router_1.acl.add_rule(action=ACLAction.DENY, src_port=PORT_LOOKUP["ARP"], dst_port=PORT_LOOKUP["ARP"], position=23)
pc_a_arp: ARP = pc_a.software_manager.arp
expected_result = router_1.network_interface[2].mac_address
actual_result = pc_a_arp.get_arp_cache_mac_address(router_1.network_interface[2].ip_address)
assert actual_result == expected_result

View File

@@ -1,10 +1,11 @@
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
import json
from typing import List
import pytest
import yaml
from primaite.game.agent.observations import ObservationManager
from primaite.game.agent.observations import ApplicationObservation, ObservationManager, ServiceObservation
from primaite.game.agent.observations.file_system_observations import FileObservation, FolderObservation
from primaite.game.agent.observations.host_observations import HostObservation
@@ -136,3 +137,227 @@ class TestFileSystemRequiresScan:
[], files=[], num_files=0, include_num_access=False, file_system_requires_scan=True
)
assert obs_requiring_scan.observe(folder_state)["health_status"] == 1
class TestServicesRequiresScan:
@pytest.mark.parametrize(
("yaml_option_string", "expected_val"),
(
("services_requires_scan: true", True),
("services_requires_scan: false", False),
(" ", True),
),
)
def test_obs_config(self, yaml_option_string, expected_val):
"""Check that the default behaviour is to set service_requires_scan to True."""
obs_cfg_yaml = f"""
type: custom
options:
components:
- type: nodes
label: NODES
options:
hosts:
- hostname: domain_controller
- hostname: web_server
services:
- service_name: web-server
- service_name: dns-client
- hostname: database_server
folders:
- folder_name: database
files:
- file_name: database.db
- hostname: backup_server
services:
- service_name: ftp-server
- hostname: security_suite
- hostname: client_1
- hostname: client_2
num_services: 3
num_applications: 0
num_folders: 1
num_files: 1
num_nics: 2
include_num_access: false
{yaml_option_string}
include_nmne: true
monitored_traffic:
icmp:
- NONE
tcp:
- DNS
routers:
- hostname: router_1
num_ports: 0
ip_list:
- 192.168.1.10
- 192.168.1.12
- 192.168.1.14
- 192.168.1.16
- 192.168.1.110
- 192.168.10.21
- 192.168.10.22
- 192.168.10.110
wildcard_list:
- 0.0.0.1
port_list:
- 80
- 5432
protocol_list:
- ICMP
- TCP
- UDP
num_rules: 10
- type: links
label: LINKS
options:
link_references:
- router_1:eth-1<->switch_1:eth-8
- router_1:eth-2<->switch_2:eth-8
- switch_1:eth-1<->domain_controller:eth-1
- switch_1:eth-2<->web_server:eth-1
- switch_1:eth-3<->database_server:eth-1
- switch_1:eth-4<->backup_server:eth-1
- switch_1:eth-7<->security_suite:eth-1
- switch_2:eth-1<->client_1:eth-1
- switch_2:eth-2<->client_2:eth-1
- switch_2:eth-7<->security_suite:eth-2
- type: none
label: ICS
options: {{}}
"""
cfg = yaml.safe_load(obs_cfg_yaml)
manager = ObservationManager.from_config(cfg)
hosts: List[HostObservation] = manager.obs.components["NODES"].hosts
for i, host in enumerate(hosts):
services: List[ServiceObservation] = host.services
for j, service in enumerate(services):
val = service.services_requires_scan
print(f"host {i} service {j} {val}")
assert val == expected_val # Make sure services require scan by default
def test_services_requires_scan(self):
state = {"health_state_actual": 3, "health_state_visible": 1, "operating_state": 1}
obs_requiring_scan = ServiceObservation([], services_requires_scan=True)
assert obs_requiring_scan.observe(state)["health_status"] == 1 # should be visible value
obs_not_requiring_scan = ServiceObservation([], services_requires_scan=False)
assert obs_not_requiring_scan.observe(state)["health_status"] == 3 # should be actual value
class TestApplicationsRequiresScan:
@pytest.mark.parametrize(
("yaml_option_string", "expected_val"),
(
("applications_requires_scan: true", True),
("applications_requires_scan: false", False),
(" ", True),
),
)
def test_obs_config(self, yaml_option_string, expected_val):
"""Check that the default behaviour is to set applications_requires_scan to True."""
obs_cfg_yaml = f"""
type: custom
options:
components:
- type: nodes
label: NODES
options:
hosts:
- hostname: domain_controller
- hostname: web_server
- hostname: database_server
folders:
- folder_name: database
files:
- file_name: database.db
- hostname: backup_server
- hostname: security_suite
- hostname: client_1
applications:
- application_name: web-browser
- hostname: client_2
applications:
- application_name: web-browser
- application_name: database-client
num_services: 0
num_applications: 3
num_folders: 1
num_files: 1
num_nics: 2
include_num_access: false
{yaml_option_string}
include_nmne: true
monitored_traffic:
icmp:
- NONE
tcp:
- DNS
routers:
- hostname: router_1
num_ports: 0
ip_list:
- 192.168.1.10
- 192.168.1.12
- 192.168.1.14
- 192.168.1.16
- 192.168.1.110
- 192.168.10.21
- 192.168.10.22
- 192.168.10.110
wildcard_list:
- 0.0.0.1
port_list:
- 80
- 5432
protocol_list:
- ICMP
- TCP
- UDP
num_rules: 10
- type: links
label: LINKS
options:
link_references:
- router_1:eth-1<->switch_1:eth-8
- router_1:eth-2<->switch_2:eth-8
- switch_1:eth-1<->domain_controller:eth-1
- switch_1:eth-2<->web_server:eth-1
- switch_1:eth-3<->database_server:eth-1
- switch_1:eth-4<->backup_server:eth-1
- switch_1:eth-7<->security_suite:eth-1
- switch_2:eth-1<->client_1:eth-1
- switch_2:eth-2<->client_2:eth-1
- switch_2:eth-7<->security_suite:eth-2
- type: none
label: ICS
options: {{}}
"""
cfg = yaml.safe_load(obs_cfg_yaml)
manager = ObservationManager.from_config(cfg)
hosts: List[HostObservation] = manager.obs.components["NODES"].hosts
for i, host in enumerate(hosts):
services: List[ServiceObservation] = host.services
for j, service in enumerate(services):
val = service.services_requires_scan
print(f"host {i} service {j} {val}")
assert val == expected_val # Make sure applications require scan by default
def test_applications_requires_scan(self):
state = {"health_state_actual": 3, "health_state_visible": 1, "operating_state": 1, "num_executions": 1}
obs_requiring_scan = ApplicationObservation([], applications_requires_scan=True)
assert obs_requiring_scan.observe(state)["health_status"] == 1 # should be visible value
obs_not_requiring_scan = ApplicationObservation([], applications_requires_scan=False)
assert obs_not_requiring_scan.observe(state)["health_status"] == 3 # should be actual value

View File

@@ -73,7 +73,7 @@ def test_ftp_should_not_process_commands_if_service_not_running(ftp_client):
assert ftp_client_service._process_ftp_command(payload=payload).status_code is FTPStatusCode.ERROR
def test_ftp_tries_to_senf_file__that_does_not_exist(ftp_client):
def test_ftp_tries_to_send_file__that_does_not_exist(ftp_client):
"""Method send_file should return false if no file to send."""
assert ftp_client.file_system.get_file(folder_name="root", file_name="test.txt") is None

View File

@@ -6,6 +6,7 @@ import pytest
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.interface.request import RequestResponse
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
@@ -442,3 +443,59 @@ def test_terminal_connection_timeout(basic_network):
assert len(computer_b.user_session_manager.remote_sessions) == 0
assert not remote_connection.is_active
def test_terminal_last_response_updates(basic_network):
"""Test that the _last_response within Terminal correctly updates."""
network: Network = basic_network
computer_a: Computer = network.get_node_by_hostname("node_a")
terminal_a: Terminal = computer_a.software_manager.software.get("terminal")
computer_b: Computer = network.get_node_by_hostname("node_b")
assert terminal_a.last_response is None
remote_connection = terminal_a.login(username="admin", password="admin", ip_address="192.168.0.11")
# Last response should be a successful logon
assert terminal_a.last_response == RequestResponse(status="success", data={"reason": "Login Successful"})
remote_connection.execute(command=["software_manager", "application", "install", "ransomware-script"])
# Last response should now update following successful install
assert terminal_a.last_response == RequestResponse(status="success", data={})
remote_connection.execute(command=["software_manager", "application", "install", "ransomware-script"])
# Last response should now update to success, but with supplied reason.
assert terminal_a.last_response == RequestResponse(status="success", data={"reason": "already installed"})
remote_connection.execute(command=["file_system", "create", "file", "folder123", "doggo.pdf", False])
# Check file was created.
assert computer_b.file_system.access_file(folder_name="folder123", file_name="doggo.pdf")
# Last response should be confirmation of file creation.
assert terminal_a.last_response == RequestResponse(
status="success",
data={"file_name": "doggo.pdf", "folder_name": "folder123", "file_type": "PDF", "file_size": 102400},
)
remote_connection.execute(
command=[
"service",
"ftp-client",
"send",
{
"dest_ip_address": "192.168.0.2",
"src_folder": "folder123",
"src_file_name": "cat.pdf",
"dest_folder": "root",
"dest_file_name": "cat.pdf",
},
]
)
assert terminal_a.last_response == RequestResponse(
status="failure",
data={"reason": "Unable to locate given file on local file system. Perhaps given options are invalid?"},
)