#2689 Fixed up Pytests and confirmed functionality before merging from dev.

This commit is contained in:
Archer Bowen
2024-08-07 14:16:50 +01:00
parent afa4d2b946
commit b1baf023d6
4 changed files with 77 additions and 58 deletions

View File

@@ -390,7 +390,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.8"
"version": "3.10.12"
}
},
"nbformat": 4,

View File

@@ -380,6 +380,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
self.keep_alive_inactivity += 1
if not self._check_c2_connection(timestep):
self.sys_log.error(f"{self.name}: Connection Severed - Application Closing.")
self.c2_connection_active = False
self.clear_connections()
# TODO: Shouldn't this close() method also set the health state to 'UNUSED'?
self.close()

View File

@@ -33,18 +33,18 @@ def pc_a_pc_b_router_1() -> Tuple[Computer, Computer, Router]:
)
pc_b.power_on()
router_1 = Router(hostname="router_1", start_up_duration=0)
router_1.power_on()
router = Router(hostname="router", start_up_duration=0)
router.power_on()
router_1.configure_port(1, "192.168.0.1", "255.255.255.0")
router_1.configure_port(2, "192.168.1.1", "255.255.255.0")
router.configure_port(1, "192.168.0.1", "255.255.255.0")
router.configure_port(2, "192.168.1.1", "255.255.255.0")
network.connect(endpoint_a=pc_a.network_interface[1], endpoint_b=router_1.network_interface[1])
network.connect(endpoint_a=pc_b.network_interface[1], endpoint_b=router_1.network_interface[2])
router_1.enable_port(1)
router_1.enable_port(2)
network.connect(endpoint_a=pc_a.network_interface[1], endpoint_b=router.network_interface[1])
network.connect(endpoint_a=pc_b.network_interface[1], endpoint_b=router.network_interface[2])
router.enable_port(1)
router.enable_port(2)
return pc_a, pc_b, router_1
return pc_a, pc_b, router
@pytest.fixture(scope="function")

View File

