Add action tests.

This commit is contained in:
Marek Wolan
2024-01-08 13:29:17 +00:00
parent 294c8b982f
commit 534f84ccd1
2 changed files with 118 additions and 19 deletions

View File

@@ -69,17 +69,17 @@ def install_stuff_to_sim(sim: Simulation):
# 1: Set up network hardware
# 1.1: Configure the router
router = Router(hostname="router", num_ports=3)
router = Router(hostname="router", num_ports=3, operating_state=NodeOperatingState.ON)
router.power_on()
router.configure_port(port=1, ip_address="10.0.1.1", subnet_mask="255.255.255.0")
router.configure_port(port=2, ip_address="10.0.2.1", subnet_mask="255.255.255.0")
# 1.2: Create and connect switches
switch_1 = Switch(hostname="switch_1", num_ports=6)
switch_1 = Switch(hostname="switch_1", num_ports=6, operating_state=NodeOperatingState.ON)
switch_1.power_on()
network.connect(endpoint_a=router.ethernet_ports[1], endpoint_b=switch_1.switch_ports[6])
router.enable_port(1)
switch_2 = Switch(hostname="switch_2", num_ports=6)
switch_2 = Switch(hostname="switch_2", num_ports=6, operating_state=NodeOperatingState.ON)
switch_2.power_on()
network.connect(endpoint_a=router.ethernet_ports[2], endpoint_b=switch_2.switch_ports[6])
router.enable_port(2)
@@ -191,6 +191,7 @@ def game_and_agent():
{"type": "NODE_SERVICE_RESTART"},
{"type": "NODE_SERVICE_DISABLE"},
{"type": "NODE_SERVICE_ENABLE"},
{"type": "NODE_SERVICE_PATCH"},
{"type": "NODE_APPLICATION_EXECUTE"},
{"type": "NODE_FILE_SCAN"},
{"type": "NODE_FILE_CHECKHASH"},
@@ -259,31 +260,129 @@ def test_do_nothing_integration(game_and_agent: Tuple[PrimaiteGame, ProxyAgent])
game.step()
@pytest.mark.skip(reason="Waiting to merge ticket 2160")
# @pytest.mark.skip(reason="Waiting to merge ticket 2166")
def test_node_service_scan_integration(game_and_agent: Tuple[PrimaiteGame, ProxyAgent]):
"""
Test that the NodeServiceScanAction can form a request and that it is accepted by the simulation.
The health status of applications is not always updated in the state dict, rather the agent needs to perform a scan.
Therefore, we set the web browser to be corrupted, check the state is still good, then perform a scan, and check
Therefore, we set a service to be compromised, check the state is still good, then perform a scan, and check
that the state changes to the true value.
"""
game, agent = game_and_agent
browser = game.simulation.network.get_node_by_hostname("client_1").software_manager.software.get("WebBrowser")
browser.health_state_actual = SoftwareHealthState.COMPROMISED
# 1: Check that the service starts off in a good state, and that visible state is hidden until first scan
svc = game.simulation.network.get_node_by_hostname("client_1").software_manager.software.get("DNSClient")
assert svc.health_state_actual == SoftwareHealthState.GOOD
assert svc.health_state_visible == SoftwareHealthState.UNUSED
state_before = game.get_sim_state()
assert (
game.get_sim_state()["network"]["nodes"]["client_1"]["applications"]["WebBrowser"]["health_state"]
== SoftwareHealthState.GOOD
)
# 2: Scan and check that the visible state is now correct
action = ("NODE_SERVICE_SCAN", {"node_id": 0, "service_id": 0})
agent.store_action(action)
game.step()
state_after = game.get_sim_state()
pass
assert (
game.get_sim_state()["network"]["nodes"]["client_1"]["services"]["WebBrowser"]["health_state"]
== SoftwareHealthState.COMPROMISED
assert svc.health_state_actual == SoftwareHealthState.GOOD
assert svc.health_state_visible == SoftwareHealthState.GOOD
# 3: Corrupt the service and check that the visible state is still good
svc.health_state_actual = SoftwareHealthState.COMPROMISED
assert svc.health_state_visible == SoftwareHealthState.GOOD
# 4: Scan and check that the visible state is now correct
action = ("NODE_SERVICE_SCAN", {"node_id": 0, "service_id": 0})
agent.store_action(action)
game.step()
assert svc.health_state_actual == SoftwareHealthState.COMPROMISED
assert svc.health_state_visible == SoftwareHealthState.COMPROMISED
def test_node_service_patch_integration(game_and_agent: Tuple[PrimaiteGame, ProxyAgent]):
"""
Test that the NodeServicePatchAction can form a request and that it is accepted by the simulation.
When you initiate a patch action, the software health state turns to PATCHING, then after a few steps, it goes
to GOOD.
"""
game, agent = game_and_agent
# 1: Corrupt the service
svc = game.simulation.network.get_node_by_hostname("server_1").software_manager.software.get("DNSServer")
svc.health_state_actual = SoftwareHealthState.COMPROMISED
# 2: Apply a patch action
action = ("NODE_SERVICE_PATCH", {"node_id": 1, "service_id": 0})
agent.store_action(action)
game.step()
# 3: Check that the service is now in the patching state
assert svc.health_state_actual == SoftwareHealthState.PATCHING
# 4: perform a few do-nothing steps and check that the service is now in the good state
action = ("DONOTHING", {})
agent.store_action(action)
game.step()
assert svc.health_state_actual == SoftwareHealthState.GOOD
def test_network_acl_addrule_integration(game_and_agent: Tuple[PrimaiteGame, ProxyAgent]):
"""
Test that the NetworkACLAddRuleAction can form a request and that it is accepted by the simulation.
The ACL starts off with 4 rules, and we add a rule, and check that the ACL now has 5 rules.
"""
game, agent = game_and_agent
# 1: Check that traffic is normal and acl starts off with 4 rules.
client_1 = game.simulation.network.get_node_by_hostname("client_1")
server_1 = game.simulation.network.get_node_by_hostname("server_1")
server_2 = game.simulation.network.get_node_by_hostname("server_2")
router = game.simulation.network.get_node_by_hostname("router")
assert router.acl.num_rules == 4
assert client_1.ping("10.0.2.3") # client_1 can ping server_2
assert server_2.ping("10.0.1.2") # server_2 can ping client_1
# 2: Add a rule to block client 1 from reaching server 2 on router
action = (
"NETWORK_ACL_ADDRULE",
{
"position": 4, # 4th rule
"permission": 2, # DENY
"source_ip_id": 3, # 10.0.1.2 (client_1)
"dest_ip_id": 6, # 10.0.2.3 (server_2)
"dest_port_id": 1, # ALL
"source_port_id": 1, # ALL
"protocol_id": 1, # ALL
},
)
agent.store_action(action)
game.step()
# 3: Check that the ACL now has 5 rules, and that client 1 cannot ping server 2
assert router.acl.num_rules == 5
assert not client_1.ping("10.0.2.3") # Cannot ping server_2
assert client_1.ping("10.0.2.2") # Can ping server_1
assert not server_2.ping(
"10.0.1.2"
) # Server 2 can't ping client_1 (although rule is one-way, the ping response is blocked)
# 4: Add a rule to block server_1 from reaching server_2 on router (this should not affect comms as they are on same subnet)
action = (
"NETWORK_ACL_ADDRULE",
{
"position": 5, # 5th rule
"permission": 2, # DENY
"source_ip_id": 5, # 10.0.2.2 (server_1)
"dest_ip_id": 6, # 10.0.2.3 (server_2)
"dest_port_id": 1, # ALL
"source_port_id": 1, # ALL
"protocol_id": 1, # ALL
},
)
agent.store_action(action)
game.step()
# 5: Check that the ACL now has 6 rules, but that server_1 can still ping server_2
assert router.acl.num_rules == 6
assert server_1.ping("10.0.2.3") # Can ping server_2
# def test_network_acl_removerule_integration()

View File

@@ -62,7 +62,7 @@ def test_service_action_form_request(node_name, service_name, expect_to_do_nothi
if expect_to_do_nothing:
assert request == ["do_nothing"]
else:
assert request == ["network", "node", node_name, "services", service_name, action_verb]
assert request == ["network", "node", node_name, "service", service_name, action_verb]
@pytest.mark.parametrize(
@@ -87,4 +87,4 @@ def test_service_scan_form_request(node_name, service_name, expect_to_do_nothing
if expect_to_do_nothing:
assert request == ["do_nothing"]
else:
assert request == ["network", "node", node_name, "services", service_name, "scan"]
assert request == ["network", "node", node_name, "service", service_name, "scan"]