#2689 added test template and fixed class instancing issues.
This commit is contained in:
@@ -35,7 +35,7 @@ class C2Command(Enum):
|
||||
# The terminal command should also be able to pass a session which can be used for remote connections.
|
||||
|
||||
|
||||
class AbstractC2(Application):
|
||||
class AbstractC2(Application, identifier="AbstractC2"):
|
||||
"""
|
||||
An abstract command and control (c2) application.
|
||||
|
||||
@@ -97,6 +97,11 @@ class AbstractC2(Application):
|
||||
:rtype: Dict
|
||||
"""
|
||||
return super().describe_state()
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
kwargs["port"] = Port.NONE
|
||||
kwargs["protocol"] = IPProtocol.NONE
|
||||
super().__init__(**kwargs)
|
||||
|
||||
# Validate call ensures we are only handling Masquerade Packets.
|
||||
@validate_call
|
||||
|
||||
@@ -12,7 +12,7 @@ from enum import Enum
|
||||
from primaite.simulator.system.software import SoftwareHealthState
|
||||
from primaite.simulator.system.applications.application import ApplicationOperatingState
|
||||
|
||||
class C2Beacon(AbstractC2):
|
||||
class C2Beacon(AbstractC2, identifier="C2Beacon"):
|
||||
"""
|
||||
C2 Beacon Application.
|
||||
|
||||
@@ -86,8 +86,8 @@ class C2Beacon(AbstractC2):
|
||||
return rm
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
self.name = "C2Beacon"
|
||||
super.__init__(**kwargs)
|
||||
kwargs["name"] = "C2Beacon"
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def configure(
|
||||
self,
|
||||
@@ -268,4 +268,12 @@ class C2Beacon(AbstractC2):
|
||||
if self.keep_alive_inactivity != 0:
|
||||
self.sys_log.warning(f"{self.name}: Did not receive keep alive from c2 Server. Connection considered severed.")
|
||||
return False
|
||||
return True
|
||||
return True
|
||||
|
||||
|
||||
# Defining this abstract method from Abstract C2
|
||||
def _handle_command_output(self, payload):
|
||||
"""C2 Beacons currently do not need to handle output commands coming from the C2 Servers."""
|
||||
self.sys_log.warning(f"{self.name}: C2 Beacon received an unexpected OUTPUT payload: {payload}")
|
||||
pass
|
||||
|
||||
@@ -5,7 +5,7 @@ from primaite.simulator.core import RequestManager, RequestType
|
||||
from primaite.interface.request import RequestFormat, RequestResponse
|
||||
from typing import Dict,Optional
|
||||
|
||||
class C2Server(AbstractC2):
|
||||
class C2Server(AbstractC2, identifier="C2 Server"):
|
||||
# TODO:
|
||||
# Implement the request manager and agent actions.
|
||||
# Implement the output handling methods. (These need to interface with the actions)
|
||||
@@ -16,18 +16,6 @@ class C2Server(AbstractC2):
|
||||
More information in user guide and docstring for SimComponent._init_request_manager.
|
||||
"""
|
||||
rm = super()._init_request_manager()
|
||||
rm.add_request(
|
||||
name="c2_ransomware_configure",
|
||||
request_type=RequestType(func=lambda request, context: RequestResponse.from_bool(_configure_ransomware_action())),
|
||||
)
|
||||
rm.add_request(
|
||||
name="c2_ransomware_launch",
|
||||
request_type=RequestType(func=lambda request, context: RequestResponse.from_bool(_launch_ransomware_action())),
|
||||
)
|
||||
rm.add_request(
|
||||
name="c2_terminal_command",
|
||||
request_type=RequestType(func=lambda request, context: RequestResponse.from_bool(_remote_terminal_action())),
|
||||
)
|
||||
|
||||
def _configure_ransomware_action(request: RequestFormat, context: Dict) -> RequestResponse:
|
||||
"""Requests - Sends a RANSOMWARE_CONFIGURE C2Command to the C2 Beacon with the given parameters.
|
||||
@@ -71,6 +59,23 @@ class C2Server(AbstractC2):
|
||||
placeholder: dict = {}
|
||||
return self._send_command(given_command=C2Command.RANSOMWARE_LAUNCH, command_options=placeholder)
|
||||
|
||||
rm.add_request(
|
||||
name="c2_ransomware_configure",
|
||||
request_type=RequestType(func=_configure_ransomware_action),
|
||||
)
|
||||
rm.add_request(
|
||||
name="c2_ransomware_launch",
|
||||
request_type=RequestType(func=_launch_ransomware_action),
|
||||
)
|
||||
rm.add_request(
|
||||
name="c2_terminal_command",
|
||||
request_type=RequestType(func=_remote_terminal_action),
|
||||
)
|
||||
return rm
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
kwargs["name"] = "C2Server"
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def _handle_command_output(self, payload: MasqueradePacket) -> RequestResponse:
|
||||
"""
|
||||
@@ -125,7 +130,7 @@ class C2Server(AbstractC2):
|
||||
|
||||
:param given_command: The C2 command to be sent to the C2 Beacon.
|
||||
:type given_command: C2Command.
|
||||
:param command_options: The relevant C2 Beacon parameters.
|
||||
:param command_options: The relevant C2 Beacon parameters.F
|
||||
:type command_options: Dict
|
||||
:return: Returns the construct MasqueradePacket
|
||||
:rtype: MasqueradePacket
|
||||
@@ -139,4 +144,9 @@ class C2Server(AbstractC2):
|
||||
payload=command_options
|
||||
)
|
||||
return constructed_packet
|
||||
|
||||
|
||||
# Defining this abstract method
|
||||
def _handle_command_input(self, payload):
|
||||
"""C2 Servers currently do not receive input commands coming from the C2 Beacons."""
|
||||
self.sys_log.warning(f"{self.name}: C2 Server received an unexpected INPUT payload: {payload}")
|
||||
pass
|
||||
|
||||
@@ -48,9 +48,19 @@ def basic_network() -> Network:
|
||||
node_a = Computer(hostname="node_a", ip_address="192.168.0.10", subnet_mask="255.255.255.0", start_up_duration=0)
|
||||
node_a.power_on()
|
||||
node_a.software_manager.get_open_ports()
|
||||
|
||||
|
||||
node_b = Computer(hostname="node_b", ip_address="192.168.0.11", subnet_mask="255.255.255.0", start_up_duration=0)
|
||||
node_b.power_on()
|
||||
network.connect(node_a.network_interface[1], node_b.network_interface[1])
|
||||
|
||||
return network
|
||||
return network
|
||||
|
||||
def test_c2_suite_setup_receive(basic_network):
|
||||
"""Test that C2 Beacon can successfully establish connection with the c2 Server"""
|
||||
network: Network = basic_network
|
||||
computer_a: Computer = network.get_node_by_hostname("node_a")
|
||||
c2_server: C2Server = computer_a.software_manager.software.get("C2Server")
|
||||
|
||||
computer_b: Computer = network.get_node_by_hostname("node_b")
|
||||
c2_beacon: C2Server = computer_a.software_manager.software.get("C2Beacon")
|
||||
|
||||
Reference in New Issue
Block a user