#2689 Overhauled .receive method. Keep Alive and initial implementation of commands working. (also Updated docustrings + pre-commit)

This commit is contained in:
Archer.Bowen
2024-08-02 13:25:08 +01:00
parent e554a2d224
commit 2339dabac1
6 changed files with 382 additions and 212 deletions

View File

@@ -36,8 +36,6 @@ from primaite.simulator.system.applications.red_applications.data_manipulation_b
)
from primaite.simulator.system.applications.red_applications.dos_bot import DoSBot # noqa: F401
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript # noqa: F401
from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon
from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Server
from primaite.simulator.system.applications.web_browser import WebBrowser # noqa: F401
from primaite.simulator.system.services.database.database_service import DatabaseService
from primaite.simulator.system.services.dns.dns_client import DNSClient

View File

@@ -5,19 +5,6 @@ from typing import Optional
from primaite.simulator.network.protocols.packet import DataPacket
class C2Payload(Enum):
"""Represents the different types of command and control payloads."""
KEEP_ALIVE = "keep_alive"
"""C2 Keep Alive payload. Used by the C2 beacon and C2 Server to confirm their connection."""
INPUT = "input_command"
"""C2 Input Command payload. Used by the C2 Server to send a command to the c2 beacon."""
OUTPUT = "output_command"
"""C2 Output Command. Used by the C2 Beacon to send the results of a Input command to the c2 server."""
class MasqueradePacket(DataPacket):
"""Represents an generic malicious packet that is masquerading as another protocol."""
@@ -25,6 +12,6 @@ class MasqueradePacket(DataPacket):
masquerade_port: Enum # The 'Masquerade' port that is currently in use
payload_type: C2Payload # The type of C2 traffic (e.g keep alive, command or command out)
payload_type: Enum # The type of C2 traffic (e.g keep alive, command or command out)
command: Optional[str] # Used to pass the actual C2 Command in C2 INPUT
command: Optional[Enum] = None # Used to pass the actual C2 Command in C2 INPUT

View File

