#2689 General improvements.

1. Abstract TAP now handles .apply_timestep
2. Expanded tests
3. Added pydantic model for c2 configuration.
This commit is contained in:
Archer Bowen
2024-08-09 17:53:47 +01:00
parent ab91f993a5
commit 53433ce7b6
8 changed files with 421 additions and 194 deletions

View File

@@ -17,7 +17,7 @@ from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon
from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Server
from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Command, C2Server
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript
from primaite.simulator.system.services.database.database_service import DatabaseService
from primaite.simulator.system.services.dns.dns_server import DNSServer
@@ -29,6 +29,7 @@ def basic_network() -> Network:
network = Network()
# Creating two generic nodes for the C2 Server and the C2 Beacon.
node_a = Computer(
hostname="node_a",
ip_address="192.168.0.2",
@@ -43,12 +44,24 @@ def basic_network() -> Network:
node_b = Computer(
hostname="node_b",
ip_address="192.168.255.2",
subnet_mask="255.255.255.252",
subnet_mask="255.255.255.248",
default_gateway="192.168.255.1",
start_up_duration=0,
)
node_b.power_on()
node_b.software_manager.install(software_class=C2Beacon)
# Creating a generic computer for testing remote terminal connections.
node_c = Computer(
hostname="node_c",
ip_address="192.168.255.3",
subnet_mask="255.255.255.248",
default_gateway="192.168.255.1",
start_up_duration=0,
)
node_c.power_on()
# Creating a router to sit between node 1 and node 2.
router = Router(hostname="router", num_ports=3, start_up_duration=0)
# Default allow all.
@@ -66,35 +79,43 @@ def basic_network() -> Network:
switch_2.power_on()
network.connect(endpoint_a=router.network_interface[2], endpoint_b=switch_2.network_interface[6])
router.configure_port(port=2, ip_address="192.168.255.1", subnet_mask="255.255.255.252")
router.configure_port(port=2, ip_address="192.168.255.1", subnet_mask="255.255.255.248")
router.enable_port(1)
router.enable_port(2)
# Connecting the node to each switch
network.connect(node_a.network_interface[1], switch_1.network_interface[1])
network.connect(node_b.network_interface[1], switch_2.network_interface[1])
network.connect(node_c.network_interface[1], switch_2.network_interface[2])
return network
def setup_c2(given_network: Network):
"""Installs the C2 Beacon & Server, configures and then returns."""
computer_a: Computer = given_network.get_node_by_hostname("node_a")
c2_server: C2Server = computer_a.software_manager.software.get("C2Server")
computer_a.software_manager.install(DatabaseService)
computer_a.software_manager.software["DatabaseService"].start()
computer_b: Computer = given_network.get_node_by_hostname("node_b")
c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon")
computer_b.software_manager.install(DatabaseClient)
computer_b.software_manager.software["DatabaseClient"].configure(server_ip_address=IPv4Address("192.168.0.2"))
computer_b.software_manager.software["DatabaseClient"].run()
c2_beacon.configure(c2_server_ip_address="192.168.0.2", keep_alive_frequency=2)
c2_server.run()
c2_beacon.establish()
return given_network, computer_a, c2_server, computer_b, c2_beacon
def test_c2_suite_setup_receive(basic_network):
"""Test that C2 Beacon can successfully establish connection with the C2 Server."""
network: Network = basic_network
computer_a: Computer = network.get_node_by_hostname("node_a")
c2_server: C2Server = computer_a.software_manager.software.get("C2Server")
computer_b: Computer = network.get_node_by_hostname("node_b")
c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon")
# Assert that the c2 beacon configure correctly.
c2_beacon.configure(c2_server_ip_address="192.168.0.2")
assert c2_beacon.c2_remote_connection == IPv4Address("192.168.0.2")
c2_server.run()
c2_beacon.establish()
network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network)
# Asserting that the c2 beacon has established a c2 connection
assert c2_beacon.c2_connection_active is True
@@ -112,15 +133,7 @@ def test_c2_suite_setup_receive(basic_network):
def test_c2_suite_keep_alive_inactivity(basic_network):
"""Tests that C2 Beacon disconnects from the C2 Server after inactivity."""
network: Network = basic_network
computer_a: Computer = network.get_node_by_hostname("node_a")
c2_server: C2Server = computer_a.software_manager.software.get("C2Server")
computer_b: Computer = network.get_node_by_hostname("node_b")
c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon")
c2_beacon.configure(c2_server_ip_address="192.168.0.2", keep_alive_frequency=2)
c2_server.run()
c2_beacon.establish()
network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network)
c2_beacon.apply_timestep(0)
assert c2_beacon.keep_alive_inactivity == 1
@@ -133,21 +146,21 @@ def test_c2_suite_keep_alive_inactivity(basic_network):
# Now we turn off the c2 server (Thus preventing a keep alive)
c2_server.close()
c2_beacon.apply_timestep(2)
assert c2_beacon.keep_alive_inactivity == 1
c2_beacon.apply_timestep(3)
assert c2_beacon.keep_alive_inactivity == 2
# C2 Beacon resets it's connections back to default.
assert c2_beacon.keep_alive_inactivity == 0
assert c2_beacon.c2_connection_active == False
assert c2_beacon.operating_state == ApplicationOperatingState.CLOSED
def test_c2_suite_configure_request(basic_network):
"""Tests that the request system can be used to successfully setup a c2 suite."""
# Setting up the network:
network: Network = basic_network
computer_a: Computer = network.get_node_by_hostname("node_a")
c2_server: C2Server = computer_a.software_manager.software.get("C2Server")
computer_b: Computer = network.get_node_by_hostname("node_b")
c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon")
network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network)
# Testing Via Requests:
c2_server.run()
@@ -173,20 +186,7 @@ def test_c2_suite_ransomware_commands(basic_network):
"""Tests the Ransomware commands can be used to configure & launch ransomware via Requests."""
# Setting up the network:
network: Network = basic_network
computer_a: Computer = network.get_node_by_hostname("node_a")
c2_server: C2Server = computer_a.software_manager.software.get("C2Server")
computer_a.software_manager.install(DatabaseService)
computer_a.software_manager.software["DatabaseService"].start()
computer_b: Computer = network.get_node_by_hostname("node_b")
c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon")
computer_b.software_manager.install(DatabaseClient)
computer_b.software_manager.software["DatabaseClient"].configure(server_ip_address=IPv4Address("192.168.0.2"))
computer_b.software_manager.software["DatabaseClient"].run()
c2_beacon.configure(c2_server_ip_address="192.168.0.2", keep_alive_frequency=2)
c2_server.run()
c2_beacon.establish()
network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network)
# Testing Via Requests:
computer_b.software_manager.install(software_class=RansomwareScript)
@@ -208,18 +208,10 @@ def test_c2_suite_acl_block(basic_network):
"""Tests that C2 Beacon disconnects from the C2 Server after blocking ACL rules."""
network: Network = basic_network
computer_a: Computer = network.get_node_by_hostname("node_a")
c2_server: C2Server = computer_a.software_manager.software.get("C2Server")
computer_b: Computer = network.get_node_by_hostname("node_b")
c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon")
network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network)
router: Router = network.get_node_by_hostname("router")
c2_beacon.configure(c2_server_ip_address="192.168.0.2", keep_alive_frequency=2)
c2_server.run()
c2_beacon.establish()
c2_beacon.apply_timestep(0)
assert c2_beacon.keep_alive_inactivity == 1
@@ -233,10 +225,53 @@ def test_c2_suite_acl_block(basic_network):
c2_beacon.apply_timestep(2)
c2_beacon.apply_timestep(3)
assert c2_beacon.keep_alive_inactivity == 2
# C2 Beacon resets after unable to maintain contact.
assert c2_beacon.keep_alive_inactivity == 0
assert c2_beacon.c2_connection_active == False
assert c2_beacon.operating_state == ApplicationOperatingState.CLOSED
def test_c2_suite_terminal(basic_network):
"""Tests the Ransomware commands can be used to configure & launch ransomware via Requests."""
def test_c2_suite_terminal_command_file_creation(basic_network):
"""Tests the C2 Terminal command can be used on local and remote."""
network: Network = basic_network
network, computer_a, c2_server, computer_b, c2_beacon = setup_c2(network)
computer_c: Computer = network.get_node_by_hostname("node_c")
# Asserting to demonstrate that the test files don't exist:
assert (
computer_c.software_manager.file_system.access_file(folder_name="test_folder", file_name="test_file") == False
)
assert (
computer_b.software_manager.file_system.access_file(folder_name="test_folder", file_name="test_file") == False
)
# Testing that we can create the test file and folders via the terminal command (Local C2 Terminal).
# Local file/folder creation commands.
file_create_command = {
"commands": [
["file_system", "create", "folder", "test_folder"],
["file_system", "create", "file", "test_folder", "test_file", "True"],
],
"username": "admin",
"password": "admin",
"ip_address": None,
}
c2_server._send_command(C2Command.TERMINAL, command_options=file_create_command)
assert computer_b.software_manager.file_system.access_file(folder_name="test_folder", file_name="test_file") == True
assert c2_beacon.local_terminal_session is not None
# Testing that we can create the same test file/folders via on node 3 via a remote terminal.
# node_c's IP is 192.168.255.3
file_create_command.update({"ip_address": "192.168.255.3"})
c2_server._send_command(C2Command.TERMINAL, command_options=file_create_command)
assert computer_c.software_manager.file_system.access_file(folder_name="test_folder", file_name="test_file") == True
assert c2_beacon.remote_terminal_session is not None