#2689 Updated docustrings and general quality improvements.

This commit is contained in:
Archer.Bowen
2024-08-02 16:13:59 +01:00
parent 2339dabac1
commit 1933522e89
3 changed files with 88 additions and 42 deletions

View File

@@ -214,3 +214,21 @@ class Application(IOSoftware):
f"Cannot perform request on application '{self.application.name}' because it is not in the "
f"{self.state.name} state."
)
def _can_perform_network_action(self) -> bool:
"""
Checks if the application can perform outbound network actions.
First confirms application suitability via the can_perform_action method.
Then confirms that the host has an enabled NIC that can be used for outbound traffic.
:return: True if outbound network actions can be performed, otherwise False.
:rtype bool:
"""
if not super()._can_perform_action():
return False
for nic in self.software_manager.node.network_interface.values():
if nic.enabled:
return True
return False

View File

@@ -13,7 +13,6 @@ from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.core.session_manager import Session
# TODO:
# Complete C2 Server and C2 Beacon TODOs
# Create test that leverage all the functionality needed for the different TAPs
# Create a .RST doc
# Potentially? A notebook which demonstrates a custom red agent using the c2 server for various means.
@@ -79,24 +78,6 @@ class AbstractC2(Application, identifier="AbstractC2"):
current_c2_session: Session = None
"""The currently active session that the C2 Traffic is using. Set after establishing connection."""
# TODO: Move this duplicate method from NMAP class into 'Application' to adhere to DRY principle.
def _can_perform_network_action(self) -> bool:
"""
Checks if the C2 application can perform outbound network actions.
This is done by checking the parent application can_per_action functionality.
Then checking if there is an enabled NIC that can be used for outbound traffic.
:return: True if outbound network actions can be performed, otherwise False.
"""
if not super()._can_perform_action():
return False
for nic in self.software_manager.node.network_interface.values():
if nic.enabled:
return True
return False
def describe_state(self) -> Dict:
"""
Describe the state of the C2 application.
@@ -106,9 +87,11 @@ class AbstractC2(Application, identifier="AbstractC2"):
"""
return super().describe_state()
# TODO: Update this post application/services requiring to listen to multiple ports
def __init__(self, **kwargs):
"""Initialise the C2 applications to by default listen for HTTP traffic."""
kwargs["port"] = Port.HTTP # TODO: Update this post application/services requiring to listen to multiple ports
kwargs["protocol"] = IPProtocol.TCP # Update this as well
kwargs["protocol"] = IPProtocol.TCP
super().__init__(**kwargs)
# Validate call ensures we are only handling Masquerade Packets.
@@ -173,15 +156,30 @@ class AbstractC2(Application, identifier="AbstractC2"):
# from_network_interface=from_network_interface
def receive(self, payload: MasqueradePacket, session_id: Optional[str] = None, **kwargs) -> bool:
"""Receives masquerade packets. Used by both c2 server and c2 client.
"""Receives masquerade packets. Used by both c2 server and c2 beacon.
Defining the `Receive` method so that the application can receive packets via the session manager.
These packets are then immediately handed to ._handle_c2_payload.
:param payload: The Masquerade Packet to be received.
:param session: The transport session that the payload is originating from.
:type payload: MasqueradePacket
:param session_id: The transport session_id that the payload is originating from.
:type session_id: str
"""
return self._handle_c2_payload(payload, session_id)
def _send_keep_alive(self, session_id: Optional[str]) -> bool:
"""Sends a C2 keep alive payload to the self.remote_connection IPv4 Address."""
"""Sends a C2 keep alive payload to the self.remote_connection IPv4 Address.
Used by both the c2 client and the s2 server for establishing and confirming connection.
This method also contains some additional validation to ensure that the C2 applications
are correctly configured before sending any traffic.
:param session_id: The transport session_id that the payload is originating from.
:type session_id: str
:returns: Returns True if a send alive was successfully sent. False otherwise.
:rtype bool:
"""
# Checking that the c2 application is capable of performing both actions and has an enabled NIC
# (Using NOT to improve code readability)
if self.c2_remote_connection is None:
@@ -230,6 +228,8 @@ class AbstractC2(Application, identifier="AbstractC2"):
:param payload: The Keep Alive payload received.
:type payload: MasqueradePacket
:param session_id: The transport session_id that the payload is originating from.
:type session_id: str
:return: True on successful configuration, false otherwise.
:rtype: bool
"""
@@ -240,8 +240,9 @@ class AbstractC2(Application, identifier="AbstractC2"):
f"Port: {payload.masquerade_port} Protocol: {payload.masquerade_protocol}."
)
return False
# TODO: Validation on Ports (E.g only allow HTTP, FTP etc)
# Potentially compare to IPProtocol & Port children (Same way that abstract TAP does it with kill chains)
# Potentially compare to IPProtocol & Port children? Depends on how listening on multiple ports is implemented.
# Setting the Ports
self.current_masquerade_port = payload.masquerade_port
@@ -253,6 +254,6 @@ class AbstractC2(Application, identifier="AbstractC2"):
self.c2_remote_connection = self.current_c2_session.with_ip_address
self.c2_connection_active = True # Sets the connection to active
self.keep_alive_inactivity = 0 # Sets the keep alive inactivity to zeroW
self.keep_alive_inactivity = 0 # Sets the keep alive inactivity to zero
return True

