#2912 - Updates to get tests to pass. Some ACL rules still misbehaving

This commit is contained in:
Charlie Crane
2024-12-09 13:56:40 +00:00
parent 386717fa41
commit 068ad2f1fa
3 changed files with 75 additions and 35 deletions

View File

@@ -1,14 +1,16 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from __future__ import annotations
from ipaddress import IPv4Address
from typing import List
from pydantic import field_validator
from primaite.game.agent.actions.manager import AbstractAction
from primaite.interface.request import RequestFormat
from primaite.utils.validation.ip_protocol import is_valid_protocol, protocol_validator
from primaite.utils.validation.port import is_valid_port
from primaite.utils.validation.ip_protocol import protocol_validator
from primaite.utils.validation.ipv4_address import ipv4_validator
from primaite.utils.validation.port import port_validator
__all__ = (
"RouterACLAddRuleAction",
@@ -26,39 +28,38 @@ class ACLAddRuleAbstractAction(AbstractAction, identifier="acl_add_rule_abstract
class ConfigSchema(AbstractAction.ConfigSchema):
"""Configuration Schema base for ACL add rule abstract actions."""
src_ip: str
src_ip: IPv4Address
protocol_name: str
permission: str
position: int
src_ip: str
dst_ip: str
src_port: str
dst_port: str
dst_ip: IPv4Address
src_port: int
dst_port: int
src_wildcard: int
dst_wildcard: int
@field_validator(
src_port,
dst_port,
"src_port",
"dst_port",
mode="before",
)
@classmethod
def valid_port(cls, v: str) -> int:
"""Check that inputs are valid."""
return is_valid_port(v)
return port_validator(v)
@field_validator(
src_ip,
dst_ip,
"src_ip",
"dst_ip",
mode="before",
)
@classmethod
def valid_ip(cls, v: str) -> str:
"""Check that a valid IP has been provided for src and dst."""
return is_valid_protocol(v)
return ipv4_validator(v)
@field_validator(
protocol_name,
"protocol_name",
mode="before",
)
@classmethod
@@ -80,16 +81,16 @@ class ACLRemoveRuleAbstractAction(AbstractAction, identifier="acl_remove_rule_ab
position: int
@field_validator(
src_ip,
"src_ip",
mode="before",
)
@classmethod
def valid_ip(cls, v: str) -> str:
"""Check that a valid IP has been provided for src and dst."""
return is_valid_protocol(v)
return ipv4_validator(v)
@field_validator(
protocol_name,
"protocol_name",
mode="before",
)
@classmethod

View File

@@ -108,18 +108,19 @@ agents:
1:
action: firewall_acl_add_rule
options:
type: firewall_acl_add_rule
target_firewall_nodename: firewall
firewall_port_name: internal
firewall_port_direction: inbound
position: 1
permission: PERMIT
src_ip: 192.168.0.10
dest_ip: ALL
src_port: ALL
dst_port: ALL
protocol_name: ALL
source_wildcard_id: 0
dest_wildcard_id: 0
dst_ip: 0.0.0.0
src_port: 80
dst_port: HTTP
protocol_name: TCP
src_wildcard: 0
dst_wildcard: 0
2:
action: firewall_acl_remove_rule
options:
@@ -136,7 +137,7 @@ agents:
position: 1
permission: DENY
src_ip: 192.168.0.10 # client 1
dest_ip: ALL # ALL
dest_ip: ALL
src_port: ARP
dst_port: DNS
protocol_name: ICMP
@@ -240,11 +241,13 @@ agents:
13:
action: network_port_disable
options:
type: network_port_disable
target_nodename: firewall
port_id: 3
14:
action: network_port_enable
options:
type: network_port_enable
target_nodename: firewall
port_id: 3
options:

View File

@@ -88,7 +88,7 @@ def test_node_service_fix_integration(game_and_agent: Tuple[PrimaiteGame, ProxyA
svc.health_state_actual = SoftwareHealthState.COMPROMISED
# 2: Apply a patch action
action = ("node_service_fix", {"node_name": "server_1", "service_name": "DNSServer"})
action = ("node_service_fix", {"type": "node_service_fix", "node_name": "server_1", "service_name": "DNSServer"})
agent.store_action(action)
game.step()
@@ -123,16 +123,17 @@ def test_router_acl_addrule_integration(game_and_agent: Tuple[PrimaiteGame, Prox
action = (
"router_acl_add_rule",
{
"type": "router_acl_add_rule",
"target_router": "router",
"position": 4,
"permission": "DENY",
"src_ip": "10.0.1.2",
"src_wildcard": 0,
"source_port": "ALL",
"src_port": "HTTP",
"dst_ip": "10.0.2.3",
"dst_wildcard": 0,
"dst_port": "ALL",
"protocol_name": "ALL",
"dst_port": "HTTP",
"protocol_name": "udp",
},
)
agent.store_action(action)
@@ -150,6 +151,7 @@ def test_router_acl_addrule_integration(game_and_agent: Tuple[PrimaiteGame, Prox
action = (
"router_acl_add_rule",
{
"type": "router_acl_add_rule",
"target_router": "router",
"position": 5, # 5th rule
"permission": "DENY", # DENY
@@ -190,6 +192,7 @@ def test_router_acl_removerule_integration(game_and_agent: Tuple[PrimaiteGame, P
action = (
"router_acl_remove_rule",
{
"type": "router_acl_remove_rule",
"target_router": "router",
"position": 3, # 4th rule
},
@@ -223,6 +226,7 @@ def test_host_nic_disable_integration(game_and_agent: Tuple[PrimaiteGame, ProxyA
action = (
"host_nic_disable",
{
"type": "host_nic_disable",
"node_name": "client_1", # client_1
"nic_num": 1, # the only nic (eth-1)
},
@@ -254,6 +258,7 @@ def test_host_nic_enable_integration(game_and_agent: Tuple[PrimaiteGame, ProxyAg
action = (
"host_nic_enable",
{
"type": "host_nic_enable",
"node_name": "client_1", # client_1
"nic_num": 1, # the only nic (eth-1)
},
@@ -281,6 +286,7 @@ def test_node_file_scan_integration(game_and_agent: Tuple[PrimaiteGame, ProxyAge
action = (
"node_file_scan",
{
"type": "node_file_scan",
"node_name": "client_1", # client_1,
"folder_name": "downloads", # downloads,
"file_name": "cat.png", # cat.png
@@ -318,6 +324,7 @@ def test_node_file_delete_integration(game_and_agent: Tuple[PrimaiteGame, ProxyA
action = (
"node_file_delete",
{
"type": "node_file_delete",
"node_name": "client_1", # client_1
"folder_name": "downloads", # downloads
"file_name": "cat.png", # cat.png
@@ -340,7 +347,13 @@ def test_node_file_create(game_and_agent: Tuple[PrimaiteGame, ProxyAgent]):
action = (
"node_file_create",
{"node_name": "client_1", "folder_name": "test", "file_name": "file.txt", "force": "False"},
{
"type": "node_file_create",
"node_name": "client_1",
"folder_name": "test",
"file_name": "file.txt",
"force": "False",
},
)
agent.store_action(action)
game.step()
@@ -357,6 +370,7 @@ def test_node_file_access(game_and_agent: Tuple[PrimaiteGame, ProxyAgent]):
action = (
"node_file_create",
{
"type": "node_file_create",
"node_name": "client_1",
"folder_name": "test",
"file_name": "file.txt",
@@ -370,6 +384,7 @@ def test_node_file_access(game_and_agent: Tuple[PrimaiteGame, ProxyAgent]):
action = (
"node_file_access",
{
"type": "node_file_access",
"node_name": "client_1",
"folder_name": "test",
"file_name": "file.txt",
@@ -390,6 +405,7 @@ def test_node_folder_create(game_and_agent: Tuple[PrimaiteGame, ProxyAgent]):
action = (
"node_folder_create",
{
"type": "node_folder_create",
"node_name": "client_1",
"folder_name": "test",
},
@@ -418,6 +434,7 @@ def test_network_router_port_disable_integration(game_and_agent: Tuple[PrimaiteG
action = (
"network_port_disable",
{
"type": "network_port_disable",
"target_nodename": "router", # router
"port_id": 1, # port 1
},
@@ -450,6 +467,7 @@ def test_network_router_port_enable_integration(game_and_agent: Tuple[PrimaiteGa
action = (
"network_port_enable",
{
"type": "network_port_enable",
"target_nodename": "router", # router
"port_id": 1, # port 1
},
@@ -478,7 +496,10 @@ def test_node_application_scan_integration(game_and_agent: Tuple[PrimaiteGame, P
assert browser.health_state_visible == SoftwareHealthState.UNUSED
# 2: Scan and check that the visible state is now correct
action = ("node_application_scan", {"node_name": "client_1", "application_name": "WebBrowser"})
action = (
"node_application_scan",
{"type": "node_application_scan", "node_name": "client_1", "application_name": "WebBrowser"},
)
agent.store_action(action)
game.step()
assert browser.health_state_actual == SoftwareHealthState.GOOD
@@ -489,7 +510,10 @@ def test_node_application_scan_integration(game_and_agent: Tuple[PrimaiteGame, P
assert browser.health_state_visible == SoftwareHealthState.GOOD
# 4: Scan and check that the visible state is now correct
action = ("node_application_scan", {"node_name": "client_1", "application_name": "WebBrowser"})
action = (
"node_application_scan",
{"type": "node_application_scan", "node_name": "client_1", "application_name": "WebBrowser"},
)
agent.store_action(action)
game.step()
assert browser.health_state_actual == SoftwareHealthState.COMPROMISED
@@ -510,7 +534,10 @@ def test_node_application_fix_integration(game_and_agent: Tuple[PrimaiteGame, Pr
browser.health_state_actual = SoftwareHealthState.COMPROMISED
# 2: Apply a fix action
action = ("node_application_fix", {"node_name": "client_1", "application_name": "WebBrowser"})
action = (
"node_application_fix",
{"type": "node_application_fix", "node_name": "client_1", "application_name": "WebBrowser"},
)
agent.store_action(action)
game.step()
@@ -536,7 +563,10 @@ def test_node_application_close_integration(game_and_agent: Tuple[PrimaiteGame,
assert browser.operating_state == ApplicationOperatingState.RUNNING
# 2: Apply a close action
action = ("node_application_close", {"node_name": "client_1", "application_name": "WebBrowser"})
action = (
"node_application_close",
{"type": "node_application_close", "node_name": "client_1", "application_name": "WebBrowser"},
)
agent.store_action(action)
game.step()
@@ -555,13 +585,19 @@ def test_node_application_install_and_uninstall_integration(game_and_agent: Tupl
assert client_1.software_manager.software.get("DoSBot") is None
action = ("node_application_install", {"node_name": "client_1", "application_name": "DoSBot"})
action = (
"node_application_install",
{"type": "node_application_install", "node_name": "client_1", "application_name": "DoSBot"},
)
agent.store_action(action)
game.step()
assert client_1.software_manager.software.get("DoSBot") is not None
action = ("node_application_remove", {"node_name": "client_1", "application_name": "DoSBot"})
action = (
"node_application_remove",
{"type": "node_application_remove", "node_name": "client_1", "application_name": "DoSBot"},
)
agent.store_action(action)
game.step()