From ce3805cd15c0bdece670ef42fbd34b2f91de5f5d Mon Sep 17 00:00:00 2001 From: Archer Bowen Date: Mon, 12 Aug 2024 10:47:56 +0100 Subject: [PATCH] #2689 Updated c2 tests significantly and improved quality of debug logging. --- .../red_applications/c2/abstract_c2.py | 22 +- .../red_applications/c2/c2_beacon.py | 4 + .../red_applications/c2/c2_server.py | 8 +- .../test_c2_suite_integration.py | 190 +++++++++++++++++- .../_red_applications/test_c2_suite.py | 71 ++++++- 5 files changed, 281 insertions(+), 14 deletions(-) diff --git a/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py b/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py index a00b8570..3c9080b3 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py @@ -13,11 +13,6 @@ from primaite.simulator.system.applications.application import Application, Appl from primaite.simulator.system.core.session_manager import Session from primaite.simulator.system.software import SoftwareHealthState -# TODO: -# Create test that leverage all the functionality needed for the different TAPs -# Create a .RST doc -# Potentially? A notebook which demonstrates a custom red agent using the c2 server for various means. - class C2Command(Enum): """Enumerations representing the different commands the C2 suite currently supports.""" @@ -196,11 +191,11 @@ class AbstractC2(Application, identifier="AbstractC2"): # (Using NOT to improve code readability) if self.c2_remote_connection is None: self.sys_log.error( - f"{self.name}: Unable to Establish connection as the C2 Server's IP Address has not been given." + f"{self.name}: Unable to establish connection as the C2 Server's IP Address has not been configured." ) if not self._can_perform_network_action(): - self.sys_log.warning(f"{self.name}: Unable to perform network actions.") + self.sys_log.warning(f"{self.name}: Unable to perform network actions. Unable to send Keep Alive.") return False # We also Pass masquerade proto`col/port so that the c2 server can reply on the correct protocol/port. @@ -223,12 +218,14 @@ class AbstractC2(Application, identifier="AbstractC2"): self.keep_alive_sent = True self.sys_log.info(f"{self.name}: Keep Alive sent to {self.c2_remote_connection}") self.sys_log.debug( - f"{self.name}: on {self.c2_config.masquerade_port} via {self.c2_config.masquerade_protocol}" + f"{self.name}: Keep Alive sent to {self.c2_remote_connection}" + f"Using Masquerade Port: {self.c2_config.masquerade_port}" + f"Using Masquerade Protocol: {self.c2_config.masquerade_protocol}" ) return True else: self.sys_log.warning( - f"{self.name}: failed to send a Keep Alive. The node may be unable to access networking resources." + f"{self.name}: Failed to send a Keep Alive. The node may be unable to access networking resources." ) return False @@ -262,6 +259,13 @@ class AbstractC2(Application, identifier="AbstractC2"): self.c2_config.masquerade_protocol = payload.masquerade_protocol self.c2_config.keep_alive_frequency = payload.keep_alive_frequency + self.sys_log.debug( + f"{self.name}: C2 Config Resolved Config from Keep Alive:" + f"Masquerade Port: {self.c2_config.masquerade_port}" + f"Masquerade Protocol: {self.c2_config.masquerade_protocol}" + f"Keep Alive Frequency: {self.c2_config.keep_alive_frequency}" + ) + # This statement is intended to catch on the C2 Application that is listening for connection. (C2 Beacon) if self.c2_remote_connection is None: self.sys_log.debug(f"{self.name}: Attempting to configure remote C2 connection based off received output.") diff --git a/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py b/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py index 55dd1474..8052d0f2 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py @@ -168,6 +168,10 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): f"Masquerade Protocol: {masquerade_protocol}" f"Masquerade Port: {masquerade_port}" ) + # Send a keep alive to the C2 Server if we already have a keep alive. + if self.c2_connection_active is True: + self.sys_log.info(f"{self.name}: Updating C2 Server with updated C2 configuration.") + self._send_keep_alive(self.c2_session.uuid if not None else None) return True def establish(self) -> bool: diff --git a/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py b/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py index e4bf3302..6b51f8c7 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py @@ -317,10 +317,12 @@ class C2Server(AbstractC2, identifier="C2Server"): :rtype bool: """ if self.keep_alive_inactivity > self.c2_config.keep_alive_frequency: - self.sys_log.debug( - f"{self.name}: Failed to receive expected keep alive from {self.c2_remote_connection} at {timestep}." - ) self.sys_log.info(f"{self.name}: C2 Beacon connection considered dead due to inactivity.") + self.sys_log.debug( + f"{self.name}: Did not receive expected keep alive connection from {self.c2_remote_connection}" + f"{self.name}: Expected at timestep: {timestep} due to frequency: {self.c2_config.keep_alive_frequency}" + f"{self.name}: Last Keep Alive received at {(timestep - self.keep_alive_inactivity)}" + ) self._reset_c2_connection() return False return True diff --git a/tests/integration_tests/system/red_applications/test_c2_suite_integration.py b/tests/integration_tests/system/red_applications/test_c2_suite_integration.py index ab609cb0..56b354d7 100644 --- a/tests/integration_tests/system/red_applications/test_c2_suite_integration.py +++ b/tests/integration_tests/system/red_applications/test_c2_suite_integration.py @@ -10,7 +10,7 @@ from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHe from primaite.simulator.network.container import Network from primaite.simulator.network.hardware.nodes.host.computer import Computer from primaite.simulator.network.hardware.nodes.host.server import Server -from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router +from primaite.simulator.network.hardware.nodes.network.router import AccessControlList, ACLAction, Router from primaite.simulator.network.hardware.nodes.network.switch import Switch from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.network.transmission.transport_layer import Port @@ -209,6 +209,8 @@ def test_c2_suite_acl_block(basic_network): network: Network = basic_network network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network) + computer_b.software_manager.install(software_class=RansomwareScript) + ransomware_config = {"server_ip_address": "192.168.0.2"} router: Router = network.get_node_by_hostname("router") @@ -275,3 +277,189 @@ def test_c2_suite_terminal_command_file_creation(basic_network): assert computer_c.software_manager.file_system.access_file(folder_name="test_folder", file_name="test_file") == True assert c2_beacon.remote_terminal_session is not None + + +def test_c2_suite_acl_bypass(basic_network): + """Tests that C2 Beacon can be reconfigured to connect C2 Server to bypass blocking ACL rules. + + 1. This Test first configures a router to block HTTP traffic and asserts the following: + 1. C2 Beacon and C2 Server are unable to maintain connection + 2. Traffic is confirmed to be blocked by the ACL rule. + + 2. Next the C2 Beacon is re-configured to use FTP which is permitted by the ACL and asserts the following; + 1. The C2 Beacon and C2 Server re-establish connection + 2. The ACL rule has not prevent any further traffic. + 3. A test file create command is sent & it's output confirmed + + 3. The ACL is then re-configured to block FTP traffic and asserts the following: + 1. C2 Beacon and C2 Server are unable to maintain connection + 2. Traffic is confirmed to be blocked by the ACL rule. + + 4. Next the C2 Beacon is re-configured to use HTTP which is permitted by the ACL and asserts the following; + 1. The C2 Beacon and C2 Server re-establish connection + 2. The ACL rule has not prevent any further traffic. + 3. A test file create command is sent & it's output confirmed + """ + + network: Network = basic_network + network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network) + router: Router = network.get_node_by_hostname("router") + + ################ Confirm Default Setup ######################### + + # Permitting all HTTP & FTP traffic + router.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.HTTP, dst_port=Port.HTTP, position=0) + router.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.FTP, dst_port=Port.FTP, position=1) + + c2_beacon.apply_timestep(0) + assert c2_beacon.keep_alive_inactivity == 1 + + # Keep Alive successfully sent and received upon the 2nd timestep. + c2_beacon.apply_timestep(1) + + assert c2_beacon.keep_alive_inactivity == 0 + assert c2_beacon.c2_connection_active == True + + ################ Denying HTTP Traffic ######################### + + # Now we add a HTTP blocking acl (Thus preventing a keep alive) + router.acl.add_rule(action=ACLAction.DENY, src_port=Port.HTTP, dst_port=Port.HTTP, position=0) + blocking_acl: AccessControlList = router.acl.acl[0] + + # Asserts to show the C2 Suite is unable to maintain connection: + + network.apply_timestep(2) + network.apply_timestep(3) + + c2_packets_blocked = blocking_acl.match_count + assert c2_packets_blocked != 0 + assert c2_beacon.c2_connection_active is False + + # Stepping one more time to confirm that the C2 server drops its connection + network.apply_timestep(4) + assert c2_server.c2_connection_active is False + + ################ Configuring C2 to use FTP ##################### + + # Reconfiguring the c2 beacon to now use FTP + c2_beacon.configure( + c2_server_ip_address="192.168.0.2", + keep_alive_frequency=2, + masquerade_port=Port.FTP, + masquerade_protocol=IPProtocol.TCP, + ) + + c2_beacon.establish() + + ################ Confirming connection via FTP ##################### + + # Confirming we've re-established connection + + assert c2_beacon.c2_connection_active is True + assert c2_server.c2_connection_active is True + + # Confirming that we can send commands: + + ftp_file_create_command = { + "commands": [ + ["file_system", "create", "folder", "test_folder"], + ["file_system", "create", "file", "test_folder", "ftp_test_file", "True"], + ], + "username": "admin", + "password": "admin", + "ip_address": None, + } + c2_server._send_command(C2Command.TERMINAL, command_options=ftp_file_create_command) + assert ( + computer_b.software_manager.file_system.access_file(folder_name="test_folder", file_name="ftp_test_file") + == True + ) + + # Confirming we can maintain connection + + # Stepping twenty timesteps in the network + i = 4 # We're already at the 4th timestep (starting at timestep 4) + + for i in range(20): + network.apply_timestep(i) + + # Confirming HTTP ACL ineffectiveness (C2 Bypass) + + # Asserting that the ACL hasn't caught more traffic and the c2 connection is still active + assert c2_packets_blocked == blocking_acl.match_count + assert c2_server.c2_connection_active is True + assert c2_beacon.c2_connection_active is True + + ################ Denying FTP Traffic & Enable HTTP ######################### + + # Blocking FTP and re-permitting HTTP: + router.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.HTTP, dst_port=Port.HTTP, position=0) + router.acl.add_rule(action=ACLAction.DENY, src_port=Port.FTP, dst_port=Port.FTP, position=1) + blocking_acl: AccessControlList = router.acl.acl[1] + + # Asserts to show the C2 Suite is unable to maintain connection: + + network.apply_timestep(25) + network.apply_timestep(26) + + c2_packets_blocked = blocking_acl.match_count + assert c2_packets_blocked != 0 + assert c2_beacon.c2_connection_active is False + + # Stepping one more time to confirm that the C2 server drops its connection + network.apply_timestep(27) + assert c2_server.c2_connection_active is False + + ################ Configuring C2 to use HTTP ##################### + + # Reconfiguring the c2 beacon to now use HTTP Again + c2_beacon.configure( + c2_server_ip_address="192.168.0.2", + keep_alive_frequency=2, + masquerade_port=Port.HTTP, + masquerade_protocol=IPProtocol.TCP, + ) + + c2_beacon.establish() + + ################ Confirming connection via HTTP ##################### + + # Confirming we've re-established connection + + assert c2_beacon.c2_connection_active is True + assert c2_server.c2_connection_active is True + + # Confirming that we can send commands + + http_file_create_command = { + "commands": [ + ["file_system", "create", "folder", "test_folder"], + ["file_system", "create", "file", "test_folder", "http_test_file", "True"], + ], + "username": "admin", + "password": "admin", + "ip_address": None, + } + c2_server._send_command(C2Command.TERMINAL, command_options=http_file_create_command) + assert ( + computer_b.software_manager.file_system.access_file(folder_name="test_folder", file_name="http_test_file") + == True + ) + + assert c2_beacon.c2_connection_active is True + assert c2_server.c2_connection_active is True + + # Confirming we can maintain connection + + # Stepping twenty timesteps in the network + i = 28 # We're already at the 28th timestep + + for i in range(20): + network.apply_timestep(i) + + # Confirming FTP ACL ineffectiveness (C2 Bypass) + + # Asserting that the ACL hasn't caught more traffic and the c2 connection is still active + assert c2_packets_blocked == blocking_acl.match_count + assert c2_server.c2_connection_active is True + assert c2_beacon.c2_connection_active is True diff --git a/tests/unit_tests/_primaite/_simulator/_system/_applications/_red_applications/test_c2_suite.py b/tests/unit_tests/_primaite/_simulator/_system/_applications/_red_applications/test_c2_suite.py index a790081f..ed408d14 100644 --- a/tests/unit_tests/_primaite/_simulator/_system/_applications/_red_applications/test_c2_suite.py +++ b/tests/unit_tests/_primaite/_simulator/_system/_applications/_red_applications/test_c2_suite.py @@ -4,6 +4,8 @@ import pytest from primaite.simulator.network.container import Network from primaite.simulator.network.hardware.nodes.host.computer import Computer from primaite.simulator.network.hardware.nodes.host.server import Server +from primaite.simulator.network.transmission.network_layer import IPProtocol +from primaite.simulator.network.transmission.transport_layer import Port from primaite.simulator.system.applications.application import ApplicationOperatingState from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Command, C2Server @@ -124,8 +126,38 @@ def test_c2_handle_switching_port(basic_c2_network): network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network) - # Asserting that the c2 beacon has established a c2 connection + # Asserting that the c2 applications have established a c2 connection assert c2_beacon.c2_connection_active is True + assert c2_server.c2_connection_active is True + + # Assert to confirm that both the C2 server and the C2 beacon are configured correctly. + assert c2_beacon.c2_config.keep_alive_frequency is 2 + assert c2_beacon.c2_config.masquerade_port is Port.HTTP + assert c2_beacon.c2_config.masquerade_protocol is IPProtocol.TCP + + assert c2_server.c2_config.keep_alive_frequency is 2 + assert c2_server.c2_config.masquerade_port is Port.HTTP + assert c2_server.c2_config.masquerade_protocol is IPProtocol.TCP + + # Configuring the C2 Beacon. + c2_beacon.configure( + c2_server_ip_address="192.168.0.1", + keep_alive_frequency=2, + masquerade_port=Port.FTP, + masquerade_protocol=IPProtocol.TCP, + ) + + # Asserting that the c2 applications have established a c2 connection + assert c2_beacon.c2_connection_active is True + assert c2_server.c2_connection_active is True + + # Assert to confirm that both the C2 server and the C2 beacon + # Have reconfigured their C2 settings. + assert c2_beacon.c2_config.masquerade_port is Port.FTP + assert c2_beacon.c2_config.masquerade_protocol is IPProtocol.TCP + + assert c2_server.c2_config.masquerade_port is Port.FTP + assert c2_server.c2_config.masquerade_protocol is IPProtocol.TCP def test_c2_handle_switching_frequency(basic_c2_network): @@ -136,3 +168,40 @@ def test_c2_handle_switching_frequency(basic_c2_network): # Asserting that the c2 beacon has established a c2 connection assert c2_beacon.c2_connection_active is True + network: Network = basic_c2_network + + network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network) + + # Asserting that the c2 applications have established a c2 connection + assert c2_beacon.c2_connection_active is True + assert c2_server.c2_connection_active is True + + # Assert to confirm that both the C2 server and the C2 beacon are configured correctly. + assert c2_beacon.c2_config.keep_alive_frequency is 2 + assert c2_server.c2_config.keep_alive_frequency is 2 + + # Configuring the C2 Beacon. + c2_beacon.configure(c2_server_ip_address="192.168.0.1", keep_alive_frequency=10) + + # Asserting that the c2 applications have established a c2 connection + assert c2_beacon.c2_connection_active is True + assert c2_server.c2_connection_active is True + + # Assert to confirm that both the C2 server and the C2 beacon + # Have reconfigured their C2 settings. + assert c2_beacon.c2_config.keep_alive_frequency is 10 + assert c2_server.c2_config.keep_alive_frequency is 10 + + # Now skipping 9 time steps to confirm keep alive inactivity + for i in range(9): + network.apply_timestep(i) + + # If the keep alive reconfiguration failed then the keep alive inactivity could never reach 9 + # As another keep alive would have already been sent. + assert c2_beacon.keep_alive_inactivity is 9 + assert c2_server.keep_alive_inactivity is 9 + + network.apply_timestep(10) + + assert c2_beacon.keep_alive_inactivity is 0 + assert c2_server.keep_alive_inactivity is 0