@@ -6,7 +6,7 @@ from typing import Dict, Optional
from pydantic import validate_call
from primaite.simulator.network.protocols.masquerade import C2Payload, MasqueradePacket
from primaite.simulator.network.protocols.masquerade import MasqueradePacket
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.application import Application
@@ -20,9 +20,7 @@ from primaite.simulator.system.core.session_manager import Session
class C2Command(Enum):
"""
Enumerations representing the different commands the C2 suite currently supports.
"""
"""Enumerations representing the different commands the C2 suite currently supports."""
RANSOMWARE_CONFIGURE = "Ransomware Configure"
"Instructs the c2 beacon to configure the ransomware with the provided options."
@@ -36,12 +34,25 @@ class C2Command(Enum):
# The terminal command should also be able to pass a session which can be used for remote connections.
class C2Payload(Enum):
"""Represents the different types of command and control payloads."""
KEEP_ALIVE = "keep_alive"
"""C2 Keep Alive payload. Used by the C2 beacon and C2 Server to confirm their connection."""
INPUT = "input_command"
"""C2 Input Command payload. Used by the C2 Server to send a command to the c2 beacon."""
OUTPUT = "output_command"
"""C2 Output Command. Used by the C2 Beacon to send the results of a Input command to the c2 server."""
class AbstractC2(Application, identifier="AbstractC2"):
"""
An abstract command and control (c2) application.
Extends the application class to provide base functionality for c2 suite applications
such as c2 beacons and c2 servers.
such as c2 beacons and c2 servers.
Provides the base methods for handling ``Keep Alive`` connections, configuring masquerade ports and protocols
as well as providing the abstract methods for sending, receiving and parsing commands.
@@ -55,13 +66,6 @@ class AbstractC2(Application, identifier="AbstractC2"):
c2_remote_connection: IPv4Address = None
"""The IPv4 Address of the remote c2 connection. (Either the IP of the beacon or the server)."""
keep_alive_sent: bool = False
"""Indicates if a keep alive has been sent this timestep. Used to prevent packet storms."""
# We should set the application to NOT_RUNNING if the inactivity count reaches a certain thresh hold.
keep_alive_inactivity: int = 0
"""Indicates how many timesteps since the last time the c2 application received a keep alive."""
# These two attributes are set differently in the c2 server and c2 beacon.
# The c2 server parses the keep alive and sets these accordingly.
# The c2 beacon will set this attributes upon installation and configuration
@@ -75,9 +79,6 @@ class AbstractC2(Application, identifier="AbstractC2"):
current_c2_session: Session = None
"""The currently active session that the C2 Traffic is using. Set after establishing connection."""
# TODO: Create a attribute call 'LISTENER' which indicates whenever the c2 application should listen for incoming connections or establish connections.
# This in order to simulate a blind shell (the current implementation is more akin to a reverse shell)
# TODO: Move this duplicate method from NMAP class into 'Application' to adhere to DRY principle.
def _can_perform_network_action(self) -> bool:
"""
@@ -104,10 +105,10 @@ class AbstractC2(Application, identifier="AbstractC2"):
:rtype: Dict
"""
return super().describe_state()
def __init__(self, **kwargs):
kwargs["port"] = Port.HTTP # TODO: Update this post application/services requiring to listen to multiple ports
kwargs["protocol"] = IPProtocol.TCP # Update this as well
kwargs["port"] = Port.HTTP # TODO: Update this post application/services requiring to listen to multiple ports
kwargs["protocol"] = IPProtocol.TCP # Update this as well
super().__init__(**kwargs)
# Validate call ensures we are only handling Masquerade Packets.
@@ -145,7 +146,7 @@ class AbstractC2(Application, identifier="AbstractC2"):
elif payload.payload_type == C2Payload.OUTPUT:
self.sys_log.info(f"{self.name} received an OUTPUT COMMAND payload.")
return self._handle_command_input(payload, session_id)
return self._handle_command_output(payload)
else:
self.sys_log.warning(
@@ -168,41 +169,7 @@ class AbstractC2(Application, identifier="AbstractC2"):
pass
def _handle_keep_alive(self, payload: MasqueradePacket, session_id: Optional[str]) -> bool:
"""
Handles receiving and sending keep alive payloads. This method is only called if we receive a keep alive.
Returns False if a keep alive was unable to be sent.
Returns True if a keep alive was successfully sent or already has been sent this timestep.
:return: True if successfully handled, false otherwise.
:rtype: Bool
"""
self.sys_log.info(f"{self.name}: Keep Alive Received from {self.c2_remote_connection}.")
self.c2_connection_active = True # Sets the connection to active
self.keep_alive_inactivity = 0 # Sets the keep alive inactivity to zero
self.current_c2_session = self.software_manager.session_manager.sessions_by_uuid[session_id]
# Using this guard clause to prevent packet storms and recognise that we've achieved a connection.
# This guard clause triggers on the c2 suite that establishes connection.
if self.keep_alive_sent == True:
# Return early without sending another keep alive and then setting keep alive_sent false for next timestep.
self.keep_alive_sent = False
return True
# If we've reached this part of the method then we've received a keep alive but haven't sent a reply.
# Therefore we also need to configure the masquerade attributes based off the keep alive sent.
if self._resolve_keep_alive(payload, session_id) == False:
self.sys_log.warning(f"{self.name}: Keep Alive Could not be resolved correctly. Refusing Keep Alive.")
return False
# If this method returns true then we have sent successfully sent a keep alive.
self.sys_log.info(f"{self.name}: Connection successfully established with {self.c2_remote_connection}.")
return self._send_keep_alive(session_id)
"""Abstract Method: The C2 Server and the C2 Beacon handle the KEEP ALIVEs differently."""
# from_network_interface=from_network_interface
def receive(self, payload: MasqueradePacket, session_id: Optional[str] = None, **kwargs) -> bool:
@@ -217,23 +184,23 @@ class AbstractC2(Application, identifier="AbstractC2"):
"""Sends a C2 keep alive payload to the self.remote_connection IPv4 Address."""
# Checking that the c2 application is capable of performing both actions and has an enabled NIC
# (Using NOT to improve code readability)
if self.c2_remote_connection == None:
self.sys_log.error(f"{self.name}: Unable to Establish connection as the C2 Server's IP Address has not been given.")
if self.c2_remote_connection is None:
self.sys_log.error(
f"{self.name}: Unable to Establish connection as the C2 Server's IP Address has not been given."
)
if not self._can_perform_network_action():
self.sys_log.warning(f"{self.name}: Unable to perform network actions.")
return False
# We also Pass masquerade protocol/port so that the c2 server can reply on the correct protocol/port.
# We also Pass masquerade proto`col/port so that the c2 server can reply on the correct protocol/port.
# (This also lays the foundations for switching masquerade port/protocols mid episode.)
keep_alive_packet = MasqueradePacket(
masquerade_protocol=self.current_masquerade_protocol,
masquerade_port=self.current_masquerade_port,
payload_type=C2Payload.KEEP_ALIVE,
command=None
command=None,
)
# We need to set this guard clause to true before sending the keep alive (prevents packet storms.)
self.keep_alive_sent = True
# C2 Server will need to configure c2_remote_connection after it receives it's first keep alive.
if self.send(
payload=keep_alive_packet,
@@ -242,16 +209,16 @@ class AbstractC2(Application, identifier="AbstractC2"):
ip_protocol=self.current_masquerade_protocol,
session_id=session_id,
):
self.keep_alive_sent = True
self.sys_log.info(f"{self.name}: Keep Alive sent to {self.c2_remote_connection}")
self.sys_log.debug(f"{self.name}: on {self.current_masquerade_port} via {self.current_masquerade_protocol}")
return True
else:
self.sys_log.warning(
f"{self.name}: failed to send a Keep Alive. The node may be unable to access the network."
f"{self.name}: failed to send a Keep Alive. The node may be unable to access the ``network."
)
return False
def _resolve_keep_alive(self, payload: MasqueradePacket, session_id: Optional[str]) -> bool:
"""
Parses the Masquerade Port/Protocol within the received Keep Alive packet.
@@ -260,7 +227,7 @@ class AbstractC2(Application, identifier="AbstractC2"):
Returns True on successfully extracting and configuring the masquerade port/protocols.
Returns False otherwise.
:param payload: The Keep Alive payload received.
:type payload: MasqueradePacket
:return: True on successful configuration, false otherwise.
@@ -268,22 +235,24 @@ class AbstractC2(Application, identifier="AbstractC2"):
"""
# Validating that they are valid Enums.
if not isinstance(payload.masquerade_port, Port) or not isinstance(payload.masquerade_protocol, IPProtocol):
self.sys_log.warning(f"{self.name}: Received invalid Masquerade type. Port: {type(payload.masquerade_port)} Protocol: {type(payload.masquerade_protocol)}.")
self.sys_log.warning(
f"{self.name}: Received invalid Masquerade Values within Keep Alive."
f"Port: {payload.masquerade_port} Protocol: {payload.masquerade_protocol}."
)
return False
# TODO: Validation on Ports (E.g only allow HTTP, FTP etc)
# Potentially compare to IPProtocol & Port children (Same way that abstract TAP does it with kill chains)
# Setting the Ports
self.current_masquerade_port = payload.masquerade_port
self.current_masquerade_protocol = payload.masquerade_protocol
# This statement is intended to catch on the C2 Application that is listening for connection. (C2 Beacon)
if self.c2_remote_connection == None:
if self.c2_remote_connection is None:
self.sys_log.debug(f"{self.name}: Attempting to configure remote C2 connection based off received output.")
self.c2_remote_connection = self.current_c2_session.with_ip_address
self.c2_connection_active = True # Sets the connection to active
self.keep_alive_inactivity = 0 # Sets the keep alive inactivity to zeroW
return True

View File

@@ -1,34 +1,44 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from primaite.simulator.system.applications.red_applications.c2.abstract_c2 import AbstractC2, C2Command
#from primaite.simulator.system.services.terminal.terminal import Terminal
from prettytable import MARKDOWN, PrettyTable
from primaite.simulator.core import RequestManager, RequestType
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.network.protocols.masquerade import C2Payload, MasqueradePacket
from primaite.simulator.network.transmission.network_layer import IPProtocol
from ipaddress import IPv4Address
from typing import Dict,Optional
from primaite.simulator.network.transmission.transport_layer import Port
from enum import Enum
from primaite.simulator.system.software import SoftwareHealthState
from ipaddress import IPv4Address
from typing import Dict, Optional
# from primaite.simulator.system.services.terminal.terminal import Terminal
from prettytable import MARKDOWN, PrettyTable
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.network.protocols.masquerade import MasqueradePacket
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.red_applications.c2.abstract_c2 import AbstractC2, C2Command, C2Payload
from primaite.simulator.system.software import SoftwareHealthState
class C2Beacon(AbstractC2, identifier="C2 Beacon"):
"""
C2 Beacon Application.
Represents a generic C2 beacon which can be used in conjunction with the C2 Server
to simulate malicious communications within primAITE.
Represents a vendor generic C2 beacon is used in conjunction with the C2 Server
to simulate malicious communications and infrastructure within primAITE.
Must be configured with the C2 Server's IP Address upon installation.
Must be configured with the C2 Server's Ip Address upon installation.
Extends the Abstract C2 application to include the following:
1. Receiving commands from the C2 Server (Command input)
2. Leveraging the terminal application to execute requests (dependant on the command given)
3. Sending the RequestResponse back to the C2 Server (Command output)
"""
keep_alive_attempted: bool = False
"""Indicates if a keep alive has been attempted to be sent this timestep. Used to prevent packet storms."""
# We should set the application to NOT_RUNNING if the inactivity count reaches a certain thresh hold.
keep_alive_inactivity: int = 0
"""Indicates how many timesteps since the last time the c2 application received a keep alive."""
keep_alive_frequency: int = 5
"The frequency at which ``Keep Alive`` packets are sent to the C2 Server from the C2 Beacon."
@@ -38,8 +48,8 @@ class C2Beacon(AbstractC2, identifier="C2 Beacon"):
# Implement a command output method that sends the RequestResponse to the C2 server.
# Uncomment the terminal Import and the terminal property after terminal PR
#@property
#def _host_terminal(self) -> Terminal:
# @property
# def _host_terminal(self) -> Terminal:
# """Return the Terminal that is installed on the same machine as the C2 Beacon."""
# host_terminal: Terminal = self.software_manager.software.get("Terminal")
# if host_terminal: is None:
@@ -69,27 +79,33 @@ class C2Beacon(AbstractC2, identifier="C2 Beacon"):
:return: RequestResponse object with a success code reflecting whether the configuration could be applied.
:rtype: RequestResponse
"""
server_ip = request[-1].get("c2_server_ip_address")
if server_ip == None:
c2_remote_ip = request[-1].get("c2_server_ip_address")
if c2_remote_ip is None:
self.sys_log.error(f"{self.name}: Did not receive C2 Server IP in configuration parameters.")
RequestResponse(status="failure", data={"No C2 Server IP given to C2 beacon. Unable to configure C2 Beacon"})
RequestResponse(
status="failure", data={"No C2 Server IP given to C2 beacon. Unable to configure C2 Beacon"}
)
c2_remote_ip = IPv4Address(c2_remote_ip)
frequency = request[-1].get("keep_alive_frequency")
protocol= request[-1].get("masquerade_protocol")
protocol = request[-1].get("masquerade_protocol")
port = request[-1].get("masquerade_port")
return RequestResponse.from_bool(self.configure(c2_server_ip_address=server_ip,
keep_alive_frequency=frequency,
masquerade_protocol=protocol,
masquerade_port=port))
return RequestResponse.from_bool(
self.configure(
c2_server_ip_address=c2_remote_ip,
keep_alive_frequency=frequency,
masquerade_protocol=protocol,
masquerade_port=port,
)
)
rm.add_request("configure", request_type=RequestType(func=_configure))
return rm
def __init__(self, **kwargs):
kwargs["name"] = "C2Beacon"
super().__init__(**kwargs)
def configure(
self,
c2_server_ip_address: IPv4Address = None,
@@ -100,6 +116,7 @@ class C2Beacon(AbstractC2, identifier="C2 Beacon"):
"""
Configures the C2 beacon to communicate with the C2 server with following additional parameters.
# TODO: Expand docustring.
:param c2_server_ip_address: The IP Address of the C2 Server. Used to establish connection.
:type c2_server_ip_address: IPv4Address
@@ -117,43 +134,70 @@ class C2Beacon(AbstractC2, identifier="C2 Beacon"):
self.sys_log.info(
f"{self.name}: Configured {self.name} with remote C2 server connection: {c2_server_ip_address=}."
)
self.sys_log.debug(f"{self.name}: configured with the following settings:"
f"Remote C2 Server: {c2_server_ip_address}"
f"Keep Alive Frequency {keep_alive_frequency}"
f"Masquerade Protocol: {masquerade_protocol}"
f"Masquerade Port: {masquerade_port}")
self.sys_log.debug(
f"{self.name}: configured with the following settings:"
f"Remote C2 Server: {c2_server_ip_address}"
f"Keep Alive Frequency {keep_alive_frequency}"
f"Masquerade Protocol: {masquerade_protocol}"
f"Masquerade Port: {masquerade_port}"
)
return True
# I THINK that once the application is running it can respond to incoming traffic but I'll need to test this later.
def establish(self) -> bool:
"""Establishes connection to the C2 server via a send alive. Must be called after the C2 Beacon is configured."""
"""Establishes connection to the C2 server via a send alive. The C2 Beacon must already be configured."""
if self.c2_remote_connection is None:
self.sys_log.info(f"{self.name}: Failed to establish connection. C2 Beacon has not been configured.")
return False
self.run()
self.num_executions += 1
return self._send_keep_alive(session_id=None)
def _handle_command_input(self, payload: MasqueradePacket, session_id: Optional[str]) -> bool:
"""
Handles the parsing of C2 Commands from C2 Traffic (Masquerade Packets)
as well as then calling the relevant method dependant on the C2 Command.
:param payload: The INPUT C2 Payload
Handles the parsing of C2 Commands from C2 Traffic (Masquerade Packets).
Dependant the C2 Command contained within the payload.
The following methods are called and returned.
C2 Command | Internal Method
---------------------|------------------------
RANSOMWARE_CONFIGURE | self._command_ransomware_config()
RANSOMWARE_LAUNCH | self._command_ransomware_launch()
Terminal | self._command_terminal()
Please see each method individually for further information regarding
the implementation of these commands.
:param payload: The INPUT C2 Payload
:type payload: MasqueradePacket
:return: The Request Response provided by the terminal execute method.
:rtype Request Response:
"""
command = payload.payload_type
if command != C2Payload:
# TODO: Probably could refactor this to be a more clean.
# The elif's are a bit ugly when they are all calling the same method.
command = payload.command
if not isinstance(command, C2Command):
self.sys_log.warning(f"{self.name}: Received unexpected C2 command. Unable to resolve command")
return self._return_command_output(RequestResponse(status="failure", data={"Received unexpected C2Command. Unable to resolve command."}))
return self._return_command_output(
command_output=RequestResponse(
status="failure",
data={"Reason": "C2 Beacon received unexpected C2Command. Unable to resolve command."},
),
session_id=session_id,
)
if command == C2Command.RANSOMWARE_CONFIGURE:
self.sys_log.info(f"{self.name}: Received a ransomware configuration C2 command.")
return self._return_command_output(command_output=self._command_ransomware_config(payload), session_id=session_id)
return self._return_command_output(
command_output=self._command_ransomware_config(payload), session_id=session_id
)
elif command == C2Command.RANSOMWARE_LAUNCH:
self.sys_log.info(f"{self.name}: Received a ransomware launch C2 command.")
return self._return_command_output(command_output=self._command_ransomware_launch(payload), session_id=session_id)
return self._return_command_output(
command_output=self._command_ransomware_launch(payload), session_id=session_id
)
elif command == C2Command.TERMINAL:
self.sys_log.info(f"{self.name}: Received a terminal C2 command.")
@@ -161,22 +205,30 @@ class C2Beacon(AbstractC2, identifier="C2 Beacon"):
else:
self.sys_log.error(f"{self.name}: Received an C2 command: {command} but was unable to resolve command.")
return self._return_command_output(RequestResponse(status="failure", data={"Unexpected Behaviour. Unable to resolve command."}))
return self._return_command_output(
RequestResponse(status="failure", data={"Reason": "Unexpected Behaviour. Unable to resolve command."})
)
def _return_command_output(self, command_output: RequestResponse, session_id: Optional[str] = None) -> bool:
"""Responsible for responding to the C2 Server with the output of the given command.
def _return_command_output(self, command_output: RequestResponse, session_id) -> bool:
"""Responsible for responding to the C2 Server with the output of the given command."""
:param command_output: The RequestResponse returned by the terminal application's execute method.
:type command_output: Request Response
:param session_id: The current session established with the C2 Server.
:type session_id: Str
"""
output_packet = MasqueradePacket(
masquerade_protocol=self.current_masquerade_protocol,
masquerade_port=self.current_masquerade_port,
payload_type=C2Payload.OUTPUT,
payload=command_output
payload=command_output,
)
if self.send(
payload=output_packet,
dest_ip_address=self.c2_remote_connection,
dest_port=self.current_masquerade_port,
ip_protocol=self.current_masquerade_protocol,
session_id=session_id,
):
self.sys_log.info(f"{self.name}: Command output sent to {self.c2_remote_connection}")
self.sys_log.debug(f"{self.name}: on {self.current_masquerade_port} via {self.current_masquerade_protocol}")
@@ -189,68 +241,121 @@ class C2Beacon(AbstractC2, identifier="C2 Beacon"):
def _command_ransomware_config(self, payload: MasqueradePacket) -> RequestResponse:
"""
C2 Command: Ransomware Configuration
C2 Command: Ransomware Configuration.
Creates a request that configures the ransomware based off the configuration options given.
This request is then sent to the terminal service in order to be executed.
:payload MasqueradePacket: The incoming INPUT command.
:type Masquerade Packet: MasqueradePacket.
:return: Returns the Request Response returned by the Terminal execute method.
:rtype: Request Response
"""
pass
#return self._host_terminal.execute(command)
# TODO: replace and use terminal
return RequestResponse(status="success", data={"Reason": "Placeholder."})
def _command_ransomware_launch(self, payload: MasqueradePacket) -> RequestResponse:
"""
C2 Command: Ransomware Execute
C2 Command: Ransomware Launch.
Creates a request that executes the ransomware script.
This request is then sent to the terminal service in order to be executed.
:payload MasqueradePacket: The incoming INPUT command.
:type Masquerade Packet: MasqueradePacket.
:return: Returns the Request Response returned by the Terminal execute method.
:rtype: Request Response
Creates a Request that launches the ransomware.
"""
pass
#return self._host_terminal.execute(command)
# TODO: replace and use terminal
return RequestResponse(status="success", data={"Reason": "Placeholder."})
def _command_terminal(self, payload: MasqueradePacket) -> RequestResponse:
"""
C2 Command: Ransomware Execute
C2 Command: Terminal.
Creates a request that executes the ransomware script.
Creates a request that executes a terminal command.
This request is then sent to the terminal service in order to be executed.
:payload MasqueradePacket: The incoming INPUT command.
:type Masquerade Packet: MasqueradePacket.
:return: Returns the Request Response returned by the Terminal execute method.
:rtype: Request Response
Creates a Request that launches the ransomware.
"""
pass
#return self._host_terminal.execute(command)
# TODO: uncomment and replace (uses terminal)
return RequestResponse(status="success", data={"Reason": "Placeholder."})
# return self._host_terminal.execute(command)
def _handle_keep_alive(self, payload: MasqueradePacket, session_id: Optional[str]) -> bool:
"""
Handles receiving and sending keep alive payloads. This method is only called if we receive a keep alive.
In the C2 Beacon implementation of this method the c2 connection active boolean
is set to true and the keep alive inactivity is reset only after sending a keep alive
as wel as receiving a response back from the C2 Server.
This is because the C2 Server is the listener and thus will only ever receive packets from
the C2 Beacon rather than the other way around. (The C2 Beacon is akin to a reverse shell)
Therefore, we need a response back from the listener (C2 Server)
before the C2 beacon is able to confirm it's connection.
Returns False if a keep alive was unable to be sent.
Returns True if a keep alive was successfully sent or already has been sent this timestep.
:return: True if successfully handled, false otherwise.
:rtype: Bool
"""
self.sys_log.info(f"{self.name}: Keep Alive Received from {self.c2_remote_connection}.")
# Using this guard clause to prevent packet storms and recognise that we've achieved a connection.
# This guard clause triggers on the c2 suite that establishes connection.
if self.keep_alive_attempted is True:
self.c2_connection_active = True # Sets the connection to active
self.keep_alive_inactivity = 0 # Sets the keep alive inactivity to zero
self.current_c2_session = self.software_manager.session_manager.sessions_by_uuid[session_id]
# We set keep alive_attempted here to show that we've achieved connection.
self.keep_alive_attempted = False
self.sys_log.warning(f"{self.name}: Connection successfully Established with C2 Server.")
return True
# If we've reached this part of the method then we've received a keep alive but haven't sent a reply.
# Therefore we also need to configure the masquerade attributes based off the keep alive sent.
if self._resolve_keep_alive(payload, session_id) is False:
self.sys_log.warning(f"{self.name}: Keep Alive Could not be resolved correctly. Refusing Keep Alive.")
return False
self.keep_alive_attempted = True
# If this method returns true then we have sent successfully sent a keep alive.
return self._send_keep_alive(session_id)
# Not entirely sure if this actually works.
def apply_timestep(self, timestep: int) -> None:
"""
Apply a timestep to the c2_beacon.
Used to keep track of when the c2 beacon should send another keep alive.
"""Apply a timestep to the c2_beacon.
Used to keep track of when the c2 beacon should send another keep alive.
The following logic is applied:
1. Each timestep the keep_alive_inactivity is increased.
2. If the keep alive inactivity eclipses that of the keep alive frequency then another keep alive is sent.
3. If the c2 beacon receives a keep alive response packet then the ``keep_alive_inactivity`` attribute is set to 0
Therefore, if ``keep_alive_inactivity`` attribute is not 0, then the connection is considered severed and c2 beacon will shut down.
3. If a keep alive response packet is received then the ``keep_alive_inactivity`` attribute is reset.
Therefore, if ``keep_alive_inactivity`` attribute is not 0 after a keep alive is sent
then the connection is considered severed and c2 beacon will shut down.
:param timestep: The current timestep of the simulation.
:type timestep: Int
:return bool: Returns false if connection was lost. Returns True if connection is active or re-established.
:rtype bool:
"""
super().apply_timestep(timestep=timestep)
if self.operating_state is ApplicationOperatingState.RUNNING and self.health_state_actual is SoftwareHealthState.GOOD:
self.keep_alive_attempted = False # Resetting keep alive sent.
if (
self.operating_state is ApplicationOperatingState.RUNNING
and self.health_state_actual is SoftwareHealthState.GOOD
):
self.keep_alive_inactivity += 1
if not self._check_c2_connection(timestep):
self.sys_log.error(f"{self.name}: Connection Severed - Application Closing.")
@@ -259,34 +364,67 @@ class C2Beacon(AbstractC2, identifier="C2 Beacon"):
self.close()
return
def _check_c2_connection(self, timestep: int) -> bool:
"""Checks the suitability of the current C2 Server connection.
def _check_c2_connection(self, timestep) -> bool:
"""Checks the C2 Server connection. If a connection cannot be confirmed then this method will return false otherwise true."""
If a connection cannot be confirmed then this method will return false otherwise true.
:param timestep: The current timestep of the simulation.
:type timestep: Int
:return: Returns False if connection was lost. Returns True if connection is active or re-established.
:rtype bool:
"""
if self.keep_alive_inactivity == self.keep_alive_frequency:
self.sys_log.info(f"{self.name}: Attempting to Send Keep Alive to {self.c2_remote_connection} at timestep {timestep}.")
self.sys_log.info(
f"{self.name}: Attempting to Send Keep Alive to {self.c2_remote_connection} at timestep {timestep}."
)
self._send_keep_alive(session_id=self.current_c2_session.uuid)
if self.keep_alive_inactivity != 0:
self.sys_log.warning(f"{self.name}: Did not receive keep alive from c2 Server. Connection considered severed.")
self.sys_log.warning(
f"{self.name}: Did not receive keep alive from c2 Server. Connection considered severed."
)
return False
return True
# Defining this abstract method from Abstract C2
def _handle_command_output(self, payload):
"""C2 Beacons currently do not need to handle output commands coming from the C2 Servers."""
def _handle_command_output(self, payload: MasqueradePacket):
"""C2 Beacons currently does not need to handle output commands coming from the C2 Servers."""
self.sys_log.warning(f"{self.name}: C2 Beacon received an unexpected OUTPUT payload: {payload}.")
pass
def show(self, markdown: bool = False):
"""
Prints a table of the current C2 attributes on a C2 Beacon.
Prints a table of the current status of the C2 Beacon.
Displays the current values of the following C2 attributes:
``C2 Connection Active``:
If the C2 Beacon is currently connected to the C2 Server
``C2 Remote Connection``:
The IP of the C2 Server. (Configured by upon installation)
``Keep Alive Inactivity``:
How many timesteps have occurred since the last keep alive.
``Keep Alive Frequency``:
How often should the C2 Beacon attempt a keep alive?
:param markdown: If True, outputs the table in markdown format. Default is False.
"""
table = PrettyTable(["C2 Connection Active", "C2 Remote Connection", "Keep Alive Inactivity", "Keep Alive Frequency"])
table = PrettyTable(
["C2 Connection Active", "C2 Remote Connection", "Keep Alive Inactivity", "Keep Alive Frequency"]
)
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.name} Running Status"
table.add_row([self.c2_connection_active, self.c2_remote_connection, self.keep_alive_inactivity, self.keep_alive_frequency])
table.add_row(
[
self.c2_connection_active,
self.c2_remote_connection,
self.keep_alive_inactivity,
self.keep_alive_frequency,
]
)
print(table)

