#2689 Updated documentation and moved _craft_packet into abstract C2

This commit is contained in:
Archer Bowen
2024-08-12 14:16:21 +01:00
parent ce3805cd15
commit cbf02ebf32
7 changed files with 306 additions and 109 deletions

View File

@@ -34,7 +34,7 @@ Currently, the C2 Server offers three commands:
+---------------------+---------------------------------------------------------------------------+
|RANSOMWARE_LAUNCH | Launches the installed ransomware script. |
+---------------------+---------------------------------------------------------------------------+
|TERMINAL_COMMAND | Executes a command via the terminal installed on the C2 Beacons Host. |
|TERMINAL | Executes a command via the terminal installed on the C2 Beacons Host. |
+---------------------+---------------------------------------------------------------------------+
@@ -69,9 +69,17 @@ As mentioned, the C2 Suite is intended to grant Red Agents further flexibility w
Adding to this, the following behaviour of the C2 beacon can be configured by users for increased domain randomisation:
- Frequency of C2 ``Keep Alive `` Communication``
- C2 Communication Port
- C2 Communication Protocol
+---------------------+---------------------------------------------------------------------------+
|Configuration Option | Option Meaning |
+=====================+===========================================================================+
|c2_server_ip_address | The IP Address of the C2 Server. (The C2 Server must be running) |
+---------------------+---------------------------------------------------------------------------+
|keep_alive_frequency | How often should the C2 Beacon confirm it's connection in timesteps. |
+---------------------+---------------------------------------------------------------------------+
|masquerade_protocol | What protocol should the C2 traffic masquerade as? (HTTP, FTP or DNS) |
+---------------------+---------------------------------------------------------------------------+
|masquerade_port | What port should the C2 traffic use? (TCP or UDP) |
+---------------------+---------------------------------------------------------------------------+
Implementation
@@ -91,6 +99,7 @@ However, each host implements it's receive methods individually.
- Receives the RequestResponse of the C2 Commands executed by C2 Beacon via ``C2Payload.OUTPUT``.
For further details and more in-depth examples please refer to the ``Command-&-Control notebook``
Examples
========
@@ -120,8 +129,8 @@ Python
# C2 Application objects
c2_server_host = simulation_testing_network.get_node_by_hostname("node_a")
c2_beacon_host = simulation_testing_network.get_node_by_hostname("node_b")
c2_server_host: computer = simulation_testing_network.get_node_by_hostname("node_a")
c2_beacon_host: computer = simulation_testing_network.get_node_by_hostname("node_b")
c2_server: C2Server = c2_server_host.software_manager.software["C2Server"]
@@ -136,10 +145,125 @@ Python
# Establishing connection
c2_beacon.establish()
# Example command: Configuring Ransomware
# Example command: Creating a file
ransomware_config = {"server_ip_address": "1.1.1.1"}
c2_server._send_command(given_command=C2Command.RANSOMWARE_CONFIGURE, command_options=ransomware_config)
file_create_command = {
"commands": [
["file_system", "create", "folder", "test_folder"],
["file_system", "create", "file", "test_folder", "example_file", "True"],
],
"username": "admin",
"password": "admin",
"ip_address": None,
}
c2_server.send_command(C2Command.TERMINAL, command_options=file_create_command)
# Example commands: Installing and configuring Ransomware:
ransomware_installation_command = { "commands": [
["software_manager","application","install","RansomwareScript"],
],
"username": "admin",
"password": "admin",
"ip_address": None,
}
c2_server.send_command(given_command=C2Command.TERMINAL, command_options=ransomware_config)
ransomware_config = {"server_ip_address": "192.168.0.10"}
c2_server.send_command(given_command=C2Command.RANSOMWARE_CONFIGURE, command_options=ransomware_config)
c2_beacon_host.software_manager.show()
For a more in-depth look at the command and control applications then please refer to the ``C2-Suite-E2E-Notebook``.
Via Configuration
"""""""""""""""""
.. code-block:: yaml
simulation:
network:
nodes:
- ref: example_computer_1
hostname: computer_a
type: computer
...
applications:
type: C2Server
...
hostname: computer_b
type: computer
...
# A C2 Beacon will not automatically connection to a C2 Server.
# Either an agent must use application_execute.
# Or a user must use .establish().
applications:
type: C2Beacon
options:
c2_server_ip_address: ...
keep_alive_frequency: 5
masquerade_protocol: tcp
masquerade_port: http
C2 Beacon Configuration
=======================
.. include:: ../common/common_configuration.rst
.. |SOFTWARE_NAME| replace:: C2Beacon
.. |SOFTWARE_NAME_BACKTICK| replace:: ``C2Beacon``
``c2_server_ip_address``
"""""""""""""""""""""""
IP address of the ``C2Server`` that the C2 Beacon will use to establish connection.
This must be a valid octet i.e. in the range of ``0.0.0.0`` and ``255.255.255.255``.
``Keep Alive Frequency``
"""""""""""""""""""""""
How often should the C2 Beacon confirm it's connection in timesteps.
For example, if the keep alive Frequency is set to one then every single timestep
the C2 connection will be confirmed.
It's worth noting that this may be useful option when investigating
network blue agent observation space.
This must be a valid integer i.e ``10``. Defaults to ``5``.
``Masquerade Protocol``
"""""""""""""""""""""""
The protocol that the C2 Beacon will use to communicate to the C2 Server with.
Currently only ``tcp`` and ``udp`` are valid masquerade protocol options.
It's worth noting that this may be useful option to bypass ACL rules.
This must be a string i.e ``udp``. Defaults to ``tcp``.
_Please refer to the ``IPProtocol`` class for further reference._
``Masquerade Port``
"""""""""""""""""""
What port that the C2 Beacon will use to communicate to the C2 Server with.
Currently only ``FTP``, ``HTTP`` and ``DNS`` are valid masquerade port options.
It's worth noting that this may be useful option to bypass ACL rules.
This must be a string i.e ``DNS``. Defaults to ``HTTP``.
_Please refer to the ``IPProtocol`` class for further reference._
_The C2 Server does not currently offer any unique configuration options and will configure itself to match the C2 beacon's network behaviour._

