From b1baf023d64f44e399706e0f5402e9ad798c4de0 Mon Sep 17 00:00:00 2001 From: Archer Bowen Date: Wed, 7 Aug 2024 14:16:50 +0100 Subject: [PATCH] #2689 Fixed up Pytests and confirmed functionality before merging from dev. --- .../Command-&-Control-E2E-Demonstration.ipynb | 2 +- .../red_applications/c2/c2_beacon.py | 1 + .../integration_tests/network/test_routing.py | 18 +-- .../system/red_applications}/test_c2_suite.py | 114 ++++++++++-------- 4 files changed, 77 insertions(+), 58 deletions(-) rename tests/{unit_tests/_primaite/_simulator/_system/_applications/_red_applications => integration_tests/system/red_applications}/test_c2_suite.py (72%) diff --git a/src/primaite/notebooks/Command-&-Control-E2E-Demonstration.ipynb b/src/primaite/notebooks/Command-&-Control-E2E-Demonstration.ipynb index 1df85bb6..0810871b 100644 --- a/src/primaite/notebooks/Command-&-Control-E2E-Demonstration.ipynb +++ b/src/primaite/notebooks/Command-&-Control-E2E-Demonstration.ipynb @@ -390,7 +390,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.8" + "version": "3.10.12" } }, "nbformat": 4, diff --git a/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py b/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py index 6dd1a873..1dde28a2 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py @@ -380,6 +380,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): self.keep_alive_inactivity += 1 if not self._check_c2_connection(timestep): self.sys_log.error(f"{self.name}: Connection Severed - Application Closing.") + self.c2_connection_active = False self.clear_connections() # TODO: Shouldn't this close() method also set the health state to 'UNUSED'? self.close() diff --git a/tests/integration_tests/network/test_routing.py b/tests/integration_tests/network/test_routing.py index 62b58cbd..5f9e03ef 100644 --- a/tests/integration_tests/network/test_routing.py +++ b/tests/integration_tests/network/test_routing.py @@ -33,18 +33,18 @@ def pc_a_pc_b_router_1() -> Tuple[Computer, Computer, Router]: ) pc_b.power_on() - router_1 = Router(hostname="router_1", start_up_duration=0) - router_1.power_on() + router = Router(hostname="router", start_up_duration=0) + router.power_on() - router_1.configure_port(1, "192.168.0.1", "255.255.255.0") - router_1.configure_port(2, "192.168.1.1", "255.255.255.0") + router.configure_port(1, "192.168.0.1", "255.255.255.0") + router.configure_port(2, "192.168.1.1", "255.255.255.0") - network.connect(endpoint_a=pc_a.network_interface[1], endpoint_b=router_1.network_interface[1]) - network.connect(endpoint_a=pc_b.network_interface[1], endpoint_b=router_1.network_interface[2]) - router_1.enable_port(1) - router_1.enable_port(2) + network.connect(endpoint_a=pc_a.network_interface[1], endpoint_b=router.network_interface[1]) + network.connect(endpoint_a=pc_b.network_interface[1], endpoint_b=router.network_interface[2]) + router.enable_port(1) + router.enable_port(2) - return pc_a, pc_b, router_1 + return pc_a, pc_b, router @pytest.fixture(scope="function") diff --git a/tests/unit_tests/_primaite/_simulator/_system/_applications/_red_applications/test_c2_suite.py b/tests/integration_tests/system/red_applications/test_c2_suite.py similarity index 72% rename from tests/unit_tests/_primaite/_simulator/_system/_applications/_red_applications/test_c2_suite.py rename to tests/integration_tests/system/red_applications/test_c2_suite.py index 7e4df4f1..9d66f3c1 100644 --- a/tests/unit_tests/_primaite/_simulator/_system/_applications/_red_applications/test_c2_suite.py +++ b/tests/integration_tests/system/red_applications/test_c2_suite.py @@ -6,6 +6,7 @@ import pytest from primaite.game.agent.interface import ProxyAgent from primaite.game.game import PrimaiteGame +from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus from primaite.simulator.network.container import Network from primaite.simulator.network.hardware.nodes.host.computer import Computer from primaite.simulator.network.hardware.nodes.host.server import Server @@ -14,9 +15,11 @@ from primaite.simulator.network.hardware.nodes.network.switch import Switch from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.network.transmission.transport_layer import Port from primaite.simulator.system.applications.application import ApplicationOperatingState +from primaite.simulator.system.applications.database_client import DatabaseClient from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Server from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript +from primaite.simulator.system.services.database.database_service import DatabaseService from primaite.simulator.system.services.dns.dns_server import DNSServer from primaite.simulator.system.services.web_server.web_server import WebServer @@ -26,33 +29,46 @@ def basic_network() -> Network: network = Network() # Creating two generic nodes for the C2 Server and the C2 Beacon. - node_a = Computer(hostname="node_a", ip_address="192.168.0.2", subnet_mask="255.255.255.252", start_up_duration=0) + node_a = Computer( + hostname="node_a", + ip_address="192.168.0.2", + subnet_mask="255.255.255.252", + default_gateway="192.168.0.1", + start_up_duration=0, + ) node_a.power_on() node_a.software_manager.get_open_ports() node_a.software_manager.install(software_class=C2Server) - node_b = Computer(hostname="node_b", ip_address="192.168.255.2", subnet_mask="255.255.255.252", start_up_duration=0) - node_b.software_manager.install(software_class=C2Beacon) + node_b = Computer( + hostname="node_b", + ip_address="192.168.255.2", + subnet_mask="255.255.255.252", + default_gateway="192.168.255.1", + start_up_duration=0, + ) node_b.power_on() - + node_b.software_manager.install(software_class=C2Beacon) # Creating a router to sit between node 1 and node 2. router = Router(hostname="router", num_ports=3, start_up_duration=0) + # Default allow all. + router.acl.add_rule(action=ACLAction.PERMIT) router.power_on() - router.configure_port(port=1, ip_address="192.168.0.1", subnet_mask="255.255.255.252") - router.configure_port(port=2, ip_address="192.168.255.1", subnet_mask="255.255.255.252") - # Creating switches for each client. switch_1 = Switch(hostname="switch_1", num_ports=6, start_up_duration=0) switch_1.power_on() # Connecting the switches to the router. + router.configure_port(port=1, ip_address="192.168.0.1", subnet_mask="255.255.255.252") network.connect(endpoint_a=router.network_interface[1], endpoint_b=switch_1.network_interface[6]) - router.enable_port(1) switch_2 = Switch(hostname="switch_2", num_ports=6, start_up_duration=0) switch_2.power_on() network.connect(endpoint_a=router.network_interface[2], endpoint_b=switch_2.network_interface[6]) + router.configure_port(port=2, ip_address="192.168.255.1", subnet_mask="255.255.255.252") + + router.enable_port(1) router.enable_port(2) # Connecting the node to each switch @@ -72,7 +88,6 @@ def test_c2_suite_setup_receive(basic_network): computer_b: Computer = network.get_node_by_hostname("node_b") c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon") - computer_a.ping("192.168.255.1") # Assert that the c2 beacon configure correctly. c2_beacon.configure(c2_server_ip_address="192.168.0.2") assert c2_beacon.c2_remote_connection == IPv4Address("192.168.0.2") @@ -97,8 +112,7 @@ def test_c2_suite_keep_alive_inactivity(basic_network): computer_b: Computer = network.get_node_by_hostname("node_b") c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon") - # Initial config (#TODO: Make this a function) - c2_beacon.configure(c2_server_ip_address="192.168.0.10", keep_alive_frequency=2) + c2_beacon.configure(c2_server_ip_address="192.168.0.2", keep_alive_frequency=2) c2_server.run() c2_beacon.establish() @@ -116,12 +130,11 @@ def test_c2_suite_keep_alive_inactivity(basic_network): c2_beacon.apply_timestep(3) assert c2_beacon.keep_alive_inactivity == 2 assert c2_beacon.c2_connection_active == False - assert c2_beacon.health_state_actual == ApplicationOperatingState.CLOSED + assert c2_beacon.operating_state == ApplicationOperatingState.CLOSED -# TODO: Flesh out these tests. -def test_c2_suite_configure_via_actions(basic_network): - """Tests that a red agent is able to configure the c2 beacon and c2 server via Actions.""" +def test_c2_suite_configure_request(basic_network): + """Tests that the request system can be used to successfully setup a c2 suite.""" # Setting up the network: network: Network = basic_network computer_a: Computer = network.get_node_by_hostname("node_a") @@ -131,88 +144,93 @@ def test_c2_suite_configure_via_actions(basic_network): c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon") # Testing Via Requests: - network.apply_request(["node", "node_a", "application", "C2Server", "run"]) + c2_server.run() + network.apply_timestep(0) c2_beacon_config = { - "c2_server_ip_address": "192.168.0.10", + "c2_server_ip_address": "192.168.0.2", "keep_alive_frequency": 5, - "masquerade_protocol": IPProtocol.TCP, - "masquerade_port": Port.HTTP, + "masquerade_protocol": "TCP", + "masquerade_port": "HTTP", } network.apply_request(["node", "node_b", "application", "C2Beacon", "configure", c2_beacon_config]) + network.apply_timestep(0) network.apply_request(["node", "node_b", "application", "C2Beacon", "execute"]) assert c2_beacon.c2_connection_active is True assert c2_server.c2_connection_active is True - assert c2_server.c2_remote_connection == IPv4Address("192.168.0.11") - - # Testing Via Agents: - # TODO: + assert c2_server.c2_remote_connection == IPv4Address("192.168.255.2") -def test_c2_suite_configure_ransomware(basic_network): - """Tests that a red agent is able to configure ransomware via C2 Server Actions.""" +def test_c2_suite_ransomware_commands(basic_network): + """Tests the Ransomware commands can be used to configure & launch ransomware via Requests.""" # Setting up the network: network: Network = basic_network computer_a: Computer = network.get_node_by_hostname("node_a") c2_server: C2Server = computer_a.software_manager.software.get("C2Server") + computer_a.software_manager.install(DatabaseService) + computer_a.software_manager.software["DatabaseService"].start() computer_b: Computer = network.get_node_by_hostname("node_b") c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon") + computer_b.software_manager.install(DatabaseClient) + computer_b.software_manager.software["DatabaseClient"].configure(server_ip_address=IPv4Address("192.168.0.2")) + computer_b.software_manager.software["DatabaseClient"].run() - c2_beacon.configure(c2_server_ip_address="192.168.0.10", keep_alive_frequency=2) + c2_beacon.configure(c2_server_ip_address="192.168.0.2", keep_alive_frequency=2) c2_server.run() c2_beacon.establish() # Testing Via Requests: computer_b.software_manager.install(software_class=RansomwareScript) - ransomware_config = {"server_ip_address": "1.1.1.1"} + ransomware_config = {"server_ip_address": "192.168.0.2"} network.apply_request(["node", "node_a", "application", "C2Server", "ransomware_configure", ransomware_config]) ransomware_script: RansomwareScript = computer_b.software_manager.software["RansomwareScript"] - assert ransomware_script.server_ip_address == "1.1.1.1" + assert ransomware_script.server_ip_address == "192.168.0.2" - # Testing Via Agents: - # TODO: + network.apply_request(["node", "node_a", "application", "C2Server", "ransomware_launch"]) + + database_file = computer_a.software_manager.file_system.get_file("database", "database.db") + + assert database_file.health_status == FileSystemItemHealthStatus.CORRUPT -def test_c2_suite_terminal(basic_network): - """Tests that a red agent is able to execute terminal commands via C2 Server Actions.""" - - -def test_c2_suite_acl_block(acl_network): +def test_c2_suite_acl_block(basic_network): """Tests that C2 Beacon disconnects from the C2 Server after blocking ACL rules.""" - network: Network = acl_network - computer_a: Computer = network.get_node_by_hostname("client_1") + + network: Network = basic_network + computer_a: Computer = network.get_node_by_hostname("node_a") c2_server: C2Server = computer_a.software_manager.software.get("C2Server") - computer_b: Computer = network.get_node_by_hostname("client_2") + computer_b: Computer = network.get_node_by_hostname("node_b") c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon") router: Router = network.get_node_by_hostname("router") - network.apply_timestep(0) - # Initial config (#TODO: Make this a function) - c2_beacon.configure(c2_server_ip_address="10.0.1.2", keep_alive_frequency=2) - + c2_beacon.configure(c2_server_ip_address="192.168.0.2", keep_alive_frequency=2) c2_server.run() c2_beacon.establish() + c2_beacon.apply_timestep(0) + assert c2_beacon.keep_alive_inactivity == 1 + + # Keep Alive successfully sent and received upon the 2nd timestep. + c2_beacon.apply_timestep(1) assert c2_beacon.keep_alive_inactivity == 0 assert c2_beacon.c2_connection_active == True - assert c2_server.c2_connection_active == True # Now we add a HTTP blocking acl (Thus preventing a keep alive) - router.acl.add_rule(action=ACLAction.DENY, src_port=Port.HTTP, dst_port=Port.HTTP, position=1) + router.acl.add_rule(action=ACLAction.DENY, src_port=Port.HTTP, dst_port=Port.HTTP, position=0) - c2_beacon.apply_timestep(1) c2_beacon.apply_timestep(2) + c2_beacon.apply_timestep(3) assert c2_beacon.keep_alive_inactivity == 2 assert c2_beacon.c2_connection_active == False - assert c2_beacon.health_state_actual == ApplicationOperatingState.CLOSED + assert c2_beacon.operating_state == ApplicationOperatingState.CLOSED -def test_c2_suite_launch_ransomware(basic_network): - """Tests that a red agent is able to launch ransomware via C2 Server Actions.""" +def test_c2_suite_terminal(basic_network): + """Tests the Ransomware commands can be used to configure & launch ransomware via Requests."""