View File

@@ -1,15 +1,34 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from primaite.simulator.system.applications.red_applications.c2.abstract_c2 import AbstractC2, C2Command
from primaite.simulator.network.protocols.masquerade import C2Payload, MasqueradePacket
from primaite.simulator.core import RequestManager, RequestType
from primaite.interface.request import RequestFormat, RequestResponse
from typing import Dict, Optional
from prettytable import MARKDOWN, PrettyTable
from typing import Dict,Optional
from pydantic import validate_call
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.network.protocols.masquerade import MasqueradePacket
from primaite.simulator.system.applications.red_applications.c2.abstract_c2 import AbstractC2, C2Command, C2Payload
class C2Server(AbstractC2, identifier="C2 Server"):
# TODO:
# Implement the request manager and agent actions.
# Implement the output handling methods. (These need to interface with the actions)
"""
C2 Server Application.
Represents a vendor generic C2 Server is used in conjunction with the C2 beacon
to simulate malicious communications and infrastructure within primAITE.
The C2 Server must be installed and be in a running state before it's able to receive
red agent actions and send commands to the C2 beacon.
Extends the Abstract C2 application to include the following:
1. Sending commands to the C2 Beacon. (Command input)
2. Parsing terminal RequestResponses back to the Agent.
"""
current_command_output: RequestResponse = None
"""The Request Response by the last command send. This attribute is updated by the method _handle_command_output."""
def _init_request_manager(self) -> RequestManager:
"""
Initialise the request manager.
@@ -34,7 +53,7 @@ class C2Server(AbstractC2, identifier="C2 Server"):
def _launch_ransomware_action(request: RequestFormat, context: Dict) -> RequestResponse:
"""Agent Action - Sends a RANSOMWARE_LAUNCH C2Command to the C2 Beacon with the given parameters.
:param request: Request with one element containing a dict of parameters for the configure method.
:type request: RequestFormat
:param context: additional context for resolving this action, currently unused
@@ -48,7 +67,7 @@ class C2Server(AbstractC2, identifier="C2 Server"):
def _remote_terminal_action(request: RequestFormat, context: Dict) -> RequestResponse:
"""Agent Action - Sends a TERMINAL C2Command to the C2 Beacon with the given parameters.
:param request: Request with one element containing a dict of parameters for the configure method.
:type request: RequestFormat
:param context: additional context for resolving this action, currently unused
@@ -78,28 +97,69 @@ class C2Server(AbstractC2, identifier="C2 Server"):
kwargs["name"] = "C2Server"
super().__init__(**kwargs)
def _handle_command_output(self, payload: MasqueradePacket) -> RequestResponse:
def _handle_command_output(self, payload: MasqueradePacket) -> bool:
"""
Handles the parsing of C2 Command Output from C2 Traffic (Masquerade Packets)
as well as then calling the relevant method dependant on the C2 Command.
:param payload: The OUTPUT C2 Payload
Handles the parsing of C2 Command Output from C2 Traffic (Masquerade Packets).
Parses the Request Response within MasqueradePacket's payload attribute (Inherited from Data packet).
The class attribute self.current_command_output is then set to this Request Response.
If the payload attribute does not contain a RequestResponse, then an error will be raised in syslog and
the self.current_command_output is updated to reflect the error.
:param payload: The OUTPUT C2 Payload
:type payload: MasqueradePacket
:return: Returns the Request Response of the C2 Beacon's host terminal service execute method.
:rtype Request Response:
:return: Returns True if the self.current_command_output is currently updated, false otherwise.
:rtype Bool:
"""
self.sys_log.info(f"{self.name}: Received command response from C2 Beacon: {payload}.")
command_output = payload.payload
if command_output != MasqueradePacket:
self.sys_log.warning(f"{self.name}: Received invalid command response: {command_output}.")
return RequestResponse(status="failure", data={"Received unexpected C2 Response."})
return command_output
if not isinstance(command_output, RequestResponse):
self.sys_log.warning(f"{self.name}: C2 Server received invalid command response: {command_output}.")
self.current_command_output = RequestResponse(
status="failure", data={"Reason": "Received unexpected C2 Response."}
)
return False
self.current_command_output = command_output
return True
def _handle_keep_alive(self, payload: MasqueradePacket, session_id: Optional[str]) -> bool:
"""
Handles receiving and sending keep alive payloads. This method is only called if we receive a keep alive.
In the C2 Server implementation of this method the c2 connection active boolean
is set to true and the keep alive inactivity is reset after receiving one keep alive.
This is because the C2 Server is the listener and thus will only ever receive packets from
the C2 Beacon rather than the other way around. (The C2 Beacon is akin to a reverse shell)
Returns False if a keep alive was unable to be sent.
Returns True if a keep alive was successfully sent or already has been sent this timestep.
:return: True if successfully handled, false otherwise.
:rtype: Bool
"""
self.sys_log.info(f"{self.name}: Keep Alive Received. Attempting to resolve the remote connection details.")
self.c2_connection_active = True # Sets the connection to active
self.current_c2_session = self.software_manager.session_manager.sessions_by_uuid[session_id]
if self._resolve_keep_alive(payload, session_id) == False:
self.sys_log.warning(f"{self.name}: Keep Alive Could not be resolved correctly. Refusing Keep Alive.")
return False
# If this method returns true then we have sent successfully sent a keep alive.
self.sys_log.info(f"{self.name}: Remote connection successfully established: {self.c2_remote_connection}.")
self.sys_log.debug(f"{self.name}: Attempting to send Keep Alive response back to {self.c2_remote_connection}.")
return self._send_keep_alive(session_id)
@validate_call
def _send_command(self, given_command: C2Command, command_options: Dict) -> RequestResponse:
"""
Sends a command to the C2 Beacon.
# TODO: Expand this docustring.
:param given_command: The C2 command to be sent to the C2 Beacon.
@@ -109,31 +169,41 @@ class C2Server(AbstractC2, identifier="C2 Server"):
:return: Returns the Request Response of the C2 Beacon's host terminal service execute method.
:rtype: RequestResponse
"""
if given_command != C2Payload:
if not isinstance(given_command, C2Command):
self.sys_log.warning(f"{self.name}: Received unexpected C2 command. Unable to send command.")
return RequestResponse(status="failure", data={"Received unexpected C2Command. Unable to send command."})
return RequestResponse(
status="failure", data={"Reason": "Received unexpected C2Command. Unable to send command."}
)
if self._can_perform_network_action == False:
self.sys_log.warning(f"{self.name}: Unable to make leverage networking resources. Rejecting Command.")
return RequestResponse(
status="failure", data={"Reason": "Unable to access networking resources. Unable to send command."}
)
self.sys_log.info(f"{self.name}: Attempting to send command {given_command}.")
command_packet = self._craft_packet(given_command=given_command, command_options=command_options)
# Need to investigate if this is correct.
if self.send(payload=command_packet,
if self.send(
payload=command_packet,
dest_ip_address=self.c2_remote_connection,
src_port=self.current_masquerade_port,
dst_port=self.current_masquerade_port,
session_id=self.current_c2_session.uuid,
dest_port=self.current_masquerade_port,
ip_protocol=self.current_masquerade_protocol,
session_id=None):
):
self.sys_log.info(f"{self.name}: Successfully sent {given_command}.")
self.sys_log.info(f"{self.name}: Awaiting command response {given_command}.")
return self._handle_command_output(command_packet)
# If the command output was handled currently, the self.current_command_output will contain the RequestResponse.
return self.current_command_output
# TODO: Perhaps make a new pydantic base model for command_options?
# TODO: Perhaps make the return optional? Returns False is the packet was unable to be crafted.
def _craft_packet(self, given_command: C2Command, command_options: Dict) -> MasqueradePacket:
"""
Creates a Masquerade Packet based off the command parameter and the arguments given.
:param given_command: The C2 command to be sent to the C2 Beacon.
:type given_command: C2Command.
:param command_options: The relevant C2 Beacon parameters.F
@@ -147,27 +217,33 @@ class C2Server(AbstractC2, identifier="C2 Server"):
masquerade_port=self.current_masquerade_port,
payload_type=C2Payload.INPUT,
command=given_command,
payload=command_options
payload=command_options,
)
return constructed_packet
# TODO: I think I can just overload the methods rather than setting it as abstract_method?
# Defining this abstract method
def _handle_command_input(self, payload):
"""C2 Servers currently do not receive input commands coming from the C2 Beacons."""
def _handle_command_input(self, payload: MasqueradePacket):
"""Defining this method (Abstract method inherited from abstract C2) in order to instantiate the class.
C2 Servers currently do not receive input commands coming from the C2 Beacons.
:param payload: The incoming MasqueradePacket
:type payload: MasqueradePacket.
"""
self.sys_log.warning(f"{self.name}: C2 Server received an unexpected INPUT payload: {payload}")
pass
def show(self, markdown: bool = False):
"""
Prints a table of the current C2 attributes on a C2 Server.
:param markdown: If True, outputs the table in markdown format. Default is False.
"""
table = PrettyTable(["C2 Connection Active", "C2 Remote Connection", "Keep Alive Inactivity"])
table = PrettyTable(["C2 Connection Active", "C2 Remote Connection"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.name} Running Status"
table.add_row([self.c2_connection_active, self.c2_remote_connection, self.keep_alive_inactivity])
table.add_row([self.c2_connection_active, self.c2_remote_connection])
print(table)

View File

@@ -12,14 +12,15 @@ from primaite.simulator.network.hardware.nodes.network.router import ACLAction,
from primaite.simulator.network.hardware.nodes.network.switch import Switch
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon
from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Server
from primaite.simulator.system.services.dns.dns_server import DNSServer
from primaite.simulator.system.services.service import ServiceOperatingState
from primaite.simulator.system.services.web_server.web_server import WebServer
from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon
from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Server
# TODO: Update these tests.
@pytest.fixture(scope="function")
def c2_server_on_computer() -> Tuple[C2Beacon, Computer]:
computer: Computer = Computer(
@@ -30,6 +31,7 @@ def c2_server_on_computer() -> Tuple[C2Beacon, Computer]:
return [c2_beacon, computer]
@pytest.fixture(scope="function")
def c2_server_on_computer() -> Tuple[C2Server, Computer]:
computer: Computer = Computer(
@@ -41,7 +43,6 @@ def c2_server_on_computer() -> Tuple[C2Server, Computer]:
return [c2_server, computer]
@pytest.fixture(scope="function")
def basic_network() -> Network:
network = Network()
@@ -57,6 +58,7 @@ def basic_network() -> Network:
return network
def test_c2_suite_setup_receive(basic_network):
"""Test that C2 Beacon can successfully establish connection with the c2 Server."""
network: Network = basic_network
@@ -68,5 +70,5 @@ def test_c2_suite_setup_receive(basic_network):
c2_beacon.configure(c2_server_ip_address="192.168.0.10")
c2_beacon.establish()
c2_beacon.sys_log.show()
c2_beacon.sys_log.show()