View File

@@ -633,7 +633,7 @@
"ransomware_install_command = {\"commands\":[[\"software_manager\", \"application\", \"install\", \"RansomwareScript\"]],\n",
" \"username\": \"admin\",\n",
" \"password\": \"admin\"}\n",
"c2_server._send_command(C2Command.TERMINAL, command_options=ransomware_install_command)\n"
"c2_server.send_command(C2Command.TERMINAL, command_options=ransomware_install_command)\n"
]
},
{
@@ -644,7 +644,7 @@
"source": [
"# Configuring the RansomwareScript\n",
"ransomware_config = {\"server_ip_address\": \"192.168.1.14\", \"payload\": \"ENCRYPT\"}\n",
"c2_server._send_command(C2Command.RANSOMWARE_CONFIGURE, command_options=ransomware_config)"
"c2_server.send_command(C2Command.RANSOMWARE_CONFIGURE, command_options=ransomware_config)"
]
},
{
@@ -681,7 +681,7 @@
"source": [
"# Waiting for the ransomware to finish installing and then launching the RansomwareScript.\n",
"blue_env.step(0)\n",
"c2_server._send_command(C2Command.RANSOMWARE_LAUNCH, command_options={})"
"c2_server.send_command(C2Command.RANSOMWARE_LAUNCH, command_options={})"
]
},
{
@@ -834,7 +834,7 @@
" \"password\": \"admin\"}\n",
"\n",
"c2_server: C2Server = client_1.software_manager.software[\"C2Server\"]\n",
"c2_server._send_command(C2Command.TERMINAL, command_options=ransomware_install_command)"
"c2_server.send_command(C2Command.TERMINAL, command_options=ransomware_install_command)"
]
},
{
@@ -922,7 +922,7 @@
" \"password\": \"admin\"}\n",
"\n",
"c2_server: C2Server = client_1.software_manager.software[\"C2Server\"]\n",
"c2_server._send_command(C2Command.TERMINAL, command_options=ransomware_install_command)"
"c2_server.send_command(C2Command.TERMINAL, command_options=ransomware_install_command)"
]
},
{
@@ -1007,8 +1007,8 @@
"blue_env.step(0)\n",
"\n",
"# Attempting to install and execute the ransomware script\n",
"c2_server._send_command(C2Command.TERMINAL, command_options=ransomware_install_command)\n",
"c2_server._send_command(C2Command.RANSOMWARE_LAUNCH, command_options={})"
"c2_server.send_command(C2Command.TERMINAL, command_options=ransomware_install_command)\n",
"c2_server.send_command(C2Command.RANSOMWARE_LAUNCH, command_options={})"
]
},
{

View File

@@ -6,6 +6,7 @@ from typing import Dict, Optional
from pydantic import BaseModel, Field, validate_call
from primaite.interface.request import RequestResponse
from primaite.simulator.network.protocols.masquerade import C2Packet
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
@@ -53,6 +54,8 @@ class AbstractC2(Application, identifier="AbstractC2"):
as well as providing the abstract methods for sending, receiving and parsing commands.
Defaults to masquerading as HTTP (Port 80) via TCP.
Please refer to the Command-&-Control notebook for an in-depth example of the C2 Suite.
"""
c2_connection_active: bool = False
@@ -79,11 +82,46 @@ class AbstractC2(Application, identifier="AbstractC2"):
masquerade_port: Port = Field(default=Port.HTTP)
"""The currently chosen port that the C2 traffic is masquerading as. Defaults at HTTP."""
# The c2 beacon sets the c2_config through it's own internal method - configure (which is also used by agents)
# and then passes the config attributes to the c2 server via keep alives
# The c2 server parses the C2 configurations from keep alive traffic and sets the c2_config accordingly.
def _craft_packet(
self, c2_payload: C2Payload, c2_command: Optional[C2Command] = None, command_options: Optional[Dict] = {}
) -> C2Packet:
"""
Creates and returns a Masquerade Packet using the parameters given.
The packet uses the current c2 configuration and parameters given
to construct a C2 Packet.
:param c2_payload: The type of C2 Traffic ot be sent
:type c2_payload: C2Payload
:param c2_command: The C2 command to be sent to the C2 Beacon.
:type c2_command: C2Command.
:param command_options: The relevant C2 Beacon parameters.F
:type command_options: Dict
:return: Returns the construct C2Packet
:rtype: C2Packet
"""
constructed_packet = C2Packet(
masquerade_protocol=self.c2_config.masquerade_protocol,
masquerade_port=self.c2_config.masquerade_port,
keep_alive_frequency=self.c2_config.keep_alive_frequency,
payload_type=c2_payload,
command=c2_command,
payload=command_options,
)
return constructed_packet
c2_config: _C2_Opts = _C2_Opts()
"""Holds the current configuration settings of the C2 Suite."""
"""
Holds the current configuration settings of the C2 Suite.
The C2 beacon initialise this class through it's internal configure method.
The C2 Server when receiving a keep alive will initialise it's own configuration
to match that of the configuration settings passed in the keep alive through _resolve keep alive.
If the C2 Beacon is reconfigured then a new keep alive is set which causes the
C2 beacon to reconfigure it's configuration settings.
"""
def describe_state(self) -> Dict:
"""
@@ -187,27 +225,18 @@ class AbstractC2(Application, identifier="AbstractC2"):
:returns: Returns True if a send alive was successfully sent. False otherwise.
:rtype bool:
"""
# Checking that the c2 application is capable of performing both actions and has an enabled NIC
# (Using NOT to improve code readability)
if self.c2_remote_connection is None:
self.sys_log.error(
f"{self.name}: Unable to establish connection as the C2 Server's IP Address has not been configured."
# Checking that the c2 application is capable of connecting to remote.
# Purely a safety guard clause.
if not (connection_status := self._check_connection()[0]):
self.sys_log.warning(
f"{self.name}: Unable to send keep alive due to c2 connection status: {connection_status}."
)
if not self._can_perform_network_action():
self.sys_log.warning(f"{self.name}: Unable to perform network actions. Unable to send Keep Alive.")
return False
# We also Pass masquerade proto`col/port so that the c2 server can reply on the correct protocol/port.
# (This also lays the foundations for switching masquerade port/protocols mid episode.)
keep_alive_packet = C2Packet(
masquerade_protocol=self.c2_config.masquerade_protocol,
masquerade_port=self.c2_config.masquerade_port,
keep_alive_frequency=self.c2_config.keep_alive_frequency,
payload_type=C2Payload.KEEP_ALIVE,
command=None,
)
# C2 Server will need to configure c2_remote_connection after it receives it's first keep alive.
# Passing our current C2 configuration in remain in sync.
keep_alive_packet = self._craft_packet(c2_payload=C2Payload.KEEP_ALIVE)
# Sending the keep alive via the .send() method (as with all other applications.)
if self.send(
payload=keep_alive_packet,
dest_ip_address=self.c2_remote_connection,
@@ -215,6 +244,8 @@ class AbstractC2(Application, identifier="AbstractC2"):
ip_protocol=self.c2_config.masquerade_protocol,
session_id=session_id,
):
# Setting the keep_alive_sent guard condition to True. This is used to prevent packet storms.
# This prevents the _resolve_keep_alive method from calling this method again (until the next timestep.)
self.keep_alive_sent = True
self.sys_log.info(f"{self.name}: Keep Alive sent to {self.c2_remote_connection}")
self.sys_log.debug(
@@ -266,7 +297,7 @@ class AbstractC2(Application, identifier="AbstractC2"):
f"Keep Alive Frequency: {self.c2_config.keep_alive_frequency}"
)
# This statement is intended to catch on the C2 Application that is listening for connection. (C2 Beacon)
# This statement is intended to catch on the C2 Application that is listening for connection.
if self.c2_remote_connection is None:
self.sys_log.debug(f"{self.name}: Attempting to configure remote C2 connection based off received output.")
self.c2_remote_connection = IPv4Address(self.c2_session.with_ip_address)
@@ -287,8 +318,14 @@ class AbstractC2(Application, identifier="AbstractC2"):
self.c2_config.masquerade_protocol = IPProtocol.TCP
@abstractmethod
def _confirm_connection(self, timestep: int) -> bool:
"""Abstract method - Checks the suitability of the current C2 Server/Beacon connection."""
def _confirm_remote_connection(self, timestep: int) -> bool:
"""
Abstract method - Confirms the suitability of the current C2 application remote connection.
Each application will have perform different behaviour to confirm the remote connection.
:return: Boolean. True if remote connection is confirmed, false otherwise.
"""
def apply_timestep(self, timestep: int) -> None:
"""Apply a timestep to the c2_server & c2 beacon.
@@ -316,5 +353,39 @@ class AbstractC2(Application, identifier="AbstractC2"):
and self.health_state_actual is SoftwareHealthState.GOOD
):
self.keep_alive_inactivity += 1
self._confirm_connection(timestep)
self._confirm_remote_connection(timestep)
return
def _check_connection(self) -> tuple[bool, RequestResponse]:
"""
Validation method: Checks that the C2 application is capable of sending C2 Command input/output.
Performs a series of connection validation to ensure that the C2 application is capable of
sending and responding to the remote c2 connection.
:return: A tuple containing a boolean True/False and a corresponding Request Response
:rtype: tuple[bool, RequestResponse]
"""
if self._can_perform_network_action == False:
self.sys_log.warning(f"{self.name}: Unable to make leverage networking resources. Rejecting Command.")
return [
False,
RequestResponse(
status="failure", data={"Reason": "Unable to access networking resources. Unable to send command."}
),
]
if self.c2_remote_connection is False:
self.sys_log.warning(f"{self.name}: C2 Application has yet to establish connection. Rejecting command.")
return [
False,
RequestResponse(
status="failure",
data={"Reason": "C2 Application has yet to establish connection. Unable to send command."},
),
]
else:
return [
True,
RequestResponse(status="success", data={"Reason": "C2 Application is able to send connections."}),
]

