From 534f84ccd1382cadfdbc3c7017a3e2b0c6597e02 Mon Sep 17 00:00:00 2001 From: Marek Wolan Date: Mon, 8 Jan 2024 13:29:17 +0000 Subject: [PATCH] Add action tests. --- .../game_layer/test_actions.py | 133 +++++++++++++++--- .../_primaite/_game/_agent/test_actions.py | 4 +- 2 files changed, 118 insertions(+), 19 deletions(-) diff --git a/tests/integration_tests/game_layer/test_actions.py b/tests/integration_tests/game_layer/test_actions.py index 85660796..b756553e 100644 --- a/tests/integration_tests/game_layer/test_actions.py +++ b/tests/integration_tests/game_layer/test_actions.py @@ -69,17 +69,17 @@ def install_stuff_to_sim(sim: Simulation): # 1: Set up network hardware # 1.1: Configure the router - router = Router(hostname="router", num_ports=3) + router = Router(hostname="router", num_ports=3, operating_state=NodeOperatingState.ON) router.power_on() router.configure_port(port=1, ip_address="10.0.1.1", subnet_mask="255.255.255.0") router.configure_port(port=2, ip_address="10.0.2.1", subnet_mask="255.255.255.0") # 1.2: Create and connect switches - switch_1 = Switch(hostname="switch_1", num_ports=6) + switch_1 = Switch(hostname="switch_1", num_ports=6, operating_state=NodeOperatingState.ON) switch_1.power_on() network.connect(endpoint_a=router.ethernet_ports[1], endpoint_b=switch_1.switch_ports[6]) router.enable_port(1) - switch_2 = Switch(hostname="switch_2", num_ports=6) + switch_2 = Switch(hostname="switch_2", num_ports=6, operating_state=NodeOperatingState.ON) switch_2.power_on() network.connect(endpoint_a=router.ethernet_ports[2], endpoint_b=switch_2.switch_ports[6]) router.enable_port(2) @@ -191,6 +191,7 @@ def game_and_agent(): {"type": "NODE_SERVICE_RESTART"}, {"type": "NODE_SERVICE_DISABLE"}, {"type": "NODE_SERVICE_ENABLE"}, + {"type": "NODE_SERVICE_PATCH"}, {"type": "NODE_APPLICATION_EXECUTE"}, {"type": "NODE_FILE_SCAN"}, {"type": "NODE_FILE_CHECKHASH"}, @@ -259,31 +260,129 @@ def test_do_nothing_integration(game_and_agent: Tuple[PrimaiteGame, ProxyAgent]) game.step() -@pytest.mark.skip(reason="Waiting to merge ticket 2160") +# @pytest.mark.skip(reason="Waiting to merge ticket 2166") def test_node_service_scan_integration(game_and_agent: Tuple[PrimaiteGame, ProxyAgent]): """ Test that the NodeServiceScanAction can form a request and that it is accepted by the simulation. The health status of applications is not always updated in the state dict, rather the agent needs to perform a scan. - Therefore, we set the web browser to be corrupted, check the state is still good, then perform a scan, and check + Therefore, we set a service to be compromised, check the state is still good, then perform a scan, and check that the state changes to the true value. """ game, agent = game_and_agent - browser = game.simulation.network.get_node_by_hostname("client_1").software_manager.software.get("WebBrowser") - browser.health_state_actual = SoftwareHealthState.COMPROMISED + # 1: Check that the service starts off in a good state, and that visible state is hidden until first scan + svc = game.simulation.network.get_node_by_hostname("client_1").software_manager.software.get("DNSClient") + assert svc.health_state_actual == SoftwareHealthState.GOOD + assert svc.health_state_visible == SoftwareHealthState.UNUSED - state_before = game.get_sim_state() - assert ( - game.get_sim_state()["network"]["nodes"]["client_1"]["applications"]["WebBrowser"]["health_state"] - == SoftwareHealthState.GOOD - ) + # 2: Scan and check that the visible state is now correct action = ("NODE_SERVICE_SCAN", {"node_id": 0, "service_id": 0}) agent.store_action(action) game.step() - state_after = game.get_sim_state() - pass - assert ( - game.get_sim_state()["network"]["nodes"]["client_1"]["services"]["WebBrowser"]["health_state"] - == SoftwareHealthState.COMPROMISED + assert svc.health_state_actual == SoftwareHealthState.GOOD + assert svc.health_state_visible == SoftwareHealthState.GOOD + + # 3: Corrupt the service and check that the visible state is still good + svc.health_state_actual = SoftwareHealthState.COMPROMISED + assert svc.health_state_visible == SoftwareHealthState.GOOD + + # 4: Scan and check that the visible state is now correct + action = ("NODE_SERVICE_SCAN", {"node_id": 0, "service_id": 0}) + agent.store_action(action) + game.step() + assert svc.health_state_actual == SoftwareHealthState.COMPROMISED + assert svc.health_state_visible == SoftwareHealthState.COMPROMISED + + +def test_node_service_patch_integration(game_and_agent: Tuple[PrimaiteGame, ProxyAgent]): + """ + Test that the NodeServicePatchAction can form a request and that it is accepted by the simulation. + + When you initiate a patch action, the software health state turns to PATCHING, then after a few steps, it goes + to GOOD. + """ + game, agent = game_and_agent + + # 1: Corrupt the service + svc = game.simulation.network.get_node_by_hostname("server_1").software_manager.software.get("DNSServer") + svc.health_state_actual = SoftwareHealthState.COMPROMISED + + # 2: Apply a patch action + action = ("NODE_SERVICE_PATCH", {"node_id": 1, "service_id": 0}) + agent.store_action(action) + game.step() + + # 3: Check that the service is now in the patching state + assert svc.health_state_actual == SoftwareHealthState.PATCHING + + # 4: perform a few do-nothing steps and check that the service is now in the good state + action = ("DONOTHING", {}) + agent.store_action(action) + game.step() + assert svc.health_state_actual == SoftwareHealthState.GOOD + + +def test_network_acl_addrule_integration(game_and_agent: Tuple[PrimaiteGame, ProxyAgent]): + """ + Test that the NetworkACLAddRuleAction can form a request and that it is accepted by the simulation. + + The ACL starts off with 4 rules, and we add a rule, and check that the ACL now has 5 rules. + """ + game, agent = game_and_agent + + # 1: Check that traffic is normal and acl starts off with 4 rules. + client_1 = game.simulation.network.get_node_by_hostname("client_1") + server_1 = game.simulation.network.get_node_by_hostname("server_1") + server_2 = game.simulation.network.get_node_by_hostname("server_2") + router = game.simulation.network.get_node_by_hostname("router") + assert router.acl.num_rules == 4 + assert client_1.ping("10.0.2.3") # client_1 can ping server_2 + assert server_2.ping("10.0.1.2") # server_2 can ping client_1 + + # 2: Add a rule to block client 1 from reaching server 2 on router + action = ( + "NETWORK_ACL_ADDRULE", + { + "position": 4, # 4th rule + "permission": 2, # DENY + "source_ip_id": 3, # 10.0.1.2 (client_1) + "dest_ip_id": 6, # 10.0.2.3 (server_2) + "dest_port_id": 1, # ALL + "source_port_id": 1, # ALL + "protocol_id": 1, # ALL + }, ) + agent.store_action(action) + game.step() + + # 3: Check that the ACL now has 5 rules, and that client 1 cannot ping server 2 + assert router.acl.num_rules == 5 + assert not client_1.ping("10.0.2.3") # Cannot ping server_2 + assert client_1.ping("10.0.2.2") # Can ping server_1 + assert not server_2.ping( + "10.0.1.2" + ) # Server 2 can't ping client_1 (although rule is one-way, the ping response is blocked) + + # 4: Add a rule to block server_1 from reaching server_2 on router (this should not affect comms as they are on same subnet) + action = ( + "NETWORK_ACL_ADDRULE", + { + "position": 5, # 5th rule + "permission": 2, # DENY + "source_ip_id": 5, # 10.0.2.2 (server_1) + "dest_ip_id": 6, # 10.0.2.3 (server_2) + "dest_port_id": 1, # ALL + "source_port_id": 1, # ALL + "protocol_id": 1, # ALL + }, + ) + agent.store_action(action) + game.step() + + # 5: Check that the ACL now has 6 rules, but that server_1 can still ping server_2 + assert router.acl.num_rules == 6 + assert server_1.ping("10.0.2.3") # Can ping server_2 + + +# def test_network_acl_removerule_integration() diff --git a/tests/unit_tests/_primaite/_game/_agent/test_actions.py b/tests/unit_tests/_primaite/_game/_agent/test_actions.py index 9b641fe2..b41e22c9 100644 --- a/tests/unit_tests/_primaite/_game/_agent/test_actions.py +++ b/tests/unit_tests/_primaite/_game/_agent/test_actions.py @@ -62,7 +62,7 @@ def test_service_action_form_request(node_name, service_name, expect_to_do_nothi if expect_to_do_nothing: assert request == ["do_nothing"] else: - assert request == ["network", "node", node_name, "services", service_name, action_verb] + assert request == ["network", "node", node_name, "service", service_name, action_verb] @pytest.mark.parametrize( @@ -87,4 +87,4 @@ def test_service_scan_form_request(node_name, service_name, expect_to_do_nothing if expect_to_do_nothing: assert request == ["do_nothing"] else: - assert request == ["network", "node", node_name, "services", service_name, "scan"] + assert request == ["network", "node", node_name, "service", service_name, "scan"]