From 4ae0275dc9e6b6a46ab4aa6515d056f0d4f96c88 Mon Sep 17 00:00:00 2001 From: "Archer.Bowen" Date: Mon, 5 Aug 2024 16:53:48 +0100 Subject: [PATCH] #2689 Implemented initial agent actions and started on documentations. A few TODO's left to do such as validation and expanding unit tests. --- .../system/applications/c2_suite.rst | 145 ++++++++++++ .../red_applications/c2/abstract_c2.py | 4 +- .../red_applications/c2/c2_beacon.py | 19 +- .../red_applications/c2/c2_server.py | 17 +- .../red_applications/ransomware_script.py | 24 ++ .../_red_applications/test_c2_suite.py | 224 +++++++++++++++++- 6 files changed, 412 insertions(+), 21 deletions(-) create mode 100644 docs/source/simulation_components/system/applications/c2_suite.rst diff --git a/docs/source/simulation_components/system/applications/c2_suite.rst b/docs/source/simulation_components/system/applications/c2_suite.rst new file mode 100644 index 00000000..c360d0be --- /dev/null +++ b/docs/source/simulation_components/system/applications/c2_suite.rst @@ -0,0 +1,145 @@ +.. only:: comment + + © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK + +.. _C2_Suite: + +Command and Control Application Suite +##################################### + +Comprising of two applications, the command and control (C2) suites intends to introduce +malicious network architecture and begin to further the realism of red agents within primAITE. + +Overview: +========= + +These two new classes intend to Red Agents a cyber realistic way of leveraging the capabilities of the ``Terminal`` application. +Whilst introducing both more oppourtinies for the blue agent to notice and subvert Red Agents during an episode. + +For a more in-depth look at the command and control applications then please refer to the ``C2-E2E-Notebook``. + +``C2 Server`` +"""""""""""" + +The C2 Server application is intended to represent the malicious infrastructure already under the control of an adversary. + +The C2 Server is configured to listen and await ``keep alive`` traffic from a c2 beacon. Once received the C2 Server is able to send and receive c2 commands. + +Currently, the C2 Server offers three commands: + ++---------------------+---------------------------------------------------------------------------+ +|C2 Command | Meaning | ++=====================+===========================================================================+ +|RANSOMWARE_CONFIGURE | Configures an installed ransomware script based on the passed parameters. | ++---------------------+---------------------------------------------------------------------------+ +|RANSOMWARE_LAUNCH | Launches the installed ransomware script. | ++---------------------+---------------------------------------------------------------------------+ +|TERMINAL_COMMAND | Executes a command via the terminal installed on the C2 Beacons Host. | ++---------------------+---------------------------------------------------------------------------+ + + +It's important to note that in order to keep the PrimAITE realistic from a cyber perspective, +The C2 Server application should never be visible or actionable upon directly by the blue agent. + +This is because in the real world, C2 servers are hosted on ephemeral public domains that would not be accessible by private network blue agent. +Therefore granting a blue agent's the ability to perform counter measures directly against the application would be unrealistic. + +It is more accurate to see the host that the C2 Server is installed on as being able to route to the C2 Server (Internet Access). + +``C2 Beacon`` +""""""""""""" + +The C2 Beacon application is intended to represent malware that is used to establish and maintain contact to a C2 Server within a compromised network. + +A C2 Beacon will need to be first configured with the C2 Server IP Address which can be done via the ``configure`` method. + +Once installed and configured; the c2 beacon can establish connection with the C2 Server via executing the application. + +This will send an initial ``keep alive`` to the given C2 Server (The C2 Server IPv4Address must be given upon C2 Beacon configuration). +Which is then resolved and responded by another ``Keep Alive`` by the c2 server back to the C2 beacon to confirm connection. + +The C2 Beacon will send out periodic keep alive based on it's configuration parameters to configure it's active connection with the c2 server. + +It's recommended that a C2 Beacon is installed and configured mid episode by a Red Agent for a more cyber realistic simulation. + +Usage +===== + +As mentioned, the C2 Suite is intended to grant Red Agents further flexibility whilst also expanding a blue agent's observation_space. + +Adding to this, the following behaviour of the C2 beacon can be configured by users for increased domain randomisation: + +- Frequency of C2 ``Keep Alive `` Communication`` +- C2 Communication Port +- C2 Communication Protocol + + +Implementation +============== + +Both applications inherit from an abstract C2 which handles the keep alive functionality and main logic. +However, each host implements it's receive methods individually. + +- The ``C2 Beacon`` is responsible for the following logic: + - Establishes and confirms connection to the C2 Server via sending ``C2Payload.KEEP_ALIVE``. + - Receives and executes C2 Commands given by the C2 Server via ``C2Payload.INPUT``. + - Returns the RequestResponse of the C2 Commands executed back the C2 Server via ``C2Payload.OUTPUT``. + +- The ``C2 Server`` is responsible for the following logic: + - Listens and resolves connection to a C2 Beacon via responding to ``C2Payload.KEEP_ALIVE``. + - Sends C2 Commands to the C2 Beacon via ``C2Payload.INPUT``. + - Receives the RequestResponse of the C2 Commands executed by C2 Beacon via ``C2Payload.OUTPUT``. + + + +Examples +======== + +Python +"""""" +.. code-block:: python + from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon + from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Server + from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Command + from primaite.simulator.network.hardware.nodes.host.computer import Computer + + # Network Setup + + node_a = Computer(hostname="node_a", ip_address="192.168.0.10", subnet_mask="255.255.255.0", start_up_duration=0) + node_a.power_on() + node_a.software_manager.install(software_class=C2Server) + node_a.software_manager.get_open_ports() + + + node_b = Computer(hostname="node_b", ip_address="192.168.0.11", subnet_mask="255.255.255.0", start_up_duration=0) + node_b.power_on() + node_b.software_manager.install(software_class=C2Beacon) + node_b.software_manager.install(software_class=RansomwareScript) + network.connect(node_a.network_interface[1], node_b.network_interface[1]) + + + # C2 Application objects + + c2_server_host = simulation_testing_network.get_node_by_hostname("node_a") + c2_beacon_host = simulation_testing_network.get_node_by_hostname("node_b") + + + c2_server: C2Server = c2_server_host.software_manager.software["C2Server"] + c2_beacon: C2Beacon = c2_beacon_host.software_manager.software["C2Beacon"] + + # Configuring the C2 Beacon + c2_beacon.configure(c2_server_ip_address="192.168.0.10", keep_alive_frequency=5) + + # Launching the C2 Server (Needs to be running in order to listen for connections) + c2_server.run() + + # Establishing connection + c2_beacon.establish() + + # Example command: Configuring Ransomware + + ransomware_config = {"server_ip_address": "1.1.1.1"} + c2_server._send_command(given_command=C2Command.RANSOMWARE_CONFIGURE, command_options=ransomware_config) + + +For a more in-depth look at the command and control applications then please refer to the ``C2-Suite-E2E-Notebook``. diff --git a/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py b/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py index 89ab7953..9158d80f 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/abstract_c2.py @@ -213,7 +213,7 @@ class AbstractC2(Application, identifier="AbstractC2"): return True else: self.sys_log.warning( - f"{self.name}: failed to send a Keep Alive. The node may be unable to access the ``network." + f"{self.name}: failed to send a Keep Alive. The node may be unable to access networking resources." ) return False @@ -251,7 +251,7 @@ class AbstractC2(Application, identifier="AbstractC2"): # This statement is intended to catch on the C2 Application that is listening for connection. (C2 Beacon) if self.c2_remote_connection is None: self.sys_log.debug(f"{self.name}: Attempting to configure remote C2 connection based off received output.") - self.c2_remote_connection = self.current_c2_session.with_ip_address + self.c2_remote_connection = IPv4Address(self.current_c2_session.with_ip_address) self.c2_connection_active = True # Sets the connection to active self.keep_alive_inactivity = 0 # Sets the keep alive inactivity to zero diff --git a/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py b/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py index b00b7c57..16420164 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py @@ -13,6 +13,7 @@ from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.network.transmission.transport_layer import Port from primaite.simulator.system.applications.application import ApplicationOperatingState from primaite.simulator.system.applications.red_applications.c2.abstract_c2 import AbstractC2, C2Command, C2Payload +from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript from primaite.simulator.system.software import SoftwareHealthState @@ -44,8 +45,6 @@ class C2Beacon(AbstractC2, identifier="C2 Beacon"): # TODO: # Implement the placeholder command methods - # Implement the keep alive frequency. - # Implement a command output method that sends the RequestResponse to the C2 server. # Uncomment the terminal Import and the terminal property after terminal PR # @property @@ -56,6 +55,14 @@ class C2Beacon(AbstractC2, identifier="C2 Beacon"): # self.sys_log.warning(f"{self.__class__.__name__} cannot find a terminal on its host.") # return host_terminal + @property + def _host_ransomware_script(self) -> RansomwareScript: + """Return the RansomwareScript that is installed on the same machine as the C2 Beacon.""" + ransomware_script: RansomwareScript = self.software_manager.software.get("RansomwareScript") + if ransomware_script is None: + self.sys_log.warning(f"{self.__class__.__name__} cannot find installed ransomware on its host.") + return ransomware_script + def _init_request_manager(self) -> RequestManager: """ Initialise the request manager. @@ -87,6 +94,7 @@ class C2Beacon(AbstractC2, identifier="C2 Beacon"): ) c2_remote_ip = IPv4Address(c2_remote_ip) + # TODO: validation. frequency = request[-1].get("keep_alive_frequency") protocol = request[-1].get("masquerade_protocol") port = request[-1].get("masquerade_port") @@ -127,7 +135,7 @@ class C2Beacon(AbstractC2, identifier="C2 Beacon"): :param masquerade_port: The Port that the C2 Traffic will masquerade as. Defaults to FTP. :type masquerade_port: Enum (Port) """ - self.c2_remote_connection = c2_server_ip_address + self.c2_remote_connection = IPv4Address(c2_server_ip_address) self.keep_alive_frequency = keep_alive_frequency self.current_masquerade_port = masquerade_port self.current_masquerade_protocol = masquerade_protocol @@ -252,7 +260,10 @@ class C2Beacon(AbstractC2, identifier="C2 Beacon"): :rtype: Request Response """ # TODO: replace and use terminal - return RequestResponse(status="success", data={"Reason": "Placeholder."}) + # return RequestResponse(status="success", data={"Reason": "Placeholder."}) + given_config = payload.payload + host_ransomware = self._host_ransomware_script + return RequestResponse.from_bool(host_ransomware.configure(server_ip_address=given_config["server_ip_address"])) def _command_ransomware_launch(self, payload: MasqueradePacket) -> RequestResponse: """ diff --git a/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py b/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py index c29cd271..d01cd412 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py @@ -48,8 +48,8 @@ class C2Server(AbstractC2, identifier="C2 Server"): :rtype: RequestResponse """ # TODO: Parse the parameters from the request to get the parameters - placeholder: dict = {} - return self._send_command(given_command=C2Command.RANSOMWARE_CONFIGURE, command_options=placeholder) + ransomware_config = {"server_ip_address": request[-1].get("server_ip_address")} + return self._send_command(given_command=C2Command.RANSOMWARE_CONFIGURE, command_options=ransomware_config) def _launch_ransomware_action(request: RequestFormat, context: Dict) -> RequestResponse: """Agent Action - Sends a RANSOMWARE_LAUNCH C2Command to the C2 Beacon with the given parameters. @@ -61,9 +61,7 @@ class C2Server(AbstractC2, identifier="C2 Server"): :return: RequestResponse object with a success code reflecting whether the ransomware was launched. :rtype: RequestResponse """ - # TODO: Parse the parameters from the request to get the parameters - placeholder: dict = {} - return self._send_command(given_command=C2Command.RANSOMWARE_LAUNCH, command_options=placeholder) + return self._send_command(given_command=C2Command.RANSOMWARE_LAUNCH, command_options={}) def _remote_terminal_action(request: RequestFormat, context: Dict) -> RequestResponse: """Agent Action - Sends a TERMINAL C2Command to the C2 Beacon with the given parameters. @@ -77,18 +75,18 @@ class C2Server(AbstractC2, identifier="C2 Server"): """ # TODO: Parse the parameters from the request to get the parameters placeholder: dict = {} - return self._send_command(given_command=C2Command.RANSOMWARE_LAUNCH, command_options=placeholder) + return self._send_command(given_command=C2Command.TERMINAL, command_options=placeholder) rm.add_request( - name="c2_ransomware_configure", + name="ransomware_configure", request_type=RequestType(func=_configure_ransomware_action), ) rm.add_request( - name="c2_ransomware_launch", + name="ransomware_launch", request_type=RequestType(func=_launch_ransomware_action), ) rm.add_request( - name="c2_terminal_command", + name="terminal_command", request_type=RequestType(func=_remote_terminal_action), ) return rm @@ -203,7 +201,6 @@ class C2Server(AbstractC2, identifier="C2 Server"): self.sys_log.info(f"{self.name}: Attempting to send command {given_command}.") command_packet = self._craft_packet(given_command=given_command, command_options=command_options) - # Need to investigate if this is correct. if self.send( payload=command_packet, dest_ip_address=self.c2_remote_connection, diff --git a/src/primaite/simulator/system/applications/red_applications/ransomware_script.py b/src/primaite/simulator/system/applications/red_applications/ransomware_script.py index 77a6bf2c..2046affc 100644 --- a/src/primaite/simulator/system/applications/red_applications/ransomware_script.py +++ b/src/primaite/simulator/system/applications/red_applications/ransomware_script.py @@ -2,6 +2,8 @@ from ipaddress import IPv4Address from typing import Dict, Optional +from prettytable import MARKDOWN, PrettyTable + from primaite.interface.request import RequestFormat, RequestResponse from primaite.simulator.core import RequestManager, RequestType from primaite.simulator.network.transmission.network_layer import IPProtocol @@ -169,3 +171,25 @@ class RansomwareScript(Application, identifier="RansomwareScript"): else: self.sys_log.warning("Attack Attempted to launch too quickly") return False + + def show(self, markdown: bool = False): + """ + Prints a table of the current status of the Ransomware Script. + + Displays the current values of the following Ransomware Attributes: + + ``server_ip_address`: + The IP of the target database. + + ``payload``: + The payload (type of attack) to be sent to the database. + + :param markdown: If True, outputs the table in markdown format. Default is False. + """ + table = PrettyTable(["Target Server IP Address", "Payload"]) + if markdown: + table.set_style(MARKDOWN) + table.align = "l" + table.title = f"{self.name} Running Status" + table.add_row([self.server_ip_address, self.payload]) + print(table) diff --git a/tests/unit_tests/_primaite/_simulator/_system/_applications/_red_applications/test_c2_suite.py b/tests/unit_tests/_primaite/_simulator/_system/_applications/_red_applications/test_c2_suite.py index 7f869e92..064ef57d 100644 --- a/tests/unit_tests/_primaite/_simulator/_system/_applications/_red_applications/test_c2_suite.py +++ b/tests/unit_tests/_primaite/_simulator/_system/_applications/_red_applications/test_c2_suite.py @@ -1,4 +1,5 @@ # © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK +from ipaddress import IPv4Address from typing import Tuple import pytest @@ -12,14 +13,13 @@ from primaite.simulator.network.hardware.nodes.network.router import ACLAction, from primaite.simulator.network.hardware.nodes.network.switch import Switch from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.network.transmission.transport_layer import Port +from primaite.simulator.system.applications.application import ApplicationOperatingState from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Server +from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript from primaite.simulator.system.services.dns.dns_server import DNSServer -from primaite.simulator.system.services.service import ServiceOperatingState from primaite.simulator.system.services.web_server.web_server import WebServer -# TODO: Update these tests. - @pytest.fixture(scope="function") def c2_server_on_computer() -> Tuple[C2Beacon, Computer]: @@ -60,7 +60,7 @@ def basic_network() -> Network: def test_c2_suite_setup_receive(basic_network): - """Test that C2 Beacon can successfully establish connection with the c2 Server.""" + """Test that C2 Beacon can successfully establish connection with the C2 Server.""" network: Network = basic_network computer_a: Computer = network.get_node_by_hostname("node_a") c2_server: C2Server = computer_a.software_manager.software.get("C2Server") @@ -68,7 +68,221 @@ def test_c2_suite_setup_receive(basic_network): computer_b: Computer = network.get_node_by_hostname("node_b") c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon") + # Assert that the c2 beacon configure correctly. c2_beacon.configure(c2_server_ip_address="192.168.0.10") + assert c2_beacon.c2_remote_connection == IPv4Address("192.168.0.10") + + c2_server.run() c2_beacon.establish() - c2_beacon.sys_log.show() + # Asserting that the c2 beacon has established a c2 connection + assert c2_beacon.c2_connection_active is True + + # Asserting that the c2 server has established a c2 connection. + assert c2_server.c2_connection_active is True + assert c2_server.c2_remote_connection == IPv4Address("192.168.0.11") + + +def test_c2_suite_keep_alive_inactivity(basic_network): + """Tests that C2 Beacon disconnects from the C2 Server after inactivity.""" + network: Network = basic_network + computer_a: Computer = network.get_node_by_hostname("node_a") + c2_server: C2Server = computer_a.software_manager.software.get("C2Server") + + computer_b: Computer = network.get_node_by_hostname("node_b") + c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon") + + # Initial config (#TODO: Make this a function) + c2_beacon.configure(c2_server_ip_address="192.168.0.10", keep_alive_frequency=2) + c2_server.run() + c2_beacon.establish() + + c2_beacon.apply_timestep(0) + assert c2_beacon.keep_alive_inactivity == 1 + + # Keep Alive successfully sent and received upon the 2nd timestep. + c2_beacon.apply_timestep(1) + assert c2_beacon.keep_alive_inactivity == 0 + assert c2_beacon.c2_connection_active == True + + # Now we turn off the c2 server (Thus preventing a keep alive) + c2_server.close() + c2_beacon.apply_timestep(2) + c2_beacon.apply_timestep(3) + assert c2_beacon.keep_alive_inactivity == 2 + assert c2_beacon.c2_connection_active == False + assert c2_beacon.health_state_actual == ApplicationOperatingState.CLOSED + + +# TODO: Flesh out these tests. +def test_c2_suite_configure_via_actions(basic_network): + """Tests that a red agent is able to configure the c2 beacon and c2 server via Actions.""" + # Setting up the network: + network: Network = basic_network + computer_a: Computer = network.get_node_by_hostname("node_a") + c2_server: C2Server = computer_a.software_manager.software.get("C2Server") + + computer_b: Computer = network.get_node_by_hostname("node_b") + c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon") + + # Testing Via Requests: + network.apply_request(["node", "node_a", "application", "C2Server", "run"]) + + c2_beacon_config = { + "c2_server_ip_address": "192.168.0.10", + "keep_alive_frequency": 5, + "masquerade_protocol": IPProtocol.TCP, + "masquerade_port": Port.HTTP, + } + + network.apply_request(["node", "node_b", "application", "C2Beacon", "configure", c2_beacon_config]) + network.apply_request(["node", "node_b", "application", "C2Beacon", "execute"]) + + assert c2_beacon.c2_connection_active is True + assert c2_server.c2_connection_active is True + assert c2_server.c2_remote_connection == IPv4Address("192.168.0.11") + + # Testing Via Agents: + # TODO: + + +def test_c2_suite_configure_ransomware(basic_network): + """Tests that a red agent is able to configure ransomware via C2 Server Actions.""" + # Setting up the network: + network: Network = basic_network + computer_a: Computer = network.get_node_by_hostname("node_a") + c2_server: C2Server = computer_a.software_manager.software.get("C2Server") + + computer_b: Computer = network.get_node_by_hostname("node_b") + c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon") + + c2_beacon.configure(c2_server_ip_address="192.168.0.10", keep_alive_frequency=2) + c2_server.run() + c2_beacon.establish() + + # Testing Via Requests: + computer_b.software_manager.install(software_class=RansomwareScript) + ransomware_config = {"server_ip_address": "1.1.1.1"} + network.apply_request(["node", "node_a", "application", "C2Server", "ransomware_configure", ransomware_config]) + + ransomware_script: RansomwareScript = computer_b.software_manager.software["RansomwareScript"] + + assert ransomware_script.server_ip_address == "1.1.1.1" + + # Testing Via Agents: + # TODO: + + +def test_c2_suite_terminal(basic_network): + """Tests that a red agent is able to execute terminal commands via C2 Server Actions.""" + + +@pytest.fixture(scope="function") +def acl_network() -> Network: + # 0: Pull out the network + network = Network() + + # 1: Set up network hardware + # 1.1: Configure the router + router = Router(hostname="router", num_ports=3, start_up_duration=0) + router.power_on() + router.configure_port(port=1, ip_address="10.0.1.1", subnet_mask="255.255.255.0") + router.configure_port(port=2, ip_address="10.0.2.1", subnet_mask="255.255.255.0") + + # 1.2: Create and connect switches + switch_1 = Switch(hostname="switch_1", num_ports=6, start_up_duration=0) + switch_1.power_on() + network.connect(endpoint_a=router.network_interface[1], endpoint_b=switch_1.network_interface[6]) + router.enable_port(1) + switch_2 = Switch(hostname="switch_2", num_ports=6, start_up_duration=0) + switch_2.power_on() + network.connect(endpoint_a=router.network_interface[2], endpoint_b=switch_2.network_interface[6]) + router.enable_port(2) + + # 1.3: Create and connect computer + client_1 = Computer( + hostname="client_1", + ip_address="10.0.1.2", + subnet_mask="255.255.255.0", + default_gateway="10.0.1.1", + start_up_duration=0, + ) + client_1.power_on() + client_1.software_manager.install(software_class=C2Server) + network.connect( + endpoint_a=client_1.network_interface[1], + endpoint_b=switch_1.network_interface[1], + ) + + client_2 = Computer( + hostname="client_2", + ip_address="10.0.1.3", + subnet_mask="255.255.255.0", + default_gateway="10.0.1.1", + start_up_duration=0, + ) + client_2.power_on() + client_2.software_manager.install(software_class=C2Beacon) + network.connect(endpoint_a=client_2.network_interface[1], endpoint_b=switch_2.network_interface[1]) + + # 1.4: Create and connect servers + server_1 = Server( + hostname="server_1", + ip_address="10.0.2.2", + subnet_mask="255.255.255.0", + default_gateway="10.0.2.1", + start_up_duration=0, + ) + server_1.power_on() + network.connect(endpoint_a=server_1.network_interface[1], endpoint_b=switch_2.network_interface[1]) + + server_2 = Server( + hostname="server_2", + ip_address="10.0.2.3", + subnet_mask="255.255.255.0", + default_gateway="10.0.2.1", + start_up_duration=0, + ) + server_2.power_on() + network.connect(endpoint_a=server_2.network_interface[1], endpoint_b=switch_2.network_interface[2]) + + return network + + +# TODO: Fix this test: Not sure why this isn't working + + +def test_c2_suite_acl_block(acl_network): + """Tests that C2 Beacon disconnects from the C2 Server after blocking ACL rules.""" + network: Network = acl_network + computer_a: Computer = network.get_node_by_hostname("client_1") + c2_server: C2Server = computer_a.software_manager.software.get("C2Server") + + computer_b: Computer = network.get_node_by_hostname("client_2") + c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon") + + router: Router = network.get_node_by_hostname("router") + + network.apply_timestep(0) + # Initial config (#TODO: Make this a function) + c2_beacon.configure(c2_server_ip_address="10.0.1.2", keep_alive_frequency=2) + + c2_server.run() + c2_beacon.establish() + + assert c2_beacon.keep_alive_inactivity == 0 + assert c2_beacon.c2_connection_active == True + assert c2_server.c2_connection_active == True + + # Now we add a HTTP blocking acl (Thus preventing a keep alive) + router.acl.add_rule(action=ACLAction.DENY, src_port=Port.HTTP, dst_port=Port.HTTP, position=1) + + c2_beacon.apply_timestep(1) + c2_beacon.apply_timestep(2) + assert c2_beacon.keep_alive_inactivity == 2 + assert c2_beacon.c2_connection_active == False + assert c2_beacon.health_state_actual == ApplicationOperatingState.CLOSED + + +def test_c2_suite_launch_ransomware(basic_network): + """Tests that a red agent is able to launch ransomware via C2 Server Actions."""