View File

@@ -28,12 +28,15 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
to simulate malicious communications and infrastructure within primAITE.
Must be configured with the C2 Server's IP Address upon installation.
Please refer to the _configure method for further information.
Extends the Abstract C2 application to include the following:
1. Receiving commands from the C2 Server (Command input)
2. Leveraging the terminal application to execute requests (dependant on the command given)
3. Sending the RequestResponse back to the C2 Server (Command output)
Please refer to the Command-&-Control notebook for an in-depth example of the C2 Suite.
"""
keep_alive_attempted: bool = False
@@ -141,9 +144,18 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
masquerade_port: Enum = Port.HTTP,
) -> bool:
"""
Configures the C2 beacon to communicate with the C2 server with following additional parameters.
Configures the C2 beacon to communicate with the C2 server.
The C2 Beacon has four different configuration options which can be used to
modify the networking behaviour between the C2 Server and the C2 Beacon.
Configuration Option | Option Meaning
---------------------|------------------------
c2_server_ip_address | The IP Address of the C2 Server. (The C2 Server must be running)
keep_alive_frequency | How often should the C2 Beacon confirm it's connection in timesteps.
masquerade_protocol | What protocol should the C2 traffic masquerade as? (HTTP, FTP or DNS)
masquerade_port | What port should the C2 traffic use? (TCP or UDP)
# TODO: Expand docustring.
:param c2_server_ip_address: The IP Address of the C2 Server. Used to establish connection.
:type c2_server_ip_address: IPv4Address
@@ -245,13 +257,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
:param session_id: The current session established with the C2 Server.
:type session_id: Str
"""
output_packet = C2Packet(
masquerade_protocol=self.c2_config.masquerade_protocol,
masquerade_port=self.c2_config.masquerade_port,
keep_alive_frequency=self.c2_config.keep_alive_frequency,
payload_type=C2Payload.OUTPUT,
payload=command_output,
)
output_packet = self._craft_packet(c2_payload=C2Payload.OUTPUT, command_options=command_output)
if self.send(
payload=output_packet,
dest_ip_address=self.c2_remote_connection,
@@ -410,7 +416,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
# If this method returns true then we have sent successfully sent a keep alive.
return self._send_keep_alive(session_id)
def _confirm_connection(self, timestep: int) -> bool:
def _confirm_remote_connection(self, timestep: int) -> bool:
"""Checks the suitability of the current C2 Server connection.
If a connection cannot be confirmed then this method will return false otherwise true.

