diff --git a/src/primaite/notebooks/Command-&-Control-E2E-Demonstration.ipynb b/src/primaite/notebooks/Command-&-Control-E2E-Demonstration.ipynb index 60ea756d..1df85bb6 100644 --- a/src/primaite/notebooks/Command-&-Control-E2E-Demonstration.ipynb +++ b/src/primaite/notebooks/Command-&-Control-E2E-Demonstration.ipynb @@ -180,7 +180,7 @@ "source": [ "## **Command and Control** | C2 Beacon Actions\n", "\n", - "Before the Red Agent is able to perform any C2 Server commands, it must first establish connection with a C2 beacon.\n", + "Before any C2 Server commands is able to accept any commands, it must first establish connection with a C2 beacon.\n", "\n", "This can be done by installing, configuring and then executing a C2 Beacon. " ] @@ -341,6 +341,37 @@ "# TODO: Post Terminal.\n", "#env.step(7)" ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## **Command and Control** | Blue Agent Relevance" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### **Command and Control** | Blue Agent Relevance | Observation Space" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### **Command and Control** | Blue Agent Relevance | Action Space" + ] } ], "metadata": { diff --git a/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py b/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py index c73799da..6dd1a873 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/c2_beacon.py @@ -184,8 +184,6 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): :return: The Request Response provided by the terminal execute method. :rtype Request Response: """ - # TODO: Probably could refactor this to be a more clean. - # The elif's are a bit ugly when they are all calling the same method. command = payload.command if not isinstance(command, C2Command): self.sys_log.warning(f"{self.name}: Received unexpected C2 command. Unable to resolve command") @@ -253,19 +251,26 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): """ C2 Command: Ransomware Configuration. - Creates a request that configures the ransomware based off the configuration options given. - This request is then sent to the terminal service in order to be executed. + Calls the locally installed RansomwareScript application's configure method + and passes the given parameters. + + The class attribute self._host_ransomware_script will return None if the host + does not have an instance of the RansomwareScript. :payload MasqueradePacket: The incoming INPUT command. :type Masquerade Packet: MasqueradePacket. :return: Returns the Request Response returned by the Terminal execute method. :rtype: Request Response """ - # TODO: replace and use terminal - # return RequestResponse(status="success", data={"Reason": "Placeholder."}) given_config = payload.payload - host_ransomware = self._host_ransomware_script - return RequestResponse.from_bool(host_ransomware.configure(server_ip_address=given_config["server_ip_address"])) + if self._host_ransomware_script is None: + return RequestResponse( + status="failure", + data={"Reason": "Cannot find any instances of a RansomwareScript. Have you installed one?"}, + ) + return RequestResponse.from_bool( + self._host_ransomware_script.configure(server_ip_address=given_config["server_ip_address"]) + ) def _command_ransomware_launch(self, payload: MasqueradePacket) -> RequestResponse: """ @@ -280,6 +285,11 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"): :return: Returns the Request Response returned by the Terminal execute method. :rtype: Request Response """ + if self._host_ransomware_script is None: + return RequestResponse( + status="failure", + data={"Reason": "Cannot find any instances of a RansomwareScript. Have you installed one?"}, + ) return RequestResponse.from_bool(self._host_ransomware_script.attack()) def _command_terminal(self, payload: MasqueradePacket) -> RequestResponse: diff --git a/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py b/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py index c381403e..85009cec 100644 --- a/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py +++ b/src/primaite/simulator/system/applications/red_applications/c2/c2_server.py @@ -219,7 +219,7 @@ class C2Server(AbstractC2, identifier="C2Server"): """ Creates and returns a Masquerade Packet using the arguments given. - Creates Masquerade Packet with a payload_type INPUT C2Payload + Creates Masquerade Packet with a payload_type INPUT C2Payload. :param given_command: The C2 command to be sent to the C2 Beacon. :type given_command: C2Command. @@ -228,7 +228,6 @@ class C2Server(AbstractC2, identifier="C2Server"): :return: Returns the construct MasqueradePacket :rtype: MasqueradePacket """ - # TODO: Validation on command_options. constructed_packet = MasqueradePacket( masquerade_protocol=self.current_masquerade_protocol, masquerade_port=self.current_masquerade_port, diff --git a/tests/unit_tests/_primaite/_simulator/_system/_applications/_red_applications/test_c2_suite.py b/tests/unit_tests/_primaite/_simulator/_system/_applications/_red_applications/test_c2_suite.py index 064ef57d..7e4df4f1 100644 --- a/tests/unit_tests/_primaite/_simulator/_system/_applications/_red_applications/test_c2_suite.py +++ b/tests/unit_tests/_primaite/_simulator/_system/_applications/_red_applications/test_c2_suite.py @@ -21,40 +21,44 @@ from primaite.simulator.system.services.dns.dns_server import DNSServer from primaite.simulator.system.services.web_server.web_server import WebServer -@pytest.fixture(scope="function") -def c2_server_on_computer() -> Tuple[C2Beacon, Computer]: - computer: Computer = Computer( - hostname="node_a", ip_address="192.168.0.10", subnet_mask="255.255.255.0", start_up_duration=0 - ) - computer.power_on() - c2_beacon = computer.software_manager.software.get("C2Beacon") - - return [c2_beacon, computer] - - -@pytest.fixture(scope="function") -def c2_server_on_computer() -> Tuple[C2Server, Computer]: - computer: Computer = Computer( - hostname="node_b", ip_address="192.168.0.11", subnet_mask="255.255.255.0", start_up_duration=0 - ) - computer.power_on() - c2_server = computer.software_manager.software.get("C2Server") - - return [c2_server, computer] - - @pytest.fixture(scope="function") def basic_network() -> Network: network = Network() - node_a = Computer(hostname="node_a", ip_address="192.168.0.10", subnet_mask="255.255.255.0", start_up_duration=0) + + # Creating two generic nodes for the C2 Server and the C2 Beacon. + node_a = Computer(hostname="node_a", ip_address="192.168.0.2", subnet_mask="255.255.255.252", start_up_duration=0) node_a.power_on() node_a.software_manager.get_open_ports() node_a.software_manager.install(software_class=C2Server) - node_b = Computer(hostname="node_b", ip_address="192.168.0.11", subnet_mask="255.255.255.0", start_up_duration=0) + node_b = Computer(hostname="node_b", ip_address="192.168.255.2", subnet_mask="255.255.255.252", start_up_duration=0) node_b.software_manager.install(software_class=C2Beacon) node_b.power_on() - network.connect(node_a.network_interface[1], node_b.network_interface[1]) + + # Creating a router to sit between node 1 and node 2. + router = Router(hostname="router", num_ports=3, start_up_duration=0) + router.power_on() + router.configure_port(port=1, ip_address="192.168.0.1", subnet_mask="255.255.255.252") + router.configure_port(port=2, ip_address="192.168.255.1", subnet_mask="255.255.255.252") + + # Creating switches for each client. + switch_1 = Switch(hostname="switch_1", num_ports=6, start_up_duration=0) + switch_1.power_on() + + # Connecting the switches to the router. + network.connect(endpoint_a=router.network_interface[1], endpoint_b=switch_1.network_interface[6]) + router.enable_port(1) + + switch_2 = Switch(hostname="switch_2", num_ports=6, start_up_duration=0) + switch_2.power_on() + + network.connect(endpoint_a=router.network_interface[2], endpoint_b=switch_2.network_interface[6]) + router.enable_port(2) + + # Connecting the node to each switch + network.connect(node_a.network_interface[1], switch_1.network_interface[1]) + + network.connect(node_b.network_interface[1], switch_2.network_interface[1]) return network @@ -68,9 +72,10 @@ def test_c2_suite_setup_receive(basic_network): computer_b: Computer = network.get_node_by_hostname("node_b") c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon") + computer_a.ping("192.168.255.1") # Assert that the c2 beacon configure correctly. - c2_beacon.configure(c2_server_ip_address="192.168.0.10") - assert c2_beacon.c2_remote_connection == IPv4Address("192.168.0.10") + c2_beacon.configure(c2_server_ip_address="192.168.0.2") + assert c2_beacon.c2_remote_connection == IPv4Address("192.168.0.2") c2_server.run() c2_beacon.establish() @@ -80,7 +85,7 @@ def test_c2_suite_setup_receive(basic_network): # Asserting that the c2 server has established a c2 connection. assert c2_server.c2_connection_active is True - assert c2_server.c2_remote_connection == IPv4Address("192.168.0.11") + assert c2_server.c2_remote_connection == IPv4Address("192.168.255.2") def test_c2_suite_keep_alive_inactivity(basic_network): @@ -177,81 +182,6 @@ def test_c2_suite_terminal(basic_network): """Tests that a red agent is able to execute terminal commands via C2 Server Actions.""" -@pytest.fixture(scope="function") -def acl_network() -> Network: - # 0: Pull out the network - network = Network() - - # 1: Set up network hardware - # 1.1: Configure the router - router = Router(hostname="router", num_ports=3, start_up_duration=0) - router.power_on() - router.configure_port(port=1, ip_address="10.0.1.1", subnet_mask="255.255.255.0") - router.configure_port(port=2, ip_address="10.0.2.1", subnet_mask="255.255.255.0") - - # 1.2: Create and connect switches - switch_1 = Switch(hostname="switch_1", num_ports=6, start_up_duration=0) - switch_1.power_on() - network.connect(endpoint_a=router.network_interface[1], endpoint_b=switch_1.network_interface[6]) - router.enable_port(1) - switch_2 = Switch(hostname="switch_2", num_ports=6, start_up_duration=0) - switch_2.power_on() - network.connect(endpoint_a=router.network_interface[2], endpoint_b=switch_2.network_interface[6]) - router.enable_port(2) - - # 1.3: Create and connect computer - client_1 = Computer( - hostname="client_1", - ip_address="10.0.1.2", - subnet_mask="255.255.255.0", - default_gateway="10.0.1.1", - start_up_duration=0, - ) - client_1.power_on() - client_1.software_manager.install(software_class=C2Server) - network.connect( - endpoint_a=client_1.network_interface[1], - endpoint_b=switch_1.network_interface[1], - ) - - client_2 = Computer( - hostname="client_2", - ip_address="10.0.1.3", - subnet_mask="255.255.255.0", - default_gateway="10.0.1.1", - start_up_duration=0, - ) - client_2.power_on() - client_2.software_manager.install(software_class=C2Beacon) - network.connect(endpoint_a=client_2.network_interface[1], endpoint_b=switch_2.network_interface[1]) - - # 1.4: Create and connect servers - server_1 = Server( - hostname="server_1", - ip_address="10.0.2.2", - subnet_mask="255.255.255.0", - default_gateway="10.0.2.1", - start_up_duration=0, - ) - server_1.power_on() - network.connect(endpoint_a=server_1.network_interface[1], endpoint_b=switch_2.network_interface[1]) - - server_2 = Server( - hostname="server_2", - ip_address="10.0.2.3", - subnet_mask="255.255.255.0", - default_gateway="10.0.2.1", - start_up_duration=0, - ) - server_2.power_on() - network.connect(endpoint_a=server_2.network_interface[1], endpoint_b=switch_2.network_interface[2]) - - return network - - -# TODO: Fix this test: Not sure why this isn't working - - def test_c2_suite_acl_block(acl_network): """Tests that C2 Beacon disconnects from the C2 Server after blocking ACL rules.""" network: Network = acl_network