#2689 Initial Implementation of C2 Server.

This commit is contained in:
Archer.Bowen
2024-07-31 16:43:17 +01:00
parent f097ed575d
commit 4c7e465f0d
5 changed files with 233 additions and 15 deletions

View File

@@ -36,6 +36,8 @@ from primaite.simulator.system.applications.red_applications.data_manipulation_b
)
from primaite.simulator.system.applications.red_applications.dos_bot import DoSBot # noqa: F401
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript # noqa: F401
from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon
from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Server
from primaite.simulator.system.applications.web_browser import WebBrowser # noqa: F401
from primaite.simulator.system.services.database.database_service import DatabaseService
from primaite.simulator.system.services.dns.dns_client import DNSClient

View File

@@ -100,7 +100,7 @@ class AbstractC2(Application):
# Validate call ensures we are only handling Masquerade Packets.
@validate_call
def _handle_c2_payload(self, payload: MasqueradePacket) -> bool:
def _handle_c2_payload(self, payload: MasqueradePacket, session_id: Optional[str] = None) -> bool:
"""Handles masquerade payloads for both c2 beacons and c2 servers.
Currently, the C2 application suite can handle the following payloads:
@@ -121,18 +121,19 @@ class AbstractC2(Application):
:param payload: The C2 Payload to be parsed and handled.
:return: True if the c2 payload was handled successfully, False otherwise.
:rtype: Bool
"""
if payload.payload_type == C2Payload.KEEP_ALIVE:
self.sys_log.info(f"{self.name} received a KEEP ALIVE!")
return self._handle_keep_alive(payload)
self.sys_log.info(f"{self.name} received a KEEP ALIVE payload.")
return self._handle_keep_alive(payload, session_id)
elif payload.payload_type == C2Payload.INPUT:
self.sys_log.info(f"{self.name} received an INPUT COMMAND!")
return self._handle_command_input(payload)
self.sys_log.info(f"{self.name} received an INPUT COMMAND payload.")
return self._handle_command_input(payload, session_id)
elif payload.payload_type == C2Payload.OUTPUT:
self.sys_log.info(f"{self.name} received an OUTPUT COMMAND!")
return self._handle_command_input(payload)
self.sys_log.info(f"{self.name} received an OUTPUT COMMAND payload.")
return self._handle_command_input(payload, session_id)
else:
self.sys_log.warning(
@@ -154,12 +155,15 @@ class AbstractC2(Application):
"""Abstract Method: Used in C2 beacon to parse and handle commands received from the c2 server."""
pass
def _handle_keep_alive(self) -> bool:
def _handle_keep_alive(self, payload: MasqueradePacket, session_id: Optional[str]) -> bool:
"""
Handles receiving and sending keep alive payloads. This method is only called if we receive a keep alive.
Returns False if a keep alive was unable to be sent.
Returns True if a keep alive was successfully sent or already has been sent this timestep.
:return: True if successfully handled, false otherwise.
:rtype: Bool
"""
self.sys_log.info(f"{self.name}: Keep Alive Received from {self.c2_remote_connection}")
# Using this guard clause to prevent packet storms and recognise that we've achieved a connection.
@@ -173,9 +177,12 @@ class AbstractC2(Application):
return True
# If we've reached this part of the method then we've received a keep alive but haven't sent a reply.
# Therefore we also need to configure the masquerade attributes based off the keep alive sent.
if not self._resolve_keep_alive(self, payload):
return False
# If this method returns true then we have sent successfully sent a keep alive.
if self._send_keep_alive(self):
if self._send_keep_alive(self, session_id):
self.keep_alive_sent = True # Setting the guard clause to true (prevents packet storms.)
return True
@@ -230,3 +237,28 @@ class AbstractC2(Application):
return False
def _resolve_keep_alive(self, payload: MasqueradePacket) -> bool:
"""
Parses the Masquerade Port/Protocol within the received Keep Alive packet.
Used to dynamically set the Masquerade Port and Protocol based on incoming traffic.
Returns True on successfully extracting and configuring the masquerade port/protocols.
Returns False otherwise.
:param payload: The Keep Alive payload received.
:type payload: MasqueradePacket
:return: True on successful configuration, false otherwise.
:rtype: bool
"""
# Validating that they are valid Enums.
if payload.masquerade_port or payload.masquerade_protocol != Enum:
self.sys_log.warning(f"{self.name}: Received invalid Masquerade type. Port: {type(payload.masquerade_port)} Protocol: {type(payload.masquerade_protocol)}")
return False
# TODO: Validation on Ports (E.g only allow HTTP, FTP etc)
# Potentially compare to IPProtocol & Port children (Same way that abstract TAP does it with kill chains)
# Setting the Ports
self.current_masquerade_port = payload.masquerade_port
self.current_masquerade_protocol = payload.masquerade_protocol
return True

View File

@@ -155,7 +155,7 @@ class C2Beacon(AbstractC2):
self.sys_log.info(f"{self.name}: Received a ransomware launch C2 command.")
return self._return_command_output(self._command_ransomware_launch(payload))
elif payload.payload_type == C2Command.TERMINAL:
elif command == C2Command.TERMINAL:
self.sys_log.info(f"{self.name}: Received a terminal C2 command.")
return self._return_command_output(self._command_terminal(payload))

View File

@@ -1,14 +1,142 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from primaite.simulator.system.applications.red_applications.c2.abstract_c2 import AbstractC2, C2Command
from primaite.simulator.network.protocols.masquerade import C2Payload, MasqueradePacket
from primaite.simulator.core import RequestManager, RequestType
from primaite.interface.request import RequestFormat, RequestResponse
from typing import Dict,Optional
class C2Server(AbstractC2):
# TODO:
# Implement the request manager and agent actions.
# Implement the output handling methods. (These need to interface with the actions)
def _init_request_manager(self) -> RequestManager:
"""
Initialise the request manager.
def _handle_command_output(payload):
"""Abstract Method: Used in C2 server to parse and receive the output of commands sent to the c2 beacon."""
pass
More information in user guide and docstring for SimComponent._init_request_manager.
"""
rm = super()._init_request_manager()
rm.add_request(
name="c2_ransomware_configure",
request_type=RequestType(func=lambda request, context: RequestResponse.from_bool(_configure_ransomware_action())),
)
rm.add_request(
name="c2_ransomware_launch",
request_type=RequestType(func=lambda request, context: RequestResponse.from_bool(_launch_ransomware_action())),
)
rm.add_request(
name="c2_terminal_command",
request_type=RequestType(func=lambda request, context: RequestResponse.from_bool(_remote_terminal_action())),
)
def _configure_ransomware_action(request: RequestFormat, context: Dict) -> RequestResponse:
"""Requests - Sends a RANSOMWARE_CONFIGURE C2Command to the C2 Beacon with the given parameters.
:param request: Request with one element containing a dict of parameters for the configure method.
:type request: RequestFormat
:param context: additional context for resolving this action, currently unused
:type context: dict
:return: RequestResponse object with a success code reflecting whether the configuration could be applied.
:rtype: RequestResponse
"""
# TODO: Parse the parameters from the request to get the parameters
placeholder: dict = {}
return self._send_command(given_command=C2Command.RANSOMWARE_CONFIGURE, command_options=placeholder)
def _launch_ransomware_action(request: RequestFormat, context: Dict) -> RequestResponse:
"""Agent Action - Sends a RANSOMWARE_LAUNCH C2Command to the C2 Beacon with the given parameters.
:param request: Request with one element containing a dict of parameters for the configure method.
:type request: RequestFormat
:param context: additional context for resolving this action, currently unused
:type context: dict
:return: RequestResponse object with a success code reflecting whether the ransomware was launched.
:rtype: RequestResponse
"""
# TODO: Parse the parameters from the request to get the parameters
placeholder: dict = {}
return self._send_command(given_command=C2Command.RANSOMWARE_LAUNCH, command_options=placeholder)
def _remote_terminal_action(request: RequestFormat, context: Dict) -> RequestResponse:
"""Agent Action - Sends a TERMINAL C2Command to the C2 Beacon with the given parameters.
:param request: Request with one element containing a dict of parameters for the configure method.
:type request: RequestFormat
:param context: additional context for resolving this action, currently unused
:type context: dict
:return: RequestResponse object with a success code reflecting whether the ransomware was launched.
:rtype: RequestResponse
"""
# TODO: Parse the parameters from the request to get the parameters
placeholder: dict = {}
return self._send_command(given_command=C2Command.RANSOMWARE_LAUNCH, command_options=placeholder)
def _handle_command_output(self, payload: MasqueradePacket) -> RequestResponse:
"""
Handles the parsing of C2 Command Output from C2 Traffic (Masquerade Packets)
as well as then calling the relevant method dependant on the C2 Command.
:param payload: The OUTPUT C2 Payload
:type payload: MasqueradePacket
:return: Returns the Request Response of the C2 Beacon's host terminal service execute method.
:rtype Request Response:
"""
self.sys_log.info(f"{self.name}: Received command response from C2 Beacon: {payload}.")
command_output = payload.payload
if command_output != MasqueradePacket:
self.sys_log.warning(f"{self.name}: Received invalid command response: {command_output}.")
return RequestResponse(status="failure", data={"Received unexpected C2 Response."})
return command_output
def _send_command(self, given_command: C2Command, command_options: Dict) -> RequestResponse:
"""
Sends a command to the C2 Beacon.
# TODO: Expand this docustring.
:param given_command: The C2 command to be sent to the C2 Beacon.
:type given_command: C2Command.
:param command_options: The relevant C2 Beacon parameters.
:type command_options: Dict
:return: Returns the Request Response of the C2 Beacon's host terminal service execute method.
:rtype: RequestResponse
"""
if given_command != C2Payload:
self.sys_log.warning(f"{self.name}: Received unexpected C2 command. Unable to send command.")
return RequestResponse(status="failure", data={"Received unexpected C2Command. Unable to send command."})
self.sys_log.info(f"{self.name}: Attempting to send command {given_command}.")
command_packet = self._craft_packet(given_command=given_command, command_options=command_options)
# Need to investigate if this is correct.
if self.send(self, payload=command_packet,dest_ip_address=self.c2_remote_connection,
port=self.current_masquerade_port, protocol=self.current_masquerade_protocol,session_id=None):
self.sys_log.info(f"{self.name}: Successfully sent {given_command}.")
self.sys_log.info(f"{self.name}: Awaiting command response {given_command}.")
return self._handle_command_output(command_packet)
# TODO: Perhaps make a new pydantic base model for command_options?
# TODO: Perhaps make the return optional? Returns False is the packet was unable to be crafted.
def _craft_packet(self, given_command: C2Command, command_options: Dict) -> MasqueradePacket:
"""
Creates a Masquerade Packet based off the command parameter and the arguments given.
:param given_command: The C2 command to be sent to the C2 Beacon.
:type given_command: C2Command.
:param command_options: The relevant C2 Beacon parameters.
:type command_options: Dict
:return: Returns the construct MasqueradePacket
:rtype: MasqueradePacket
"""
# TODO: Validation on command_options.
constructed_packet = MasqueradePacket(
masquerade_protocol=self.current_masquerade_protocol,
masquerade_port=self.current_masquerade_port,
payload_type=C2Payload.INPUT,
command=given_command,
payload=command_options
)
return constructed_packet

View File

@@ -0,0 +1,56 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from typing import Tuple
import pytest
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.hardware.nodes.network.switch import Switch
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.services.dns.dns_server import DNSServer
from primaite.simulator.system.services.service import ServiceOperatingState
from primaite.simulator.system.services.web_server.web_server import WebServer
from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon
from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Server
# TODO: Update these tests.
@pytest.fixture(scope="function")
def c2_server_on_computer() -> Tuple[C2Beacon, Computer]:
computer: Computer = Computer(
hostname="node_a", ip_address="192.168.0.10", subnet_mask="255.255.255.0", start_up_duration=0
)
computer.power_on()
c2_beacon = computer.software_manager.software.get("C2Beacon")
return [c2_beacon, computer]
@pytest.fixture(scope="function")
def c2_server_on_computer() -> Tuple[C2Server, Computer]:
computer: Computer = Computer(
hostname="node_b", ip_address="192.168.0.11", subnet_mask="255.255.255.0", start_up_duration=0
)
computer.power_on()
c2_server = computer.software_manager.software.get("C2Server")
return [c2_server, computer]
@pytest.fixture(scope="function")
def basic_network() -> Network:
network = Network()
node_a = Computer(hostname="node_a", ip_address="192.168.0.10", subnet_mask="255.255.255.0", start_up_duration=0)
node_a.power_on()
node_a.software_manager.get_open_ports()
node_b = Computer(hostname="node_b", ip_address="192.168.0.11", subnet_mask="255.255.255.0", start_up_duration=0)
node_b.power_on()
network.connect(node_a.network_interface[1], node_b.network_interface[1])
return network