View File

@@ -24,6 +24,8 @@ class C2Server(AbstractC2, identifier="C2Server"):
1. Sending commands to the C2 Beacon. (Command input)
2. Parsing terminal RequestResponses back to the Agent.
Please refer to the Command-&-Control notebook for an in-depth example of the C2 Suite.
"""
current_command_output: RequestResponse = None
@@ -51,7 +53,7 @@ class C2Server(AbstractC2, identifier="C2Server"):
"server_ip_address": request[-1].get("server_ip_address"),
"payload": request[-1].get("payload"),
}
return self._send_command(given_command=C2Command.RANSOMWARE_CONFIGURE, command_options=ransomware_config)
return self.send_command(given_command=C2Command.RANSOMWARE_CONFIGURE, command_options=ransomware_config)
def _launch_ransomware_action(request: RequestFormat, context: Dict) -> RequestResponse:
"""Agent Action - Sends a RANSOMWARE_LAUNCH C2Command to the C2 Beacon with the given parameters.
@@ -63,7 +65,7 @@ class C2Server(AbstractC2, identifier="C2Server"):
:return: RequestResponse object with a success code reflecting whether the ransomware was launched.
:rtype: RequestResponse
"""
return self._send_command(given_command=C2Command.RANSOMWARE_LAUNCH, command_options={})
return self.send_command(given_command=C2Command.RANSOMWARE_LAUNCH, command_options={})
def _remote_terminal_action(request: RequestFormat, context: Dict) -> RequestResponse:
"""Agent Action - Sends a TERMINAL C2Command to the C2 Beacon with the given parameters.
@@ -76,7 +78,7 @@ class C2Server(AbstractC2, identifier="C2Server"):
:rtype: RequestResponse
"""
command_payload = request[-1]
return self._send_command(given_command=C2Command.TERMINAL, command_options=command_payload)
return self.send_command(given_command=C2Command.TERMINAL, command_options=command_payload)
rm.add_request(
name="ransomware_configure",
@@ -159,7 +161,7 @@ class C2Server(AbstractC2, identifier="C2Server"):
return self._send_keep_alive(session_id)
@validate_call
def _send_command(self, given_command: C2Command, command_options: Dict) -> RequestResponse:
def send_command(self, given_command: C2Command, command_options: Dict) -> RequestResponse:
"""
Sends a command to the C2 Beacon.
@@ -193,26 +195,17 @@ class C2Server(AbstractC2, identifier="C2Server"):
status="failure", data={"Reason": "Received unexpected C2Command. Unable to send command."}
)
if self._can_perform_network_action == False:
self.sys_log.warning(f"{self.name}: Unable to make leverage networking resources. Rejecting Command.")
return RequestResponse(
status="failure", data={"Reason": "Unable to access networking resources. Unable to send command."}
)
if self.c2_remote_connection is False:
self.sys_log.warning(f"{self.name}: C2 Beacon has yet to establish connection. Rejecting command.")
return RequestResponse(
status="failure", data={"Reason": "C2 Beacon has yet to establish connection. Unable to send command."}
)
if self.c2_session is None:
self.sys_log.warning(f"{self.name}: C2 Beacon cannot be reached. Rejecting command.")
return RequestResponse(
status="failure", data={"Reason": "C2 Beacon cannot be reached. Unable to send command."}
)
# Lambda method used to return a failure RequestResponse if we're unable to confirm a connection.
# If _check_connection returns false then connection_status will return reason (A 'failure' Request Response)
if connection_status := (lambda return_bool, reason: reason if return_bool is False else None)(
*self._check_connection()
):
return connection_status
self.sys_log.info(f"{self.name}: Attempting to send command {given_command}.")
command_packet = self._craft_packet(given_command=given_command, command_options=command_options)
command_packet = self._craft_packet(
c2_payload=C2Payload.INPUT, c2_command=given_command, command_options=command_options
)
if self.send(
payload=command_packet,
@@ -231,30 +224,6 @@ class C2Server(AbstractC2, identifier="C2Server"):
)
return self.current_command_output
# TODO: Probably could move this as a class method in C2Packet.
def _craft_packet(self, given_command: C2Command, command_options: Dict) -> C2Packet:
"""
Creates and returns a Masquerade Packet using the arguments given.
Creates Masquerade Packet with a payload_type INPUT C2Payload.
:param given_command: The C2 command to be sent to the C2 Beacon.
:type given_command: C2Command.
:param command_options: The relevant C2 Beacon parameters.F
:type command_options: Dict
:return: Returns the construct C2Packet
:rtype: C2Packet
"""
constructed_packet = C2Packet(
masquerade_protocol=self.c2_config.masquerade_protocol,
masquerade_port=self.c2_config.masquerade_port,
keep_alive_frequency=self.c2_config.keep_alive_frequency,
payload_type=C2Payload.INPUT,
command=given_command,
payload=command_options,
)
return constructed_packet
def show(self, markdown: bool = False):
"""
Prints a table of the current C2 attributes on a C2 Server.
@@ -292,7 +261,8 @@ class C2Server(AbstractC2, identifier="C2Server"):
)
print(table)
# Abstract method inherited from abstract C2 - Not currently utilised.
# Abstract method inherited from abstract C2.
# C2 Servers do not currently receive any input commands from the C2 beacon.
def _handle_command_input(self, payload: C2Packet) -> None:
"""Defining this method (Abstract method inherited from abstract C2) in order to instantiate the class.
@@ -304,13 +274,15 @@ class C2Server(AbstractC2, identifier="C2Server"):
self.sys_log.warning(f"{self.name}: C2 Server received an unexpected INPUT payload: {payload}")
pass
def _confirm_connection(self, timestep: int) -> bool:
def _confirm_remote_connection(self, timestep: int) -> bool:
"""Checks the suitability of the current C2 Beacon connection.
If a C2 Server has not received a keep alive within the current set
keep alive frequency (self._keep_alive_frequency) then the C2 beacons
connection is considered dead and any commands will be rejected.
This method is used to
:param timestep: The current timestep of the simulation.
:type timestep: Int
:return: Returns False if the C2 beacon is considered dead. Otherwise True.

View File

@@ -263,7 +263,7 @@ def test_c2_suite_terminal_command_file_creation(basic_network):
"ip_address": None,
}
c2_server._send_command(C2Command.TERMINAL, command_options=file_create_command)
c2_server.send_command(C2Command.TERMINAL, command_options=file_create_command)
assert computer_b.software_manager.file_system.access_file(folder_name="test_folder", file_name="test_file") == True
assert c2_beacon.local_terminal_session is not None
@@ -273,7 +273,7 @@ def test_c2_suite_terminal_command_file_creation(basic_network):
# node_c's IP is 192.168.255.3
file_create_command.update({"ip_address": "192.168.255.3"})
c2_server._send_command(C2Command.TERMINAL, command_options=file_create_command)
c2_server.send_command(C2Command.TERMINAL, command_options=file_create_command)
assert computer_c.software_manager.file_system.access_file(folder_name="test_folder", file_name="test_file") == True
assert c2_beacon.remote_terminal_session is not None
@@ -369,7 +369,7 @@ def test_c2_suite_acl_bypass(basic_network):
"password": "admin",
"ip_address": None,
}
c2_server._send_command(C2Command.TERMINAL, command_options=ftp_file_create_command)
c2_server.send_command(C2Command.TERMINAL, command_options=ftp_file_create_command)
assert (
computer_b.software_manager.file_system.access_file(folder_name="test_folder", file_name="ftp_test_file")
== True
@@ -440,7 +440,7 @@ def test_c2_suite_acl_bypass(basic_network):
"password": "admin",
"ip_address": None,
}
c2_server._send_command(C2Command.TERMINAL, command_options=http_file_create_command)
c2_server.send_command(C2Command.TERMINAL, command_options=http_file_create_command)
assert (
computer_b.software_manager.file_system.access_file(folder_name="test_folder", file_name="http_test_file")
== True

View File

@@ -102,7 +102,7 @@ def test_c2_handle_beacon_disconnect(basic_c2_network):
"ip_address": None,
}
command_request_response = c2_server._send_command(C2Command.TERMINAL, command_options=file_create_command)
command_request_response = c2_server.send_command(C2Command.TERMINAL, command_options=file_create_command)
assert command_request_response.status == "failure"
@@ -117,9 +117,6 @@ def test_c2_handle_beacon_disconnect(basic_c2_network):
assert c2_server.c2_connection_active is False
# TODO: Finalise and complete these tests.
def test_c2_handle_switching_port(basic_c2_network):
"""Tests that the C2 suite will be able handle switching destination/src port."""
network: Network = basic_c2_network
@@ -205,3 +202,30 @@ def test_c2_handle_switching_frequency(basic_c2_network):
assert c2_beacon.keep_alive_inactivity is 0
assert c2_server.keep_alive_inactivity is 0
def test_c2_handles_1_timestep_keep_alive(basic_c2_network):
"""Tests that the C2 suite will be able handle a C2 Beacon will a keep alive of 1 timestep."""
network: Network = basic_c2_network
network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network)
c2_beacon.configure(c2_server_ip_address="192.168.0.1", keep_alive_frequency=1)
c2_server.run()
c2_beacon.establish()
for i in range(50):
network.apply_timestep(i)
assert c2_beacon.c2_connection_active is True
assert c2_server.c2_connection_active is True
def test_c2_server_runs_on_default(basic_c2_network):
"""Tests that the C2 Server begins running by default."""
network: Network = basic_c2_network
computer_a: Computer = network.get_node_by_hostname("computer_a")
c2_server: C2Server = computer_a.software_manager.software.get("C2Server")
assert c2_server.operating_state == ApplicationOperatingState.RUNNING