From 36aecdea37e0d2a151bad4a5eeba9e8e511bcba8 Mon Sep 17 00:00:00 2001 From: Archer Bowen Date: Thu, 27 Feb 2025 14:54:10 +0000 Subject: [PATCH] #2925 Updating more tests based on PR comments. (Mainly clean-up and improving comment clarity) --- .../e2e_integration_tests/test_uc7_agents.py | 7 ++- .../test_tap003_kill_chain_repeat.py | 2 +- .../test_tap003_kill_chain_stages.py | 52 ++++--------------- .../test_tap003_multiple_rules.py | 6 ++- 4 files changed, 19 insertions(+), 48 deletions(-) diff --git a/tests/e2e_integration_tests/test_uc7_agents.py b/tests/e2e_integration_tests/test_uc7_agents.py index 7bdba714..d54bf6d2 100644 --- a/tests/e2e_integration_tests/test_uc7_agents.py +++ b/tests/e2e_integration_tests/test_uc7_agents.py @@ -28,6 +28,7 @@ from primaite.simulator.system.services.service import ServiceOperatingState from primaite.simulator.system.software import SoftwareHealthState CONFIG_FILE = _EXAMPLE_CFG / "uc7_config.yaml" +ATTACK_AGENT_INDEX = 32 @pytest.fixture(scope="function") @@ -47,8 +48,6 @@ def assert_agent_reward(env: PrimaiteGymEnv, agent_name: str, positive: bool): assert agent_reward >= 0 # Asserts that no agents are below a total reward of 0 elif positive is False: assert agent_reward <= 0 # Asserts that no agents are above a total reward of 0 - else: - print("Invalid 'positive' argument.") def test_green_agent_positive_reward(uc7_environment): @@ -139,8 +138,8 @@ def test_tap003_default_behaviour(uc7_environment): def uc7_environment_tap003() -> PrimaiteGymEnv: with open(_EXAMPLE_CFG / "uc7_config_tap003.yaml", mode="r") as uc7_config: cfg = yaml.safe_load(uc7_config) - cfg["agents"][32]["agent_settings"]["starting_nodes"] = ["ST_PROJ-A-PRV-PC-1"] - cfg["agents"][32]["agent_settings"]["default_starting_node"] = "ST_PROJ-A-PRV-PC-1" + cfg["agents"][ATTACK_AGENT_INDEX]["agent_settings"]["starting_nodes"] = ["ST_PROJ-A-PRV-PC-1"] + cfg["agents"][ATTACK_AGENT_INDEX]["agent_settings"]["default_starting_node"] = "ST_PROJ-A-PRV-PC-1" env = PrimaiteGymEnv(env_config=cfg) return env diff --git a/tests/e2e_integration_tests/threat_actor_profiles/test_tap003_kill_chain_repeat.py b/tests/e2e_integration_tests/threat_actor_profiles/test_tap003_kill_chain_repeat.py index 2ce62ad6..04ccd656 100644 --- a/tests/e2e_integration_tests/threat_actor_profiles/test_tap003_kill_chain_repeat.py +++ b/tests/e2e_integration_tests/threat_actor_profiles/test_tap003_kill_chain_repeat.py @@ -85,7 +85,7 @@ def test_tap003_repeating_kill_chain_stages(): repeat_kill_chain=True, repeat_kill_chain_stages=True, manipulation_probability=1, - # Probability 0 = Will never be able to perform the access stage and progress to Manipulation. + # access_probability 0 = Will never be able to perform the access stage and progress to Manipulation. access_probability=0, planning_probability=1, ) diff --git a/tests/e2e_integration_tests/threat_actor_profiles/test_tap003_kill_chain_stages.py b/tests/e2e_integration_tests/threat_actor_profiles/test_tap003_kill_chain_stages.py index 9130ebf2..c475af8b 100644 --- a/tests/e2e_integration_tests/threat_actor_profiles/test_tap003_kill_chain_stages.py +++ b/tests/e2e_integration_tests/threat_actor_profiles/test_tap003_kill_chain_stages.py @@ -120,8 +120,6 @@ def test_tap003_kill_chain_stage_access(): assert tap003.current_kill_chain_stage.name == InsiderKillChain.ACCESS.name assert tap003.next_kill_chain_stage.name == InsiderKillChain.MANIPULATION.name - env = environment_step(i=2, env=env) - def test_tap003_kill_chain_stage_manipulation(): """Tests the successful/failed handlers in the manipulation stage in the InsiderKillChain""" @@ -132,51 +130,35 @@ def test_tap003_kill_chain_stage_manipulation(): assert tap003.current_kill_chain_stage == BaseKillChain.NOT_STARTED - env.step(0) - env.step(0) + env = environment_step(i=2, env=env) assert tap003.current_kill_chain_stage.name == InsiderKillChain.RECONNAISSANCE.name assert tap003.next_kill_chain_stage.name == InsiderKillChain.PLANNING.name - env.step(0) - env.step(0) + env = environment_step(i=2, env=env) assert tap003.current_kill_chain_stage.name == InsiderKillChain.PLANNING.name assert tap003.next_kill_chain_stage.name == InsiderKillChain.ACCESS.name - env.step(0) - env.step(0) + env = environment_step(i=2, env=env) assert tap003.current_kill_chain_stage.name == InsiderKillChain.ACCESS.name assert tap003.next_kill_chain_stage.name == InsiderKillChain.MANIPULATION.name - env.step(0) - env.step(0) + env = environment_step(i=2, env=env) assert tap003.current_kill_chain_stage.name == InsiderKillChain.MANIPULATION.name # Testing that the stage successfully impacted the simulation - Accounts Altered - env.step(0) - env.step(0) - env.step(0) - env.step(0) - env.step(0) + env = environment_step(i=5, env=env) st_intra_prv_rt_dr_1: Router = env.game.simulation.network.get_node_by_hostname("ST_INTRA-PRV-RT-DR-1") assert st_intra_prv_rt_dr_1.user_manager.admins["admin"].password == "red_pass" - env.step(0) - env.step(0) - env.step(0) - env.step(0) - env.step(0) + env = environment_step(i=5, env=env) st_intra_prv_rt_cr: Router = env.game.simulation.network.get_node_by_hostname("ST_INTRA-PRV-RT-CR") assert st_intra_prv_rt_cr.user_manager.admins["admin"].password == "red_pass" - env.step(0) - env.step(0) - env.step(0) - env.step(0) - env.step(0) + env = environment_step(i=5, env=env) rem_pub_rt_dr: Router = env.game.simulation.network.get_node_by_hostname("REM-PUB-RT-DR") assert rem_pub_rt_dr.user_manager.admins["admin"].password == "red_pass" @@ -192,34 +174,22 @@ def test_tap003_kill_chain_stage_exploit(): rem_pub_rt_dr: Router = env.game.simulation.network.get_node_by_hostname("REM-PUB-RT-DR") assert tap003.current_kill_chain_stage == BaseKillChain.NOT_STARTED - env.step(0) - env.step(0) + env = environment_step(i=2, env=env) assert tap003.current_kill_chain_stage.name == InsiderKillChain.RECONNAISSANCE.name assert tap003.next_kill_chain_stage.name == InsiderKillChain.PLANNING.name - env.step(0) - env.step(0) + env = environment_step(i=2, env=env) assert tap003.current_kill_chain_stage.name == InsiderKillChain.PLANNING.name assert tap003.next_kill_chain_stage.name == InsiderKillChain.ACCESS.name - env.step(0) - env.step(0) + env = environment_step(i=2, env=env) assert tap003.current_kill_chain_stage.name == InsiderKillChain.ACCESS.name assert tap003.next_kill_chain_stage.name == InsiderKillChain.MANIPULATION.name - env.step(0) - env.step(0) - env.step(0) - env.step(0) - env.step(0) - env.step(0) - env.step(0) - env.step(0) - env.step(0) - env.step(0) + env = environment_step(i=9, env=env) assert tap003.current_kill_chain_stage.name == InsiderKillChain.EXPLOIT.name diff --git a/tests/e2e_integration_tests/threat_actor_profiles/test_tap003_multiple_rules.py b/tests/e2e_integration_tests/threat_actor_profiles/test_tap003_multiple_rules.py index 77b3ce75..b7b767e0 100644 --- a/tests/e2e_integration_tests/threat_actor_profiles/test_tap003_multiple_rules.py +++ b/tests/e2e_integration_tests/threat_actor_profiles/test_tap003_multiple_rules.py @@ -82,7 +82,7 @@ RULES = [ def uc7_tap003_env(**kwargs) -> PrimaiteGymEnv: - """Setups the UC7 TAP003 Game with the start_step & frequency set to 1 with probabilities set to 1 as well""" + """Setups the UC7 TAP003 Game with a 1 timestep start_step, frequency of 2 and probabilities set to 1 as well""" with open(_EXAMPLE_CFG / "uc7_config_tap003.yaml", mode="r") as uc7_config: cfg = yaml.safe_load(uc7_config) cfg["io_settings"]["save_sys_logs"] = False @@ -141,6 +141,9 @@ def test_tap003_cycling_rules(): tap003: TAP003 = env.game.agents["attacker"] def wait_until_attack(): + # 120 environment steps to ensure that TAP003 reaches manipulate. + # If this loop finishes 120 iterations before the test finishes then TAP003 is struggling to + # reach or finish the manipulation kill chain stage correctly. for _ in range(120): # check if the agent has executed and therefore moved onto the next rule index env.step(0) @@ -198,4 +201,3 @@ def test_tap003_cycling_rules(): assert rule_3.dst_port == PORT_LOOKUP["FTP"] # If we've gotten this fair then we can pass the test :) - pass