View File

@@ -137,6 +137,10 @@ class C2Server(AbstractC2, identifier="C2 Server"):
Returns False if a keep alive was unable to be sent.
Returns True if a keep alive was successfully sent or already has been sent this timestep.
:param payload: The Keep Alive payload received.
:type payload: MasqueradePacket
:param session_id: The transport session_id that the payload is originating from.
:type session_id: str
:return: True if successfully handled, false otherwise.
:rtype: Bool
"""
@@ -160,7 +164,22 @@ class C2Server(AbstractC2, identifier="C2 Server"):
"""
Sends a command to the C2 Beacon.
# TODO: Expand this docustring.
Currently, these commands leverage the pre-existing capability of other applications.
However, the commands are sent via the network rather than the game layer which
grants more opportunity to the blue agent to prevent attacks.
Additionally, future editions of primAITE may expand the C2 repertoire to allow for
more complex red agent behaviour such as file extraction, establishing further fall back channels
or introduce red applications that are only installable via C2 Servers. (T1105)
C2 Command | Meaning
---------------------|------------------------
RANSOMWARE_CONFIGURE | Configures an installed ransomware script based on the passed parameters.
RANSOMWARE_LAUNCH | Launches the installed ransomware script.
Terminal | Executes a command via the terminal installed on the C2 Beacons Host.
For more information on the impact of these commands please refer to the terminal
and the ransomware applications.
:param given_command: The C2 command to be sent to the C2 Beacon.
:type given_command: C2Command.
@@ -198,11 +217,12 @@ class C2Server(AbstractC2, identifier="C2 Server"):
# If the command output was handled currently, the self.current_command_output will contain the RequestResponse.
return self.current_command_output
# TODO: Perhaps make a new pydantic base model for command_options?
# TODO: Perhaps make the return optional? Returns False is the packet was unable to be crafted.
# TODO: Probably could move this as a class method in MasqueradePacket.
def _craft_packet(self, given_command: C2Command, command_options: Dict) -> MasqueradePacket:
"""
Creates a Masquerade Packet based off the command parameter and the arguments given.
Creates and returns a Masquerade Packet using the arguments given.
Creates Masquerade Packet with a payload_type INPUT C2Payload
:param given_command: The C2 command to be sent to the C2 Beacon.
:type given_command: C2Command.
@@ -221,23 +241,18 @@ class C2Server(AbstractC2, identifier="C2 Server"):
)
return constructed_packet
# TODO: I think I can just overload the methods rather than setting it as abstract_method?
# Defining this abstract method
def _handle_command_input(self, payload: MasqueradePacket):
"""Defining this method (Abstract method inherited from abstract C2) in order to instantiate the class.
C2 Servers currently do not receive input commands coming from the C2 Beacons.
:param payload: The incoming MasqueradePacket
:type payload: MasqueradePacket.
"""
self.sys_log.warning(f"{self.name}: C2 Server received an unexpected INPUT payload: {payload}")
pass
def show(self, markdown: bool = False):
"""
Prints a table of the current C2 attributes on a C2 Server.
Displays the current values of the following C2 attributes:
``C2 Connection Active``:
If the C2 Server has established connection with a C2 Beacon.
``C2 Remote Connection``:
The IP of the C2 Beacon. (Configured by upon receiving a keep alive.)
:param markdown: If True, outputs the table in markdown format. Default is False.
"""
table = PrettyTable(["C2 Connection Active", "C2 Remote Connection"])
@@ -247,3 +262,15 @@ class C2Server(AbstractC2, identifier="C2 Server"):
table.title = f"{self.name} Running Status"
table.add_row([self.c2_connection_active, self.c2_remote_connection])
print(table)
# Abstract method inherited from abstract C2 - Not currently utilised.
def _handle_command_input(self, payload: MasqueradePacket) -> None:
"""Defining this method (Abstract method inherited from abstract C2) in order to instantiate the class.
C2 Servers currently do not receive input commands coming from the C2 Beacons.
:param payload: The incoming MasqueradePacket
:type payload: MasqueradePacket.
"""
self.sys_log.warning(f"{self.name}: C2 Server received an unexpected INPUT payload: {payload}")
pass