#2689 Updated c2 tests significantly and improved quality of debug logging.
This commit is contained in:
@@ -13,11 +13,6 @@ from primaite.simulator.system.applications.application import Application, Appl
|
||||
from primaite.simulator.system.core.session_manager import Session
|
||||
from primaite.simulator.system.software import SoftwareHealthState
|
||||
|
||||
# TODO:
|
||||
# Create test that leverage all the functionality needed for the different TAPs
|
||||
# Create a .RST doc
|
||||
# Potentially? A notebook which demonstrates a custom red agent using the c2 server for various means.
|
||||
|
||||
|
||||
class C2Command(Enum):
|
||||
"""Enumerations representing the different commands the C2 suite currently supports."""
|
||||
@@ -196,11 +191,11 @@ class AbstractC2(Application, identifier="AbstractC2"):
|
||||
# (Using NOT to improve code readability)
|
||||
if self.c2_remote_connection is None:
|
||||
self.sys_log.error(
|
||||
f"{self.name}: Unable to Establish connection as the C2 Server's IP Address has not been given."
|
||||
f"{self.name}: Unable to establish connection as the C2 Server's IP Address has not been configured."
|
||||
)
|
||||
|
||||
if not self._can_perform_network_action():
|
||||
self.sys_log.warning(f"{self.name}: Unable to perform network actions.")
|
||||
self.sys_log.warning(f"{self.name}: Unable to perform network actions. Unable to send Keep Alive.")
|
||||
return False
|
||||
|
||||
# We also Pass masquerade proto`col/port so that the c2 server can reply on the correct protocol/port.
|
||||
@@ -223,12 +218,14 @@ class AbstractC2(Application, identifier="AbstractC2"):
|
||||
self.keep_alive_sent = True
|
||||
self.sys_log.info(f"{self.name}: Keep Alive sent to {self.c2_remote_connection}")
|
||||
self.sys_log.debug(
|
||||
f"{self.name}: on {self.c2_config.masquerade_port} via {self.c2_config.masquerade_protocol}"
|
||||
f"{self.name}: Keep Alive sent to {self.c2_remote_connection}"
|
||||
f"Using Masquerade Port: {self.c2_config.masquerade_port}"
|
||||
f"Using Masquerade Protocol: {self.c2_config.masquerade_protocol}"
|
||||
)
|
||||
return True
|
||||
else:
|
||||
self.sys_log.warning(
|
||||
f"{self.name}: failed to send a Keep Alive. The node may be unable to access networking resources."
|
||||
f"{self.name}: Failed to send a Keep Alive. The node may be unable to access networking resources."
|
||||
)
|
||||
return False
|
||||
|
||||
@@ -262,6 +259,13 @@ class AbstractC2(Application, identifier="AbstractC2"):
|
||||
self.c2_config.masquerade_protocol = payload.masquerade_protocol
|
||||
self.c2_config.keep_alive_frequency = payload.keep_alive_frequency
|
||||
|
||||
self.sys_log.debug(
|
||||
f"{self.name}: C2 Config Resolved Config from Keep Alive:"
|
||||
f"Masquerade Port: {self.c2_config.masquerade_port}"
|
||||
f"Masquerade Protocol: {self.c2_config.masquerade_protocol}"
|
||||
f"Keep Alive Frequency: {self.c2_config.keep_alive_frequency}"
|
||||
)
|
||||
|
||||
# This statement is intended to catch on the C2 Application that is listening for connection. (C2 Beacon)
|
||||
if self.c2_remote_connection is None:
|
||||
self.sys_log.debug(f"{self.name}: Attempting to configure remote C2 connection based off received output.")
|
||||
|
||||
@@ -168,6 +168,10 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
|
||||
f"Masquerade Protocol: {masquerade_protocol}"
|
||||
f"Masquerade Port: {masquerade_port}"
|
||||
)
|
||||
# Send a keep alive to the C2 Server if we already have a keep alive.
|
||||
if self.c2_connection_active is True:
|
||||
self.sys_log.info(f"{self.name}: Updating C2 Server with updated C2 configuration.")
|
||||
self._send_keep_alive(self.c2_session.uuid if not None else None)
|
||||
return True
|
||||
|
||||
def establish(self) -> bool:
|
||||
|
||||
@@ -317,10 +317,12 @@ class C2Server(AbstractC2, identifier="C2Server"):
|
||||
:rtype bool:
|
||||
"""
|
||||
if self.keep_alive_inactivity > self.c2_config.keep_alive_frequency:
|
||||
self.sys_log.debug(
|
||||
f"{self.name}: Failed to receive expected keep alive from {self.c2_remote_connection} at {timestep}."
|
||||
)
|
||||
self.sys_log.info(f"{self.name}: C2 Beacon connection considered dead due to inactivity.")
|
||||
self.sys_log.debug(
|
||||
f"{self.name}: Did not receive expected keep alive connection from {self.c2_remote_connection}"
|
||||
f"{self.name}: Expected at timestep: {timestep} due to frequency: {self.c2_config.keep_alive_frequency}"
|
||||
f"{self.name}: Last Keep Alive received at {(timestep - self.keep_alive_inactivity)}"
|
||||
)
|
||||
self._reset_c2_connection()
|
||||
return False
|
||||
return True
|
||||
|
||||
@@ -10,7 +10,7 @@ from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHe
|
||||
from primaite.simulator.network.container import Network
|
||||
from primaite.simulator.network.hardware.nodes.host.computer import Computer
|
||||
from primaite.simulator.network.hardware.nodes.host.server import Server
|
||||
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
|
||||
from primaite.simulator.network.hardware.nodes.network.router import AccessControlList, ACLAction, Router
|
||||
from primaite.simulator.network.hardware.nodes.network.switch import Switch
|
||||
from primaite.simulator.network.transmission.network_layer import IPProtocol
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
@@ -209,6 +209,8 @@ def test_c2_suite_acl_block(basic_network):
|
||||
|
||||
network: Network = basic_network
|
||||
network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network)
|
||||
computer_b.software_manager.install(software_class=RansomwareScript)
|
||||
ransomware_config = {"server_ip_address": "192.168.0.2"}
|
||||
|
||||
router: Router = network.get_node_by_hostname("router")
|
||||
|
||||
@@ -275,3 +277,189 @@ def test_c2_suite_terminal_command_file_creation(basic_network):
|
||||
|
||||
assert computer_c.software_manager.file_system.access_file(folder_name="test_folder", file_name="test_file") == True
|
||||
assert c2_beacon.remote_terminal_session is not None
|
||||
|
||||
|
||||
def test_c2_suite_acl_bypass(basic_network):
|
||||
"""Tests that C2 Beacon can be reconfigured to connect C2 Server to bypass blocking ACL rules.
|
||||
|
||||
1. This Test first configures a router to block HTTP traffic and asserts the following:
|
||||
1. C2 Beacon and C2 Server are unable to maintain connection
|
||||
2. Traffic is confirmed to be blocked by the ACL rule.
|
||||
|
||||
2. Next the C2 Beacon is re-configured to use FTP which is permitted by the ACL and asserts the following;
|
||||
1. The C2 Beacon and C2 Server re-establish connection
|
||||
2. The ACL rule has not prevent any further traffic.
|
||||
3. A test file create command is sent & it's output confirmed
|
||||
|
||||
3. The ACL is then re-configured to block FTP traffic and asserts the following:
|
||||
1. C2 Beacon and C2 Server are unable to maintain connection
|
||||
2. Traffic is confirmed to be blocked by the ACL rule.
|
||||
|
||||
4. Next the C2 Beacon is re-configured to use HTTP which is permitted by the ACL and asserts the following;
|
||||
1. The C2 Beacon and C2 Server re-establish connection
|
||||
2. The ACL rule has not prevent any further traffic.
|
||||
3. A test file create command is sent & it's output confirmed
|
||||
"""
|
||||
|
||||
network: Network = basic_network
|
||||
network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network)
|
||||
router: Router = network.get_node_by_hostname("router")
|
||||
|
||||
################ Confirm Default Setup #########################
|
||||
|
||||
# Permitting all HTTP & FTP traffic
|
||||
router.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.HTTP, dst_port=Port.HTTP, position=0)
|
||||
router.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.FTP, dst_port=Port.FTP, position=1)
|
||||
|
||||
c2_beacon.apply_timestep(0)
|
||||
assert c2_beacon.keep_alive_inactivity == 1
|
||||
|
||||
# Keep Alive successfully sent and received upon the 2nd timestep.
|
||||
c2_beacon.apply_timestep(1)
|
||||
|
||||
assert c2_beacon.keep_alive_inactivity == 0
|
||||
assert c2_beacon.c2_connection_active == True
|
||||
|
||||
################ Denying HTTP Traffic #########################
|
||||
|
||||
# Now we add a HTTP blocking acl (Thus preventing a keep alive)
|
||||
router.acl.add_rule(action=ACLAction.DENY, src_port=Port.HTTP, dst_port=Port.HTTP, position=0)
|
||||
blocking_acl: AccessControlList = router.acl.acl[0]
|
||||
|
||||
# Asserts to show the C2 Suite is unable to maintain connection:
|
||||
|
||||
network.apply_timestep(2)
|
||||
network.apply_timestep(3)
|
||||
|
||||
c2_packets_blocked = blocking_acl.match_count
|
||||
assert c2_packets_blocked != 0
|
||||
assert c2_beacon.c2_connection_active is False
|
||||
|
||||
# Stepping one more time to confirm that the C2 server drops its connection
|
||||
network.apply_timestep(4)
|
||||
assert c2_server.c2_connection_active is False
|
||||
|
||||
################ Configuring C2 to use FTP #####################
|
||||
|
||||
# Reconfiguring the c2 beacon to now use FTP
|
||||
c2_beacon.configure(
|
||||
c2_server_ip_address="192.168.0.2",
|
||||
keep_alive_frequency=2,
|
||||
masquerade_port=Port.FTP,
|
||||
masquerade_protocol=IPProtocol.TCP,
|
||||
)
|
||||
|
||||
c2_beacon.establish()
|
||||
|
||||
################ Confirming connection via FTP #####################
|
||||
|
||||
# Confirming we've re-established connection
|
||||
|
||||
assert c2_beacon.c2_connection_active is True
|
||||
assert c2_server.c2_connection_active is True
|
||||
|
||||
# Confirming that we can send commands:
|
||||
|
||||
ftp_file_create_command = {
|
||||
"commands": [
|
||||
["file_system", "create", "folder", "test_folder"],
|
||||
["file_system", "create", "file", "test_folder", "ftp_test_file", "True"],
|
||||
],
|
||||
"username": "admin",
|
||||
"password": "admin",
|
||||
"ip_address": None,
|
||||
}
|
||||
c2_server._send_command(C2Command.TERMINAL, command_options=ftp_file_create_command)
|
||||
assert (
|
||||
computer_b.software_manager.file_system.access_file(folder_name="test_folder", file_name="ftp_test_file")
|
||||
== True
|
||||
)
|
||||
|
||||
# Confirming we can maintain connection
|
||||
|
||||
# Stepping twenty timesteps in the network
|
||||
i = 4 # We're already at the 4th timestep (starting at timestep 4)
|
||||
|
||||
for i in range(20):
|
||||
network.apply_timestep(i)
|
||||
|
||||
# Confirming HTTP ACL ineffectiveness (C2 Bypass)
|
||||
|
||||
# Asserting that the ACL hasn't caught more traffic and the c2 connection is still active
|
||||
assert c2_packets_blocked == blocking_acl.match_count
|
||||
assert c2_server.c2_connection_active is True
|
||||
assert c2_beacon.c2_connection_active is True
|
||||
|
||||
################ Denying FTP Traffic & Enable HTTP #########################
|
||||
|
||||
# Blocking FTP and re-permitting HTTP:
|
||||
router.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.HTTP, dst_port=Port.HTTP, position=0)
|
||||
router.acl.add_rule(action=ACLAction.DENY, src_port=Port.FTP, dst_port=Port.FTP, position=1)
|
||||
blocking_acl: AccessControlList = router.acl.acl[1]
|
||||
|
||||
# Asserts to show the C2 Suite is unable to maintain connection:
|
||||
|
||||
network.apply_timestep(25)
|
||||
network.apply_timestep(26)
|
||||
|
||||
c2_packets_blocked = blocking_acl.match_count
|
||||
assert c2_packets_blocked != 0
|
||||
assert c2_beacon.c2_connection_active is False
|
||||
|
||||
# Stepping one more time to confirm that the C2 server drops its connection
|
||||
network.apply_timestep(27)
|
||||
assert c2_server.c2_connection_active is False
|
||||
|
||||
################ Configuring C2 to use HTTP #####################
|
||||
|
||||
# Reconfiguring the c2 beacon to now use HTTP Again
|
||||
c2_beacon.configure(
|
||||
c2_server_ip_address="192.168.0.2",
|
||||
keep_alive_frequency=2,
|
||||
masquerade_port=Port.HTTP,
|
||||
masquerade_protocol=IPProtocol.TCP,
|
||||
)
|
||||
|
||||
c2_beacon.establish()
|
||||
|
||||
################ Confirming connection via HTTP #####################
|
||||
|
||||
# Confirming we've re-established connection
|
||||
|
||||
assert c2_beacon.c2_connection_active is True
|
||||
assert c2_server.c2_connection_active is True
|
||||
|
||||
# Confirming that we can send commands
|
||||
|
||||
http_file_create_command = {
|
||||
"commands": [
|
||||
["file_system", "create", "folder", "test_folder"],
|
||||
["file_system", "create", "file", "test_folder", "http_test_file", "True"],
|
||||
],
|
||||
"username": "admin",
|
||||
"password": "admin",
|
||||
"ip_address": None,
|
||||
}
|
||||
c2_server._send_command(C2Command.TERMINAL, command_options=http_file_create_command)
|
||||
assert (
|
||||
computer_b.software_manager.file_system.access_file(folder_name="test_folder", file_name="http_test_file")
|
||||
== True
|
||||
)
|
||||
|
||||
assert c2_beacon.c2_connection_active is True
|
||||
assert c2_server.c2_connection_active is True
|
||||
|
||||
# Confirming we can maintain connection
|
||||
|
||||
# Stepping twenty timesteps in the network
|
||||
i = 28 # We're already at the 28th timestep
|
||||
|
||||
for i in range(20):
|
||||
network.apply_timestep(i)
|
||||
|
||||
# Confirming FTP ACL ineffectiveness (C2 Bypass)
|
||||
|
||||
# Asserting that the ACL hasn't caught more traffic and the c2 connection is still active
|
||||
assert c2_packets_blocked == blocking_acl.match_count
|
||||
assert c2_server.c2_connection_active is True
|
||||
assert c2_beacon.c2_connection_active is True
|
||||
|
||||
@@ -4,6 +4,8 @@ import pytest
|
||||
from primaite.simulator.network.container import Network
|
||||
from primaite.simulator.network.hardware.nodes.host.computer import Computer
|
||||
from primaite.simulator.network.hardware.nodes.host.server import Server
|
||||
from primaite.simulator.network.transmission.network_layer import IPProtocol
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
from primaite.simulator.system.applications.application import ApplicationOperatingState
|
||||
from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon
|
||||
from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Command, C2Server
|
||||
@@ -124,8 +126,38 @@ def test_c2_handle_switching_port(basic_c2_network):
|
||||
|
||||
network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network)
|
||||
|
||||
# Asserting that the c2 beacon has established a c2 connection
|
||||
# Asserting that the c2 applications have established a c2 connection
|
||||
assert c2_beacon.c2_connection_active is True
|
||||
assert c2_server.c2_connection_active is True
|
||||
|
||||
# Assert to confirm that both the C2 server and the C2 beacon are configured correctly.
|
||||
assert c2_beacon.c2_config.keep_alive_frequency is 2
|
||||
assert c2_beacon.c2_config.masquerade_port is Port.HTTP
|
||||
assert c2_beacon.c2_config.masquerade_protocol is IPProtocol.TCP
|
||||
|
||||
assert c2_server.c2_config.keep_alive_frequency is 2
|
||||
assert c2_server.c2_config.masquerade_port is Port.HTTP
|
||||
assert c2_server.c2_config.masquerade_protocol is IPProtocol.TCP
|
||||
|
||||
# Configuring the C2 Beacon.
|
||||
c2_beacon.configure(
|
||||
c2_server_ip_address="192.168.0.1",
|
||||
keep_alive_frequency=2,
|
||||
masquerade_port=Port.FTP,
|
||||
masquerade_protocol=IPProtocol.TCP,
|
||||
)
|
||||
|
||||
# Asserting that the c2 applications have established a c2 connection
|
||||
assert c2_beacon.c2_connection_active is True
|
||||
assert c2_server.c2_connection_active is True
|
||||
|
||||
# Assert to confirm that both the C2 server and the C2 beacon
|
||||
# Have reconfigured their C2 settings.
|
||||
assert c2_beacon.c2_config.masquerade_port is Port.FTP
|
||||
assert c2_beacon.c2_config.masquerade_protocol is IPProtocol.TCP
|
||||
|
||||
assert c2_server.c2_config.masquerade_port is Port.FTP
|
||||
assert c2_server.c2_config.masquerade_protocol is IPProtocol.TCP
|
||||
|
||||
|
||||
def test_c2_handle_switching_frequency(basic_c2_network):
|
||||
@@ -136,3 +168,40 @@ def test_c2_handle_switching_frequency(basic_c2_network):
|
||||
|
||||
# Asserting that the c2 beacon has established a c2 connection
|
||||
assert c2_beacon.c2_connection_active is True
|
||||
network: Network = basic_c2_network
|
||||
|
||||
network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network)
|
||||
|
||||
# Asserting that the c2 applications have established a c2 connection
|
||||
assert c2_beacon.c2_connection_active is True
|
||||
assert c2_server.c2_connection_active is True
|
||||
|
||||
# Assert to confirm that both the C2 server and the C2 beacon are configured correctly.
|
||||
assert c2_beacon.c2_config.keep_alive_frequency is 2
|
||||
assert c2_server.c2_config.keep_alive_frequency is 2
|
||||
|
||||
# Configuring the C2 Beacon.
|
||||
c2_beacon.configure(c2_server_ip_address="192.168.0.1", keep_alive_frequency=10)
|
||||
|
||||
# Asserting that the c2 applications have established a c2 connection
|
||||
assert c2_beacon.c2_connection_active is True
|
||||
assert c2_server.c2_connection_active is True
|
||||
|
||||
# Assert to confirm that both the C2 server and the C2 beacon
|
||||
# Have reconfigured their C2 settings.
|
||||
assert c2_beacon.c2_config.keep_alive_frequency is 10
|
||||
assert c2_server.c2_config.keep_alive_frequency is 10
|
||||
|
||||
# Now skipping 9 time steps to confirm keep alive inactivity
|
||||
for i in range(9):
|
||||
network.apply_timestep(i)
|
||||
|
||||
# If the keep alive reconfiguration failed then the keep alive inactivity could never reach 9
|
||||
# As another keep alive would have already been sent.
|
||||
assert c2_beacon.keep_alive_inactivity is 9
|
||||
assert c2_server.keep_alive_inactivity is 9
|
||||
|
||||
network.apply_timestep(10)
|
||||
|
||||
assert c2_beacon.keep_alive_inactivity is 0
|
||||
assert c2_server.keep_alive_inactivity is 0
|
||||
|
||||
Reference in New Issue
Block a user