From 068ad2f1fa17cac80856bd71fa4d00f5b673b89c Mon Sep 17 00:00:00 2001 From: Charlie Crane Date: Mon, 9 Dec 2024 13:56:40 +0000 Subject: [PATCH] #2912 - Updates to get tests to pass. Some ACL rules still misbehaving --- src/primaite/game/agent/actions/acl.py | 35 +++++------ .../configs/firewall_actions_network.yaml | 17 +++--- .../game_layer/test_actions.py | 58 +++++++++++++++---- 3 files changed, 75 insertions(+), 35 deletions(-) diff --git a/src/primaite/game/agent/actions/acl.py b/src/primaite/game/agent/actions/acl.py index 5cd7a355..37dde757 100644 --- a/src/primaite/game/agent/actions/acl.py +++ b/src/primaite/game/agent/actions/acl.py @@ -1,14 +1,16 @@ # © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK from __future__ import annotations +from ipaddress import IPv4Address from typing import List from pydantic import field_validator from primaite.game.agent.actions.manager import AbstractAction from primaite.interface.request import RequestFormat -from primaite.utils.validation.ip_protocol import is_valid_protocol, protocol_validator -from primaite.utils.validation.port import is_valid_port +from primaite.utils.validation.ip_protocol import protocol_validator +from primaite.utils.validation.ipv4_address import ipv4_validator +from primaite.utils.validation.port import port_validator __all__ = ( "RouterACLAddRuleAction", @@ -26,39 +28,38 @@ class ACLAddRuleAbstractAction(AbstractAction, identifier="acl_add_rule_abstract class ConfigSchema(AbstractAction.ConfigSchema): """Configuration Schema base for ACL add rule abstract actions.""" - src_ip: str + src_ip: IPv4Address protocol_name: str permission: str position: int - src_ip: str - dst_ip: str - src_port: str - dst_port: str + dst_ip: IPv4Address + src_port: int + dst_port: int src_wildcard: int dst_wildcard: int @field_validator( - src_port, - dst_port, + "src_port", + "dst_port", mode="before", ) @classmethod def valid_port(cls, v: str) -> int: """Check that inputs are valid.""" - return is_valid_port(v) + return port_validator(v) @field_validator( - src_ip, - dst_ip, + "src_ip", + "dst_ip", mode="before", ) @classmethod def valid_ip(cls, v: str) -> str: """Check that a valid IP has been provided for src and dst.""" - return is_valid_protocol(v) + return ipv4_validator(v) @field_validator( - protocol_name, + "protocol_name", mode="before", ) @classmethod @@ -80,16 +81,16 @@ class ACLRemoveRuleAbstractAction(AbstractAction, identifier="acl_remove_rule_ab position: int @field_validator( - src_ip, + "src_ip", mode="before", ) @classmethod def valid_ip(cls, v: str) -> str: """Check that a valid IP has been provided for src and dst.""" - return is_valid_protocol(v) + return ipv4_validator(v) @field_validator( - protocol_name, + "protocol_name", mode="before", ) @classmethod diff --git a/tests/assets/configs/firewall_actions_network.yaml b/tests/assets/configs/firewall_actions_network.yaml index 4c3b5000..29af3b55 100644 --- a/tests/assets/configs/firewall_actions_network.yaml +++ b/tests/assets/configs/firewall_actions_network.yaml @@ -108,18 +108,19 @@ agents: 1: action: firewall_acl_add_rule options: + type: firewall_acl_add_rule target_firewall_nodename: firewall firewall_port_name: internal firewall_port_direction: inbound position: 1 permission: PERMIT src_ip: 192.168.0.10 - dest_ip: ALL - src_port: ALL - dst_port: ALL - protocol_name: ALL - source_wildcard_id: 0 - dest_wildcard_id: 0 + dst_ip: 0.0.0.0 + src_port: 80 + dst_port: HTTP + protocol_name: TCP + src_wildcard: 0 + dst_wildcard: 0 2: action: firewall_acl_remove_rule options: @@ -136,7 +137,7 @@ agents: position: 1 permission: DENY src_ip: 192.168.0.10 # client 1 - dest_ip: ALL # ALL + dest_ip: ALL src_port: ARP dst_port: DNS protocol_name: ICMP @@ -240,11 +241,13 @@ agents: 13: action: network_port_disable options: + type: network_port_disable target_nodename: firewall port_id: 3 14: action: network_port_enable options: + type: network_port_enable target_nodename: firewall port_id: 3 options: diff --git a/tests/integration_tests/game_layer/test_actions.py b/tests/integration_tests/game_layer/test_actions.py index a21ad34f..9fdf029b 100644 --- a/tests/integration_tests/game_layer/test_actions.py +++ b/tests/integration_tests/game_layer/test_actions.py @@ -88,7 +88,7 @@ def test_node_service_fix_integration(game_and_agent: Tuple[PrimaiteGame, ProxyA svc.health_state_actual = SoftwareHealthState.COMPROMISED # 2: Apply a patch action - action = ("node_service_fix", {"node_name": "server_1", "service_name": "DNSServer"}) + action = ("node_service_fix", {"type": "node_service_fix", "node_name": "server_1", "service_name": "DNSServer"}) agent.store_action(action) game.step() @@ -123,16 +123,17 @@ def test_router_acl_addrule_integration(game_and_agent: Tuple[PrimaiteGame, Prox action = ( "router_acl_add_rule", { + "type": "router_acl_add_rule", "target_router": "router", "position": 4, "permission": "DENY", "src_ip": "10.0.1.2", "src_wildcard": 0, - "source_port": "ALL", + "src_port": "HTTP", "dst_ip": "10.0.2.3", "dst_wildcard": 0, - "dst_port": "ALL", - "protocol_name": "ALL", + "dst_port": "HTTP", + "protocol_name": "udp", }, ) agent.store_action(action) @@ -150,6 +151,7 @@ def test_router_acl_addrule_integration(game_and_agent: Tuple[PrimaiteGame, Prox action = ( "router_acl_add_rule", { + "type": "router_acl_add_rule", "target_router": "router", "position": 5, # 5th rule "permission": "DENY", # DENY @@ -190,6 +192,7 @@ def test_router_acl_removerule_integration(game_and_agent: Tuple[PrimaiteGame, P action = ( "router_acl_remove_rule", { + "type": "router_acl_remove_rule", "target_router": "router", "position": 3, # 4th rule }, @@ -223,6 +226,7 @@ def test_host_nic_disable_integration(game_and_agent: Tuple[PrimaiteGame, ProxyA action = ( "host_nic_disable", { + "type": "host_nic_disable", "node_name": "client_1", # client_1 "nic_num": 1, # the only nic (eth-1) }, @@ -254,6 +258,7 @@ def test_host_nic_enable_integration(game_and_agent: Tuple[PrimaiteGame, ProxyAg action = ( "host_nic_enable", { + "type": "host_nic_enable", "node_name": "client_1", # client_1 "nic_num": 1, # the only nic (eth-1) }, @@ -281,6 +286,7 @@ def test_node_file_scan_integration(game_and_agent: Tuple[PrimaiteGame, ProxyAge action = ( "node_file_scan", { + "type": "node_file_scan", "node_name": "client_1", # client_1, "folder_name": "downloads", # downloads, "file_name": "cat.png", # cat.png @@ -318,6 +324,7 @@ def test_node_file_delete_integration(game_and_agent: Tuple[PrimaiteGame, ProxyA action = ( "node_file_delete", { + "type": "node_file_delete", "node_name": "client_1", # client_1 "folder_name": "downloads", # downloads "file_name": "cat.png", # cat.png @@ -340,7 +347,13 @@ def test_node_file_create(game_and_agent: Tuple[PrimaiteGame, ProxyAgent]): action = ( "node_file_create", - {"node_name": "client_1", "folder_name": "test", "file_name": "file.txt", "force": "False"}, + { + "type": "node_file_create", + "node_name": "client_1", + "folder_name": "test", + "file_name": "file.txt", + "force": "False", + }, ) agent.store_action(action) game.step() @@ -357,6 +370,7 @@ def test_node_file_access(game_and_agent: Tuple[PrimaiteGame, ProxyAgent]): action = ( "node_file_create", { + "type": "node_file_create", "node_name": "client_1", "folder_name": "test", "file_name": "file.txt", @@ -370,6 +384,7 @@ def test_node_file_access(game_and_agent: Tuple[PrimaiteGame, ProxyAgent]): action = ( "node_file_access", { + "type": "node_file_access", "node_name": "client_1", "folder_name": "test", "file_name": "file.txt", @@ -390,6 +405,7 @@ def test_node_folder_create(game_and_agent: Tuple[PrimaiteGame, ProxyAgent]): action = ( "node_folder_create", { + "type": "node_folder_create", "node_name": "client_1", "folder_name": "test", }, @@ -418,6 +434,7 @@ def test_network_router_port_disable_integration(game_and_agent: Tuple[PrimaiteG action = ( "network_port_disable", { + "type": "network_port_disable", "target_nodename": "router", # router "port_id": 1, # port 1 }, @@ -450,6 +467,7 @@ def test_network_router_port_enable_integration(game_and_agent: Tuple[PrimaiteGa action = ( "network_port_enable", { + "type": "network_port_enable", "target_nodename": "router", # router "port_id": 1, # port 1 }, @@ -478,7 +496,10 @@ def test_node_application_scan_integration(game_and_agent: Tuple[PrimaiteGame, P assert browser.health_state_visible == SoftwareHealthState.UNUSED # 2: Scan and check that the visible state is now correct - action = ("node_application_scan", {"node_name": "client_1", "application_name": "WebBrowser"}) + action = ( + "node_application_scan", + {"type": "node_application_scan", "node_name": "client_1", "application_name": "WebBrowser"}, + ) agent.store_action(action) game.step() assert browser.health_state_actual == SoftwareHealthState.GOOD @@ -489,7 +510,10 @@ def test_node_application_scan_integration(game_and_agent: Tuple[PrimaiteGame, P assert browser.health_state_visible == SoftwareHealthState.GOOD # 4: Scan and check that the visible state is now correct - action = ("node_application_scan", {"node_name": "client_1", "application_name": "WebBrowser"}) + action = ( + "node_application_scan", + {"type": "node_application_scan", "node_name": "client_1", "application_name": "WebBrowser"}, + ) agent.store_action(action) game.step() assert browser.health_state_actual == SoftwareHealthState.COMPROMISED @@ -510,7 +534,10 @@ def test_node_application_fix_integration(game_and_agent: Tuple[PrimaiteGame, Pr browser.health_state_actual = SoftwareHealthState.COMPROMISED # 2: Apply a fix action - action = ("node_application_fix", {"node_name": "client_1", "application_name": "WebBrowser"}) + action = ( + "node_application_fix", + {"type": "node_application_fix", "node_name": "client_1", "application_name": "WebBrowser"}, + ) agent.store_action(action) game.step() @@ -536,7 +563,10 @@ def test_node_application_close_integration(game_and_agent: Tuple[PrimaiteGame, assert browser.operating_state == ApplicationOperatingState.RUNNING # 2: Apply a close action - action = ("node_application_close", {"node_name": "client_1", "application_name": "WebBrowser"}) + action = ( + "node_application_close", + {"type": "node_application_close", "node_name": "client_1", "application_name": "WebBrowser"}, + ) agent.store_action(action) game.step() @@ -555,13 +585,19 @@ def test_node_application_install_and_uninstall_integration(game_and_agent: Tupl assert client_1.software_manager.software.get("DoSBot") is None - action = ("node_application_install", {"node_name": "client_1", "application_name": "DoSBot"}) + action = ( + "node_application_install", + {"type": "node_application_install", "node_name": "client_1", "application_name": "DoSBot"}, + ) agent.store_action(action) game.step() assert client_1.software_manager.software.get("DoSBot") is not None - action = ("node_application_remove", {"node_name": "client_1", "application_name": "DoSBot"}) + action = ( + "node_application_remove", + {"type": "node_application_remove", "node_name": "client_1", "application_name": "DoSBot"}, + ) agent.store_action(action) game.step()