@@ -6,6 +6,7 @@ import pytest
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
@@ -14,9 +15,11 @@ from primaite.simulator.network.hardware.nodes.network.switch import Switch
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon
from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Server
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript
from primaite.simulator.system.services.database.database_service import DatabaseService
from primaite.simulator.system.services.dns.dns_server import DNSServer
from primaite.simulator.system.services.web_server.web_server import WebServer
@@ -26,33 +29,46 @@ def basic_network() -> Network:
network = Network()
# Creating two generic nodes for the C2 Server and the C2 Beacon.
node_a = Computer(hostname="node_a", ip_address="192.168.0.2", subnet_mask="255.255.255.252", start_up_duration=0)
node_a = Computer(
hostname="node_a",
ip_address="192.168.0.2",
subnet_mask="255.255.255.252",
default_gateway="192.168.0.1",
start_up_duration=0,
)
node_a.power_on()
node_a.software_manager.get_open_ports()
node_a.software_manager.install(software_class=C2Server)
node_b = Computer(hostname="node_b", ip_address="192.168.255.2", subnet_mask="255.255.255.252", start_up_duration=0)
node_b.software_manager.install(software_class=C2Beacon)
node_b = Computer(
hostname="node_b",
ip_address="192.168.255.2",
subnet_mask="255.255.255.252",
default_gateway="192.168.255.1",
start_up_duration=0,
)
node_b.power_on()
node_b.software_manager.install(software_class=C2Beacon)
# Creating a router to sit between node 1 and node 2.
router = Router(hostname="router", num_ports=3, start_up_duration=0)
# Default allow all.
router.acl.add_rule(action=ACLAction.PERMIT)
router.power_on()
router.configure_port(port=1, ip_address="192.168.0.1", subnet_mask="255.255.255.252")
router.configure_port(port=2, ip_address="192.168.255.1", subnet_mask="255.255.255.252")
# Creating switches for each client.
switch_1 = Switch(hostname="switch_1", num_ports=6, start_up_duration=0)
switch_1.power_on()
# Connecting the switches to the router.
router.configure_port(port=1, ip_address="192.168.0.1", subnet_mask="255.255.255.252")
network.connect(endpoint_a=router.network_interface[1], endpoint_b=switch_1.network_interface[6])
router.enable_port(1)
switch_2 = Switch(hostname="switch_2", num_ports=6, start_up_duration=0)
switch_2.power_on()
network.connect(endpoint_a=router.network_interface[2], endpoint_b=switch_2.network_interface[6])
router.configure_port(port=2, ip_address="192.168.255.1", subnet_mask="255.255.255.252")
router.enable_port(1)
router.enable_port(2)
# Connecting the node to each switch
@@ -72,7 +88,6 @@ def test_c2_suite_setup_receive(basic_network):
computer_b: Computer = network.get_node_by_hostname("node_b")
c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon")
computer_a.ping("192.168.255.1")
# Assert that the c2 beacon configure correctly.
c2_beacon.configure(c2_server_ip_address="192.168.0.2")
assert c2_beacon.c2_remote_connection == IPv4Address("192.168.0.2")
@@ -97,8 +112,7 @@ def test_c2_suite_keep_alive_inactivity(basic_network):
computer_b: Computer = network.get_node_by_hostname("node_b")
c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon")
# Initial config (#TODO: Make this a function)
c2_beacon.configure(c2_server_ip_address="192.168.0.10", keep_alive_frequency=2)
c2_beacon.configure(c2_server_ip_address="192.168.0.2", keep_alive_frequency=2)
c2_server.run()
c2_beacon.establish()
@@ -116,12 +130,11 @@ def test_c2_suite_keep_alive_inactivity(basic_network):
c2_beacon.apply_timestep(3)
assert c2_beacon.keep_alive_inactivity == 2
assert c2_beacon.c2_connection_active == False
assert c2_beacon.health_state_actual == ApplicationOperatingState.CLOSED
assert c2_beacon.operating_state == ApplicationOperatingState.CLOSED
# TODO: Flesh out these tests.
def test_c2_suite_configure_via_actions(basic_network):
"""Tests that a red agent is able to configure the c2 beacon and c2 server via Actions."""
def test_c2_suite_configure_request(basic_network):
"""Tests that the request system can be used to successfully setup a c2 suite."""
# Setting up the network:
network: Network = basic_network
computer_a: Computer = network.get_node_by_hostname("node_a")
@@ -131,88 +144,93 @@ def test_c2_suite_configure_via_actions(basic_network):
c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon")
# Testing Via Requests:
network.apply_request(["node", "node_a", "application", "C2Server", "run"])
c2_server.run()
network.apply_timestep(0)
c2_beacon_config = {
"c2_server_ip_address": "192.168.0.10",
"c2_server_ip_address": "192.168.0.2",
"keep_alive_frequency": 5,
"masquerade_protocol": IPProtocol.TCP,
"masquerade_port": Port.HTTP,
"masquerade_protocol": "TCP",
"masquerade_port": "HTTP",
}
network.apply_request(["node", "node_b", "application", "C2Beacon", "configure", c2_beacon_config])
network.apply_timestep(0)
network.apply_request(["node", "node_b", "application", "C2Beacon", "execute"])
assert c2_beacon.c2_connection_active is True
assert c2_server.c2_connection_active is True
assert c2_server.c2_remote_connection == IPv4Address("192.168.0.11")
# Testing Via Agents:
# TODO:
assert c2_server.c2_remote_connection == IPv4Address("192.168.255.2")
def test_c2_suite_configure_ransomware(basic_network):
"""Tests that a red agent is able to configure ransomware via C2 Server Actions."""
def test_c2_suite_ransomware_commands(basic_network):
"""Tests the Ransomware commands can be used to configure & launch ransomware via Requests."""
# Setting up the network:
network: Network = basic_network
computer_a: Computer = network.get_node_by_hostname("node_a")
c2_server: C2Server = computer_a.software_manager.software.get("C2Server")
computer_a.software_manager.install(DatabaseService)
computer_a.software_manager.software["DatabaseService"].start()
computer_b: Computer = network.get_node_by_hostname("node_b")
c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon")
computer_b.software_manager.install(DatabaseClient)
computer_b.software_manager.software["DatabaseClient"].configure(server_ip_address=IPv4Address("192.168.0.2"))
computer_b.software_manager.software["DatabaseClient"].run()
c2_beacon.configure(c2_server_ip_address="192.168.0.10", keep_alive_frequency=2)
c2_beacon.configure(c2_server_ip_address="192.168.0.2", keep_alive_frequency=2)
c2_server.run()
c2_beacon.establish()
# Testing Via Requests:
computer_b.software_manager.install(software_class=RansomwareScript)
ransomware_config = {"server_ip_address": "1.1.1.1"}
ransomware_config = {"server_ip_address": "192.168.0.2"}
network.apply_request(["node", "node_a", "application", "C2Server", "ransomware_configure", ransomware_config])
ransomware_script: RansomwareScript = computer_b.software_manager.software["RansomwareScript"]
assert ransomware_script.server_ip_address == "1.1.1.1"
assert ransomware_script.server_ip_address == "192.168.0.2"
# Testing Via Agents:
# TODO:
network.apply_request(["node", "node_a", "application", "C2Server", "ransomware_launch"])
database_file = computer_a.software_manager.file_system.get_file("database", "database.db")
assert database_file.health_status == FileSystemItemHealthStatus.CORRUPT
def test_c2_suite_terminal(basic_network):
"""Tests that a red agent is able to execute terminal commands via C2 Server Actions."""
def test_c2_suite_acl_block(acl_network):
def test_c2_suite_acl_block(basic_network):
"""Tests that C2 Beacon disconnects from the C2 Server after blocking ACL rules."""
network: Network = acl_network
computer_a: Computer = network.get_node_by_hostname("client_1")
network: Network = basic_network
computer_a: Computer = network.get_node_by_hostname("node_a")
c2_server: C2Server = computer_a.software_manager.software.get("C2Server")
computer_b: Computer = network.get_node_by_hostname("client_2")
computer_b: Computer = network.get_node_by_hostname("node_b")
c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon")
router: Router = network.get_node_by_hostname("router")
network.apply_timestep(0)
# Initial config (#TODO: Make this a function)
c2_beacon.configure(c2_server_ip_address="10.0.1.2", keep_alive_frequency=2)
c2_beacon.configure(c2_server_ip_address="192.168.0.2", keep_alive_frequency=2)
c2_server.run()
c2_beacon.establish()
c2_beacon.apply_timestep(0)
assert c2_beacon.keep_alive_inactivity == 1
# Keep Alive successfully sent and received upon the 2nd timestep.
c2_beacon.apply_timestep(1)
assert c2_beacon.keep_alive_inactivity == 0
assert c2_beacon.c2_connection_active == True
assert c2_server.c2_connection_active == True
# Now we add a HTTP blocking acl (Thus preventing a keep alive)
router.acl.add_rule(action=ACLAction.DENY, src_port=Port.HTTP, dst_port=Port.HTTP, position=1)
router.acl.add_rule(action=ACLAction.DENY, src_port=Port.HTTP, dst_port=Port.HTTP, position=0)
c2_beacon.apply_timestep(1)
c2_beacon.apply_timestep(2)
c2_beacon.apply_timestep(3)
assert c2_beacon.keep_alive_inactivity == 2
assert c2_beacon.c2_connection_active == False
assert c2_beacon.health_state_actual == ApplicationOperatingState.CLOSED
assert c2_beacon.operating_state == ApplicationOperatingState.CLOSED
def test_c2_suite_launch_ransomware(basic_network):
"""Tests that a red agent is able to launch ransomware via C2 Server Actions."""
def test_c2_suite_terminal(basic_network):
"""Tests the Ransomware commands can be used to configure & launch ransomware via Requests."""