#2689 Implemented initial agent actions and started on documentations. A few TODO's left to do such as validation and expanding unit tests.
This commit is contained in:
@@ -0,0 +1,145 @@
|
||||
.. only:: comment
|
||||
|
||||
© Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
|
||||
|
||||
.. _C2_Suite:
|
||||
|
||||
Command and Control Application Suite
|
||||
#####################################
|
||||
|
||||
Comprising of two applications, the command and control (C2) suites intends to introduce
|
||||
malicious network architecture and begin to further the realism of red agents within primAITE.
|
||||
|
||||
Overview:
|
||||
=========
|
||||
|
||||
These two new classes intend to Red Agents a cyber realistic way of leveraging the capabilities of the ``Terminal`` application.
|
||||
Whilst introducing both more oppourtinies for the blue agent to notice and subvert Red Agents during an episode.
|
||||
|
||||
For a more in-depth look at the command and control applications then please refer to the ``C2-E2E-Notebook``.
|
||||
|
||||
``C2 Server``
|
||||
""""""""""""
|
||||
|
||||
The C2 Server application is intended to represent the malicious infrastructure already under the control of an adversary.
|
||||
|
||||
The C2 Server is configured to listen and await ``keep alive`` traffic from a c2 beacon. Once received the C2 Server is able to send and receive c2 commands.
|
||||
|
||||
Currently, the C2 Server offers three commands:
|
||||
|
||||
+---------------------+---------------------------------------------------------------------------+
|
||||
|C2 Command | Meaning |
|
||||
+=====================+===========================================================================+
|
||||
|RANSOMWARE_CONFIGURE | Configures an installed ransomware script based on the passed parameters. |
|
||||
+---------------------+---------------------------------------------------------------------------+
|
||||
|RANSOMWARE_LAUNCH | Launches the installed ransomware script. |
|
||||
+---------------------+---------------------------------------------------------------------------+
|
||||
|TERMINAL_COMMAND | Executes a command via the terminal installed on the C2 Beacons Host. |
|
||||
+---------------------+---------------------------------------------------------------------------+
|
||||
|
||||
|
||||
It's important to note that in order to keep the PrimAITE realistic from a cyber perspective,
|
||||
The C2 Server application should never be visible or actionable upon directly by the blue agent.
|
||||
|
||||
This is because in the real world, C2 servers are hosted on ephemeral public domains that would not be accessible by private network blue agent.
|
||||
Therefore granting a blue agent's the ability to perform counter measures directly against the application would be unrealistic.
|
||||
|
||||
It is more accurate to see the host that the C2 Server is installed on as being able to route to the C2 Server (Internet Access).
|
||||
|
||||
``C2 Beacon``
|
||||
"""""""""""""
|
||||
|
||||
The C2 Beacon application is intended to represent malware that is used to establish and maintain contact to a C2 Server within a compromised network.
|
||||
|
||||
A C2 Beacon will need to be first configured with the C2 Server IP Address which can be done via the ``configure`` method.
|
||||
|
||||
Once installed and configured; the c2 beacon can establish connection with the C2 Server via executing the application.
|
||||
|
||||
This will send an initial ``keep alive`` to the given C2 Server (The C2 Server IPv4Address must be given upon C2 Beacon configuration).
|
||||
Which is then resolved and responded by another ``Keep Alive`` by the c2 server back to the C2 beacon to confirm connection.
|
||||
|
||||
The C2 Beacon will send out periodic keep alive based on it's configuration parameters to configure it's active connection with the c2 server.
|
||||
|
||||
It's recommended that a C2 Beacon is installed and configured mid episode by a Red Agent for a more cyber realistic simulation.
|
||||
|
||||
Usage
|
||||
=====
|
||||
|
||||
As mentioned, the C2 Suite is intended to grant Red Agents further flexibility whilst also expanding a blue agent's observation_space.
|
||||
|
||||
Adding to this, the following behaviour of the C2 beacon can be configured by users for increased domain randomisation:
|
||||
|
||||
- Frequency of C2 ``Keep Alive `` Communication``
|
||||
- C2 Communication Port
|
||||
- C2 Communication Protocol
|
||||
|
||||
|
||||
Implementation
|
||||
==============
|
||||
|
||||
Both applications inherit from an abstract C2 which handles the keep alive functionality and main logic.
|
||||
However, each host implements it's receive methods individually.
|
||||
|
||||
- The ``C2 Beacon`` is responsible for the following logic:
|
||||
- Establishes and confirms connection to the C2 Server via sending ``C2Payload.KEEP_ALIVE``.
|
||||
- Receives and executes C2 Commands given by the C2 Server via ``C2Payload.INPUT``.
|
||||
- Returns the RequestResponse of the C2 Commands executed back the C2 Server via ``C2Payload.OUTPUT``.
|
||||
|
||||
- The ``C2 Server`` is responsible for the following logic:
|
||||
- Listens and resolves connection to a C2 Beacon via responding to ``C2Payload.KEEP_ALIVE``.
|
||||
- Sends C2 Commands to the C2 Beacon via ``C2Payload.INPUT``.
|
||||
- Receives the RequestResponse of the C2 Commands executed by C2 Beacon via ``C2Payload.OUTPUT``.
|
||||
|
||||
|
||||
|
||||
Examples
|
||||
========
|
||||
|
||||
Python
|
||||
""""""
|
||||
.. code-block:: python
|
||||
from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon
|
||||
from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Server
|
||||
from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Command
|
||||
from primaite.simulator.network.hardware.nodes.host.computer import Computer
|
||||
|
||||
# Network Setup
|
||||
|
||||
node_a = Computer(hostname="node_a", ip_address="192.168.0.10", subnet_mask="255.255.255.0", start_up_duration=0)
|
||||
node_a.power_on()
|
||||
node_a.software_manager.install(software_class=C2Server)
|
||||
node_a.software_manager.get_open_ports()
|
||||
|
||||
|
||||
node_b = Computer(hostname="node_b", ip_address="192.168.0.11", subnet_mask="255.255.255.0", start_up_duration=0)
|
||||
node_b.power_on()
|
||||
node_b.software_manager.install(software_class=C2Beacon)
|
||||
node_b.software_manager.install(software_class=RansomwareScript)
|
||||
network.connect(node_a.network_interface[1], node_b.network_interface[1])
|
||||
|
||||
|
||||
# C2 Application objects
|
||||
|
||||
c2_server_host = simulation_testing_network.get_node_by_hostname("node_a")
|
||||
c2_beacon_host = simulation_testing_network.get_node_by_hostname("node_b")
|
||||
|
||||
|
||||
c2_server: C2Server = c2_server_host.software_manager.software["C2Server"]
|
||||
c2_beacon: C2Beacon = c2_beacon_host.software_manager.software["C2Beacon"]
|
||||
|
||||
# Configuring the C2 Beacon
|
||||
c2_beacon.configure(c2_server_ip_address="192.168.0.10", keep_alive_frequency=5)
|
||||
|
||||
# Launching the C2 Server (Needs to be running in order to listen for connections)
|
||||
c2_server.run()
|
||||
|
||||
# Establishing connection
|
||||
c2_beacon.establish()
|
||||
|
||||
# Example command: Configuring Ransomware
|
||||
|
||||
ransomware_config = {"server_ip_address": "1.1.1.1"}
|
||||
c2_server._send_command(given_command=C2Command.RANSOMWARE_CONFIGURE, command_options=ransomware_config)
|
||||
|
||||
|
||||
For a more in-depth look at the command and control applications then please refer to the ``C2-Suite-E2E-Notebook``.
|
||||
@@ -213,7 +213,7 @@ class AbstractC2(Application, identifier="AbstractC2"):
|
||||
return True
|
||||
else:
|
||||
self.sys_log.warning(
|
||||
f"{self.name}: failed to send a Keep Alive. The node may be unable to access the ``network."
|
||||
f"{self.name}: failed to send a Keep Alive. The node may be unable to access networking resources."
|
||||
)
|
||||
return False
|
||||
|
||||
@@ -251,7 +251,7 @@ class AbstractC2(Application, identifier="AbstractC2"):
|
||||
# This statement is intended to catch on the C2 Application that is listening for connection. (C2 Beacon)
|
||||
if self.c2_remote_connection is None:
|
||||
self.sys_log.debug(f"{self.name}: Attempting to configure remote C2 connection based off received output.")
|
||||
self.c2_remote_connection = self.current_c2_session.with_ip_address
|
||||
self.c2_remote_connection = IPv4Address(self.current_c2_session.with_ip_address)
|
||||
|
||||
self.c2_connection_active = True # Sets the connection to active
|
||||
self.keep_alive_inactivity = 0 # Sets the keep alive inactivity to zero
|
||||
|
||||
@@ -13,6 +13,7 @@ from primaite.simulator.network.transmission.network_layer import IPProtocol
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
from primaite.simulator.system.applications.application import ApplicationOperatingState
|
||||
from primaite.simulator.system.applications.red_applications.c2.abstract_c2 import AbstractC2, C2Command, C2Payload
|
||||
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript
|
||||
from primaite.simulator.system.software import SoftwareHealthState
|
||||
|
||||
|
||||
@@ -44,8 +45,6 @@ class C2Beacon(AbstractC2, identifier="C2 Beacon"):
|
||||
|
||||
# TODO:
|
||||
# Implement the placeholder command methods
|
||||
# Implement the keep alive frequency.
|
||||
# Implement a command output method that sends the RequestResponse to the C2 server.
|
||||
# Uncomment the terminal Import and the terminal property after terminal PR
|
||||
|
||||
# @property
|
||||
@@ -56,6 +55,14 @@ class C2Beacon(AbstractC2, identifier="C2 Beacon"):
|
||||
# self.sys_log.warning(f"{self.__class__.__name__} cannot find a terminal on its host.")
|
||||
# return host_terminal
|
||||
|
||||
@property
|
||||
def _host_ransomware_script(self) -> RansomwareScript:
|
||||
"""Return the RansomwareScript that is installed on the same machine as the C2 Beacon."""
|
||||
ransomware_script: RansomwareScript = self.software_manager.software.get("RansomwareScript")
|
||||
if ransomware_script is None:
|
||||
self.sys_log.warning(f"{self.__class__.__name__} cannot find installed ransomware on its host.")
|
||||
return ransomware_script
|
||||
|
||||
def _init_request_manager(self) -> RequestManager:
|
||||
"""
|
||||
Initialise the request manager.
|
||||
@@ -87,6 +94,7 @@ class C2Beacon(AbstractC2, identifier="C2 Beacon"):
|
||||
)
|
||||
|
||||
c2_remote_ip = IPv4Address(c2_remote_ip)
|
||||
# TODO: validation.
|
||||
frequency = request[-1].get("keep_alive_frequency")
|
||||
protocol = request[-1].get("masquerade_protocol")
|
||||
port = request[-1].get("masquerade_port")
|
||||
@@ -127,7 +135,7 @@ class C2Beacon(AbstractC2, identifier="C2 Beacon"):
|
||||
:param masquerade_port: The Port that the C2 Traffic will masquerade as. Defaults to FTP.
|
||||
:type masquerade_port: Enum (Port)
|
||||
"""
|
||||
self.c2_remote_connection = c2_server_ip_address
|
||||
self.c2_remote_connection = IPv4Address(c2_server_ip_address)
|
||||
self.keep_alive_frequency = keep_alive_frequency
|
||||
self.current_masquerade_port = masquerade_port
|
||||
self.current_masquerade_protocol = masquerade_protocol
|
||||
@@ -252,7 +260,10 @@ class C2Beacon(AbstractC2, identifier="C2 Beacon"):
|
||||
:rtype: Request Response
|
||||
"""
|
||||
# TODO: replace and use terminal
|
||||
return RequestResponse(status="success", data={"Reason": "Placeholder."})
|
||||
# return RequestResponse(status="success", data={"Reason": "Placeholder."})
|
||||
given_config = payload.payload
|
||||
host_ransomware = self._host_ransomware_script
|
||||
return RequestResponse.from_bool(host_ransomware.configure(server_ip_address=given_config["server_ip_address"]))
|
||||
|
||||
def _command_ransomware_launch(self, payload: MasqueradePacket) -> RequestResponse:
|
||||
"""
|
||||
|
||||
@@ -48,8 +48,8 @@ class C2Server(AbstractC2, identifier="C2 Server"):
|
||||
:rtype: RequestResponse
|
||||
"""
|
||||
# TODO: Parse the parameters from the request to get the parameters
|
||||
placeholder: dict = {}
|
||||
return self._send_command(given_command=C2Command.RANSOMWARE_CONFIGURE, command_options=placeholder)
|
||||
ransomware_config = {"server_ip_address": request[-1].get("server_ip_address")}
|
||||
return self._send_command(given_command=C2Command.RANSOMWARE_CONFIGURE, command_options=ransomware_config)
|
||||
|
||||
def _launch_ransomware_action(request: RequestFormat, context: Dict) -> RequestResponse:
|
||||
"""Agent Action - Sends a RANSOMWARE_LAUNCH C2Command to the C2 Beacon with the given parameters.
|
||||
@@ -61,9 +61,7 @@ class C2Server(AbstractC2, identifier="C2 Server"):
|
||||
:return: RequestResponse object with a success code reflecting whether the ransomware was launched.
|
||||
:rtype: RequestResponse
|
||||
"""
|
||||
# TODO: Parse the parameters from the request to get the parameters
|
||||
placeholder: dict = {}
|
||||
return self._send_command(given_command=C2Command.RANSOMWARE_LAUNCH, command_options=placeholder)
|
||||
return self._send_command(given_command=C2Command.RANSOMWARE_LAUNCH, command_options={})
|
||||
|
||||
def _remote_terminal_action(request: RequestFormat, context: Dict) -> RequestResponse:
|
||||
"""Agent Action - Sends a TERMINAL C2Command to the C2 Beacon with the given parameters.
|
||||
@@ -77,18 +75,18 @@ class C2Server(AbstractC2, identifier="C2 Server"):
|
||||
"""
|
||||
# TODO: Parse the parameters from the request to get the parameters
|
||||
placeholder: dict = {}
|
||||
return self._send_command(given_command=C2Command.RANSOMWARE_LAUNCH, command_options=placeholder)
|
||||
return self._send_command(given_command=C2Command.TERMINAL, command_options=placeholder)
|
||||
|
||||
rm.add_request(
|
||||
name="c2_ransomware_configure",
|
||||
name="ransomware_configure",
|
||||
request_type=RequestType(func=_configure_ransomware_action),
|
||||
)
|
||||
rm.add_request(
|
||||
name="c2_ransomware_launch",
|
||||
name="ransomware_launch",
|
||||
request_type=RequestType(func=_launch_ransomware_action),
|
||||
)
|
||||
rm.add_request(
|
||||
name="c2_terminal_command",
|
||||
name="terminal_command",
|
||||
request_type=RequestType(func=_remote_terminal_action),
|
||||
)
|
||||
return rm
|
||||
@@ -203,7 +201,6 @@ class C2Server(AbstractC2, identifier="C2 Server"):
|
||||
self.sys_log.info(f"{self.name}: Attempting to send command {given_command}.")
|
||||
command_packet = self._craft_packet(given_command=given_command, command_options=command_options)
|
||||
|
||||
# Need to investigate if this is correct.
|
||||
if self.send(
|
||||
payload=command_packet,
|
||||
dest_ip_address=self.c2_remote_connection,
|
||||
|
||||
@@ -2,6 +2,8 @@
|
||||
from ipaddress import IPv4Address
|
||||
from typing import Dict, Optional
|
||||
|
||||
from prettytable import MARKDOWN, PrettyTable
|
||||
|
||||
from primaite.interface.request import RequestFormat, RequestResponse
|
||||
from primaite.simulator.core import RequestManager, RequestType
|
||||
from primaite.simulator.network.transmission.network_layer import IPProtocol
|
||||
@@ -169,3 +171,25 @@ class RansomwareScript(Application, identifier="RansomwareScript"):
|
||||
else:
|
||||
self.sys_log.warning("Attack Attempted to launch too quickly")
|
||||
return False
|
||||
|
||||
def show(self, markdown: bool = False):
|
||||
"""
|
||||
Prints a table of the current status of the Ransomware Script.
|
||||
|
||||
Displays the current values of the following Ransomware Attributes:
|
||||
|
||||
``server_ip_address`:
|
||||
The IP of the target database.
|
||||
|
||||
``payload``:
|
||||
The payload (type of attack) to be sent to the database.
|
||||
|
||||
:param markdown: If True, outputs the table in markdown format. Default is False.
|
||||
"""
|
||||
table = PrettyTable(["Target Server IP Address", "Payload"])
|
||||
if markdown:
|
||||
table.set_style(MARKDOWN)
|
||||
table.align = "l"
|
||||
table.title = f"{self.name} Running Status"
|
||||
table.add_row([self.server_ip_address, self.payload])
|
||||
print(table)
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
|
||||
from ipaddress import IPv4Address
|
||||
from typing import Tuple
|
||||
|
||||
import pytest
|
||||
@@ -12,14 +13,13 @@ from primaite.simulator.network.hardware.nodes.network.router import ACLAction,
|
||||
from primaite.simulator.network.hardware.nodes.network.switch import Switch
|
||||
from primaite.simulator.network.transmission.network_layer import IPProtocol
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
from primaite.simulator.system.applications.application import ApplicationOperatingState
|
||||
from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon
|
||||
from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Server
|
||||
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript
|
||||
from primaite.simulator.system.services.dns.dns_server import DNSServer
|
||||
from primaite.simulator.system.services.service import ServiceOperatingState
|
||||
from primaite.simulator.system.services.web_server.web_server import WebServer
|
||||
|
||||
# TODO: Update these tests.
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def c2_server_on_computer() -> Tuple[C2Beacon, Computer]:
|
||||
@@ -60,7 +60,7 @@ def basic_network() -> Network:
|
||||
|
||||
|
||||
def test_c2_suite_setup_receive(basic_network):
|
||||
"""Test that C2 Beacon can successfully establish connection with the c2 Server."""
|
||||
"""Test that C2 Beacon can successfully establish connection with the C2 Server."""
|
||||
network: Network = basic_network
|
||||
computer_a: Computer = network.get_node_by_hostname("node_a")
|
||||
c2_server: C2Server = computer_a.software_manager.software.get("C2Server")
|
||||
@@ -68,7 +68,221 @@ def test_c2_suite_setup_receive(basic_network):
|
||||
computer_b: Computer = network.get_node_by_hostname("node_b")
|
||||
c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon")
|
||||
|
||||
# Assert that the c2 beacon configure correctly.
|
||||
c2_beacon.configure(c2_server_ip_address="192.168.0.10")
|
||||
assert c2_beacon.c2_remote_connection == IPv4Address("192.168.0.10")
|
||||
|
||||
c2_server.run()
|
||||
c2_beacon.establish()
|
||||
|
||||
c2_beacon.sys_log.show()
|
||||
# Asserting that the c2 beacon has established a c2 connection
|
||||
assert c2_beacon.c2_connection_active is True
|
||||
|
||||
# Asserting that the c2 server has established a c2 connection.
|
||||
assert c2_server.c2_connection_active is True
|
||||
assert c2_server.c2_remote_connection == IPv4Address("192.168.0.11")
|
||||
|
||||
|
||||
def test_c2_suite_keep_alive_inactivity(basic_network):
|
||||
"""Tests that C2 Beacon disconnects from the C2 Server after inactivity."""
|
||||
network: Network = basic_network
|
||||
computer_a: Computer = network.get_node_by_hostname("node_a")
|
||||
c2_server: C2Server = computer_a.software_manager.software.get("C2Server")
|
||||
|
||||
computer_b: Computer = network.get_node_by_hostname("node_b")
|
||||
c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon")
|
||||
|
||||
# Initial config (#TODO: Make this a function)
|
||||
c2_beacon.configure(c2_server_ip_address="192.168.0.10", keep_alive_frequency=2)
|
||||
c2_server.run()
|
||||
c2_beacon.establish()
|
||||
|
||||
c2_beacon.apply_timestep(0)
|
||||
assert c2_beacon.keep_alive_inactivity == 1
|
||||
|
||||
# Keep Alive successfully sent and received upon the 2nd timestep.
|
||||
c2_beacon.apply_timestep(1)
|
||||
assert c2_beacon.keep_alive_inactivity == 0
|
||||
assert c2_beacon.c2_connection_active == True
|
||||
|
||||
# Now we turn off the c2 server (Thus preventing a keep alive)
|
||||
c2_server.close()
|
||||
c2_beacon.apply_timestep(2)
|
||||
c2_beacon.apply_timestep(3)
|
||||
assert c2_beacon.keep_alive_inactivity == 2
|
||||
assert c2_beacon.c2_connection_active == False
|
||||
assert c2_beacon.health_state_actual == ApplicationOperatingState.CLOSED
|
||||
|
||||
|
||||
# TODO: Flesh out these tests.
|
||||
def test_c2_suite_configure_via_actions(basic_network):
|
||||
"""Tests that a red agent is able to configure the c2 beacon and c2 server via Actions."""
|
||||
# Setting up the network:
|
||||
network: Network = basic_network
|
||||
computer_a: Computer = network.get_node_by_hostname("node_a")
|
||||
c2_server: C2Server = computer_a.software_manager.software.get("C2Server")
|
||||
|
||||
computer_b: Computer = network.get_node_by_hostname("node_b")
|
||||
c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon")
|
||||
|
||||
# Testing Via Requests:
|
||||
network.apply_request(["node", "node_a", "application", "C2Server", "run"])
|
||||
|
||||
c2_beacon_config = {
|
||||
"c2_server_ip_address": "192.168.0.10",
|
||||
"keep_alive_frequency": 5,
|
||||
"masquerade_protocol": IPProtocol.TCP,
|
||||
"masquerade_port": Port.HTTP,
|
||||
}
|
||||
|
||||
network.apply_request(["node", "node_b", "application", "C2Beacon", "configure", c2_beacon_config])
|
||||
network.apply_request(["node", "node_b", "application", "C2Beacon", "execute"])
|
||||
|
||||
assert c2_beacon.c2_connection_active is True
|
||||
assert c2_server.c2_connection_active is True
|
||||
assert c2_server.c2_remote_connection == IPv4Address("192.168.0.11")
|
||||
|
||||
# Testing Via Agents:
|
||||
# TODO:
|
||||
|
||||
|
||||
def test_c2_suite_configure_ransomware(basic_network):
|
||||
"""Tests that a red agent is able to configure ransomware via C2 Server Actions."""
|
||||
# Setting up the network:
|
||||
network: Network = basic_network
|
||||
computer_a: Computer = network.get_node_by_hostname("node_a")
|
||||
c2_server: C2Server = computer_a.software_manager.software.get("C2Server")
|
||||
|
||||
computer_b: Computer = network.get_node_by_hostname("node_b")
|
||||
c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon")
|
||||
|
||||
c2_beacon.configure(c2_server_ip_address="192.168.0.10", keep_alive_frequency=2)
|
||||
c2_server.run()
|
||||
c2_beacon.establish()
|
||||
|
||||
# Testing Via Requests:
|
||||
computer_b.software_manager.install(software_class=RansomwareScript)
|
||||
ransomware_config = {"server_ip_address": "1.1.1.1"}
|
||||
network.apply_request(["node", "node_a", "application", "C2Server", "ransomware_configure", ransomware_config])
|
||||
|
||||
ransomware_script: RansomwareScript = computer_b.software_manager.software["RansomwareScript"]
|
||||
|
||||
assert ransomware_script.server_ip_address == "1.1.1.1"
|
||||
|
||||
# Testing Via Agents:
|
||||
# TODO:
|
||||
|
||||
|
||||
def test_c2_suite_terminal(basic_network):
|
||||
"""Tests that a red agent is able to execute terminal commands via C2 Server Actions."""
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def acl_network() -> Network:
|
||||
# 0: Pull out the network
|
||||
network = Network()
|
||||
|
||||
# 1: Set up network hardware
|
||||
# 1.1: Configure the router
|
||||
router = Router(hostname="router", num_ports=3, start_up_duration=0)
|
||||
router.power_on()
|
||||
router.configure_port(port=1, ip_address="10.0.1.1", subnet_mask="255.255.255.0")
|
||||
router.configure_port(port=2, ip_address="10.0.2.1", subnet_mask="255.255.255.0")
|
||||
|
||||
# 1.2: Create and connect switches
|
||||
switch_1 = Switch(hostname="switch_1", num_ports=6, start_up_duration=0)
|
||||
switch_1.power_on()
|
||||
network.connect(endpoint_a=router.network_interface[1], endpoint_b=switch_1.network_interface[6])
|
||||
router.enable_port(1)
|
||||
switch_2 = Switch(hostname="switch_2", num_ports=6, start_up_duration=0)
|
||||
switch_2.power_on()
|
||||
network.connect(endpoint_a=router.network_interface[2], endpoint_b=switch_2.network_interface[6])
|
||||
router.enable_port(2)
|
||||
|
||||
# 1.3: Create and connect computer
|
||||
client_1 = Computer(
|
||||
hostname="client_1",
|
||||
ip_address="10.0.1.2",
|
||||
subnet_mask="255.255.255.0",
|
||||
default_gateway="10.0.1.1",
|
||||
start_up_duration=0,
|
||||
)
|
||||
client_1.power_on()
|
||||
client_1.software_manager.install(software_class=C2Server)
|
||||
network.connect(
|
||||
endpoint_a=client_1.network_interface[1],
|
||||
endpoint_b=switch_1.network_interface[1],
|
||||
)
|
||||
|
||||
client_2 = Computer(
|
||||
hostname="client_2",
|
||||
ip_address="10.0.1.3",
|
||||
subnet_mask="255.255.255.0",
|
||||
default_gateway="10.0.1.1",
|
||||
start_up_duration=0,
|
||||
)
|
||||
client_2.power_on()
|
||||
client_2.software_manager.install(software_class=C2Beacon)
|
||||
network.connect(endpoint_a=client_2.network_interface[1], endpoint_b=switch_2.network_interface[1])
|
||||
|
||||
# 1.4: Create and connect servers
|
||||
server_1 = Server(
|
||||
hostname="server_1",
|
||||
ip_address="10.0.2.2",
|
||||
subnet_mask="255.255.255.0",
|
||||
default_gateway="10.0.2.1",
|
||||
start_up_duration=0,
|
||||
)
|
||||
server_1.power_on()
|
||||
network.connect(endpoint_a=server_1.network_interface[1], endpoint_b=switch_2.network_interface[1])
|
||||
|
||||
server_2 = Server(
|
||||
hostname="server_2",
|
||||
ip_address="10.0.2.3",
|
||||
subnet_mask="255.255.255.0",
|
||||
default_gateway="10.0.2.1",
|
||||
start_up_duration=0,
|
||||
)
|
||||
server_2.power_on()
|
||||
network.connect(endpoint_a=server_2.network_interface[1], endpoint_b=switch_2.network_interface[2])
|
||||
|
||||
return network
|
||||
|
||||
|
||||
# TODO: Fix this test: Not sure why this isn't working
|
||||
|
||||
|
||||
def test_c2_suite_acl_block(acl_network):
|
||||
"""Tests that C2 Beacon disconnects from the C2 Server after blocking ACL rules."""
|
||||
network: Network = acl_network
|
||||
computer_a: Computer = network.get_node_by_hostname("client_1")
|
||||
c2_server: C2Server = computer_a.software_manager.software.get("C2Server")
|
||||
|
||||
computer_b: Computer = network.get_node_by_hostname("client_2")
|
||||
c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon")
|
||||
|
||||
router: Router = network.get_node_by_hostname("router")
|
||||
|
||||
network.apply_timestep(0)
|
||||
# Initial config (#TODO: Make this a function)
|
||||
c2_beacon.configure(c2_server_ip_address="10.0.1.2", keep_alive_frequency=2)
|
||||
|
||||
c2_server.run()
|
||||
c2_beacon.establish()
|
||||
|
||||
assert c2_beacon.keep_alive_inactivity == 0
|
||||
assert c2_beacon.c2_connection_active == True
|
||||
assert c2_server.c2_connection_active == True
|
||||
|
||||
# Now we add a HTTP blocking acl (Thus preventing a keep alive)
|
||||
router.acl.add_rule(action=ACLAction.DENY, src_port=Port.HTTP, dst_port=Port.HTTP, position=1)
|
||||
|
||||
c2_beacon.apply_timestep(1)
|
||||
c2_beacon.apply_timestep(2)
|
||||
assert c2_beacon.keep_alive_inactivity == 2
|
||||
assert c2_beacon.c2_connection_active == False
|
||||
assert c2_beacon.health_state_actual == ApplicationOperatingState.CLOSED
|
||||
|
||||
|
||||
def test_c2_suite_launch_ransomware(basic_network):
|
||||
"""Tests that a red agent is able to launch ransomware via C2 Server Actions."""
|
||||
|
||||
Reference in New Issue
Block a user