#2681 Updated to include yaml file tests + include listening on multiports.

This commit is contained in:
Archer Bowen
2024-08-13 09:37:11 +01:00
parent 27ec06658f
commit 6c7376ab4b
6 changed files with 143 additions and 5 deletions

View File

@@ -197,7 +197,7 @@ Via Configuration
...
# A C2 Beacon will not automatically connection to a C2 Server.
# Either an agent must use application_execute.
# Or a user must use .establish().
# Or a if using the simulation layer - .establish().
applications:
type: C2Beacon
options:
@@ -205,6 +205,10 @@ Via Configuration
keep_alive_frequency: 5
masquerade_protocol: tcp
masquerade_port: http
listen_on_ports:
- 80
- 53
- 21
@@ -264,6 +268,13 @@ This must be a string i.e ``DNS``. Defaults to ``HTTP``.
_Please refer to the ``IPProtocol`` class for further reference._
C2 Server Configuration
=======================
_The C2 Server does not currently offer any unique configuration options and will configure itself to match the C2 beacon's network behaviour._
.. include:: ../common/common_configuration.rst
.. |SOFTWARE_NAME| replace:: C2Server
.. |SOFTWARE_NAME_BACKTICK| replace:: ``C2Server``

View File

@@ -27,6 +27,7 @@ from primaite.simulator.network.hardware.nodes.network.router import Router
from primaite.simulator.network.hardware.nodes.network.switch import Switch
from primaite.simulator.network.hardware.nodes.network.wireless_router import WirelessRouter
from primaite.simulator.network.nmne import NMNEConfig
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.sim_container import Simulation
from primaite.simulator.system.applications.application import Application
@@ -455,6 +456,21 @@ class PrimaiteGame:
dos_intensity=float(opt.get("dos_intensity", "1.0")),
max_sessions=int(opt.get("max_sessions", "1000")),
)
elif application_type == "C2Beacon":
if "options" in application_cfg:
opt = application_cfg["options"]
new_application.configure(
c2_server_ip_address=IPv4Address(opt.get("c2_server_ip_address")),
keep_alive_frequency=(opt.get("keep_alive_frequency"))
if opt.get("keep_alive_frequency")
else 5,
masquerade_protocol=IPProtocol[(opt.get("masquerade_protocol"))]
if opt.get("masquerade_protocol")
else IPProtocol.TCP,
masquerade_port=Port[(opt.get("masquerade_port"))]
if opt.get("masquerade_port")
else Port.HTTP,
)
if "network_interfaces" in node_cfg:
for nic_num, nic_cfg in node_cfg["network_interfaces"].items():
new_node.connect_nic(NIC(ip_address=nic_cfg["ip_address"], subnet_mask=nic_cfg["subnet_mask"]))

View File

@@ -247,9 +247,9 @@ class AbstractC2(Application, identifier="AbstractC2"):
self.keep_alive_sent = True
self.sys_log.info(f"{self.name}: Keep Alive sent to {self.c2_remote_connection}")
self.sys_log.debug(
f"{self.name}: Keep Alive sent to {self.c2_remote_connection}"
f"Using Masquerade Port: {self.c2_config.masquerade_port}"
f"Using Masquerade Protocol: {self.c2_config.masquerade_protocol}"
f"{self.name}: Keep Alive sent to {self.c2_remote_connection} "
f"Masquerade Port: {self.c2_config.masquerade_port} "
f"Masquerade Protocol: {self.c2_config.masquerade_protocol} "
)
return True
else:

View File

@@ -97,6 +97,7 @@ class C2Server(AbstractC2, identifier="C2Server"):
def __init__(self, **kwargs):
kwargs["name"] = "C2Server"
super().__init__(**kwargs)
self.run()
def _handle_command_output(self, payload: C2Packet) -> bool:
"""

View File

@@ -0,0 +1,76 @@
# Basic Switched network
#
# -------------- -------------- --------------
# | node_a |------| switch_1 |------| node_b |
# -------------- -------------- --------------
#
io_settings:
save_step_metadata: false
save_pcap_logs: true
save_sys_logs: true
sys_log_level: WARNING
agent_log_level: INFO
save_agent_logs: true
write_agent_log_to_terminal: True
game:
max_episode_length: 256
ports:
- ARP
- DNS
- HTTP
- POSTGRES_SERVER
protocols:
- ICMP
- TCP
- UDP
simulation:
network:
nodes:
- type: switch
hostname: switch_1
num_ports: 8
- hostname: node_a
type: computer
ip_address: 192.168.10.21
subnet_mask: 255.255.255.0
default_gateway: 192.168.10.1
applications:
- type: C2Server
options:
listen_on_ports:
- 80
- 53
- 21
- hostname: node_b
type: computer
ip_address: 192.168.10.22
subnet_mask: 255.255.255.0
default_gateway: 192.168.10.1
applications:
- type: C2Beacon
options:
c2_server_ip_address: 192.168.10.21
keep_alive_frequency: 5
masquerade_protocol: TCP
masquerade_port: HTTP
listen_on_ports:
- 80
- 53
- 21
links:
- endpoint_a_hostname: switch_1
endpoint_a_port: 1
endpoint_b_hostname: node_a
endpoint_b_port: 1
bandwidth: 200
- endpoint_a_hostname: switch_1
endpoint_a_port: 2
endpoint_b_hostname: node_b
endpoint_b_port: 1
bandwidth: 200

View File

@@ -3,6 +3,7 @@ from ipaddress import IPv4Address
from typing import Tuple
import pytest
import yaml
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
@@ -22,6 +23,7 @@ from primaite.simulator.system.applications.red_applications.ransomware_script i
from primaite.simulator.system.services.database.database_service import DatabaseService
from primaite.simulator.system.services.dns.dns_server import DNSServer
from primaite.simulator.system.services.web_server.web_server import WebServer
from tests import TEST_ASSETS_ROOT
@pytest.fixture(scope="function")
@@ -463,3 +465,35 @@ def test_c2_suite_acl_bypass(basic_network):
assert c2_packets_blocked == blocking_acl.match_count
assert c2_server.c2_connection_active is True
assert c2_beacon.c2_connection_active is True
def test_c2_suite_yaml():
"""Tests that the C2 Suite is can be configured correctly via the Yaml."""
with open(TEST_ASSETS_ROOT / "configs" / "basic_c2_setup.yaml") as f:
cfg = yaml.safe_load(f)
game = PrimaiteGame.from_config(cfg)
yaml_network = game.simulation.network
computer_a: Computer = yaml_network.get_node_by_hostname("node_a")
c2_server: C2Server = computer_a.software_manager.software.get("C2Server")
computer_b: Computer = yaml_network.get_node_by_hostname("node_b")
c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon")
assert c2_server.operating_state == ApplicationOperatingState.RUNNING
assert c2_beacon.c2_remote_connection == IPv4Address("192.168.10.21")
c2_beacon.establish()
# Asserting that the c2 beacon has established a c2 connection
assert c2_beacon.c2_connection_active is True
# Asserting that the c2 server has established a c2 connection.
assert c2_server.c2_connection_active is True
assert c2_server.c2_remote_connection == IPv4Address("192.168.10.22")
for i in range(50):
yaml_network.apply_timestep(i)
assert c2_beacon.c2_connection_active is True
assert c2_server.c2_connection_active is True