#2689 Address a couple of TODOs and other misc changes.

This commit is contained in:
Archer.Bowen
2024-08-07 10:34:30 +01:00
parent 9c68cd4bd0
commit afa4d2b946
4 changed files with 84 additions and 114 deletions

View File

@@ -180,7 +180,7 @@
"source": [
"## **Command and Control** | C2 Beacon Actions\n",
"\n",
"Before the Red Agent is able to perform any C2 Server commands, it must first establish connection with a C2 beacon.\n",
"Before any C2 Server commands is able to accept any commands, it must first establish connection with a C2 beacon.\n",
"\n",
"This can be done by installing, configuring and then executing a C2 Beacon. "
]
@@ -341,6 +341,37 @@
"# TODO: Post Terminal.\n",
"#env.step(7)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## **Command and Control** | Blue Agent Relevance"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### **Command and Control** | Blue Agent Relevance | Observation Space"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### **Command and Control** | Blue Agent Relevance | Action Space"
]
}
],
"metadata": {

View File

@@ -184,8 +184,6 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
:return: The Request Response provided by the terminal execute method.
:rtype Request Response:
"""
# TODO: Probably could refactor this to be a more clean.
# The elif's are a bit ugly when they are all calling the same method.
command = payload.command
if not isinstance(command, C2Command):
self.sys_log.warning(f"{self.name}: Received unexpected C2 command. Unable to resolve command")
@@ -253,19 +251,26 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
"""
C2 Command: Ransomware Configuration.
Creates a request that configures the ransomware based off the configuration options given.
This request is then sent to the terminal service in order to be executed.
Calls the locally installed RansomwareScript application's configure method
and passes the given parameters.
The class attribute self._host_ransomware_script will return None if the host
does not have an instance of the RansomwareScript.
:payload MasqueradePacket: The incoming INPUT command.
:type Masquerade Packet: MasqueradePacket.
:return: Returns the Request Response returned by the Terminal execute method.
:rtype: Request Response
"""
# TODO: replace and use terminal
# return RequestResponse(status="success", data={"Reason": "Placeholder."})
given_config = payload.payload
host_ransomware = self._host_ransomware_script
return RequestResponse.from_bool(host_ransomware.configure(server_ip_address=given_config["server_ip_address"]))
if self._host_ransomware_script is None:
return RequestResponse(
status="failure",
data={"Reason": "Cannot find any instances of a RansomwareScript. Have you installed one?"},
)
return RequestResponse.from_bool(
self._host_ransomware_script.configure(server_ip_address=given_config["server_ip_address"])
)
def _command_ransomware_launch(self, payload: MasqueradePacket) -> RequestResponse:
"""
@@ -280,6 +285,11 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
:return: Returns the Request Response returned by the Terminal execute method.
:rtype: Request Response
"""
if self._host_ransomware_script is None:
return RequestResponse(
status="failure",
data={"Reason": "Cannot find any instances of a RansomwareScript. Have you installed one?"},
)
return RequestResponse.from_bool(self._host_ransomware_script.attack())
def _command_terminal(self, payload: MasqueradePacket) -> RequestResponse:

View File

@@ -219,7 +219,7 @@ class C2Server(AbstractC2, identifier="C2Server"):
"""
Creates and returns a Masquerade Packet using the arguments given.
Creates Masquerade Packet with a payload_type INPUT C2Payload
Creates Masquerade Packet with a payload_type INPUT C2Payload.
:param given_command: The C2 command to be sent to the C2 Beacon.
:type given_command: C2Command.
@@ -228,7 +228,6 @@ class C2Server(AbstractC2, identifier="C2Server"):
:return: Returns the construct MasqueradePacket
:rtype: MasqueradePacket
"""
# TODO: Validation on command_options.
constructed_packet = MasqueradePacket(
masquerade_protocol=self.current_masquerade_protocol,
masquerade_port=self.current_masquerade_port,

View File

@@ -21,40 +21,44 @@ from primaite.simulator.system.services.dns.dns_server import DNSServer
from primaite.simulator.system.services.web_server.web_server import WebServer
@pytest.fixture(scope="function")
def c2_server_on_computer() -> Tuple[C2Beacon, Computer]:
computer: Computer = Computer(
hostname="node_a", ip_address="192.168.0.10", subnet_mask="255.255.255.0", start_up_duration=0
)
computer.power_on()
c2_beacon = computer.software_manager.software.get("C2Beacon")
return [c2_beacon, computer]
@pytest.fixture(scope="function")
def c2_server_on_computer() -> Tuple[C2Server, Computer]:
computer: Computer = Computer(
hostname="node_b", ip_address="192.168.0.11", subnet_mask="255.255.255.0", start_up_duration=0
)
computer.power_on()
c2_server = computer.software_manager.software.get("C2Server")
return [c2_server, computer]
@pytest.fixture(scope="function")
def basic_network() -> Network:
network = Network()
node_a = Computer(hostname="node_a", ip_address="192.168.0.10", subnet_mask="255.255.255.0", start_up_duration=0)
# Creating two generic nodes for the C2 Server and the C2 Beacon.
node_a = Computer(hostname="node_a", ip_address="192.168.0.2", subnet_mask="255.255.255.252", start_up_duration=0)
node_a.power_on()
node_a.software_manager.get_open_ports()
node_a.software_manager.install(software_class=C2Server)
node_b = Computer(hostname="node_b", ip_address="192.168.0.11", subnet_mask="255.255.255.0", start_up_duration=0)
node_b = Computer(hostname="node_b", ip_address="192.168.255.2", subnet_mask="255.255.255.252", start_up_duration=0)
node_b.software_manager.install(software_class=C2Beacon)
node_b.power_on()
network.connect(node_a.network_interface[1], node_b.network_interface[1])
# Creating a router to sit between node 1 and node 2.
router = Router(hostname="router", num_ports=3, start_up_duration=0)
router.power_on()
router.configure_port(port=1, ip_address="192.168.0.1", subnet_mask="255.255.255.252")
router.configure_port(port=2, ip_address="192.168.255.1", subnet_mask="255.255.255.252")
# Creating switches for each client.
switch_1 = Switch(hostname="switch_1", num_ports=6, start_up_duration=0)
switch_1.power_on()
# Connecting the switches to the router.
network.connect(endpoint_a=router.network_interface[1], endpoint_b=switch_1.network_interface[6])
router.enable_port(1)
switch_2 = Switch(hostname="switch_2", num_ports=6, start_up_duration=0)
switch_2.power_on()
network.connect(endpoint_a=router.network_interface[2], endpoint_b=switch_2.network_interface[6])
router.enable_port(2)
# Connecting the node to each switch
network.connect(node_a.network_interface[1], switch_1.network_interface[1])
network.connect(node_b.network_interface[1], switch_2.network_interface[1])
return network
@@ -68,9 +72,10 @@ def test_c2_suite_setup_receive(basic_network):
computer_b: Computer = network.get_node_by_hostname("node_b")
c2_beacon: C2Beacon = computer_b.software_manager.software.get("C2Beacon")
computer_a.ping("192.168.255.1")
# Assert that the c2 beacon configure correctly.
c2_beacon.configure(c2_server_ip_address="192.168.0.10")
assert c2_beacon.c2_remote_connection == IPv4Address("192.168.0.10")
c2_beacon.configure(c2_server_ip_address="192.168.0.2")
assert c2_beacon.c2_remote_connection == IPv4Address("192.168.0.2")
c2_server.run()
c2_beacon.establish()
@@ -80,7 +85,7 @@ def test_c2_suite_setup_receive(basic_network):
# Asserting that the c2 server has established a c2 connection.
assert c2_server.c2_connection_active is True
assert c2_server.c2_remote_connection == IPv4Address("192.168.0.11")
assert c2_server.c2_remote_connection == IPv4Address("192.168.255.2")
def test_c2_suite_keep_alive_inactivity(basic_network):
@@ -177,81 +182,6 @@ def test_c2_suite_terminal(basic_network):
"""Tests that a red agent is able to execute terminal commands via C2 Server Actions."""
@pytest.fixture(scope="function")
def acl_network() -> Network:
# 0: Pull out the network
network = Network()
# 1: Set up network hardware
# 1.1: Configure the router
router = Router(hostname="router", num_ports=3, start_up_duration=0)
router.power_on()
router.configure_port(port=1, ip_address="10.0.1.1", subnet_mask="255.255.255.0")
router.configure_port(port=2, ip_address="10.0.2.1", subnet_mask="255.255.255.0")
# 1.2: Create and connect switches
switch_1 = Switch(hostname="switch_1", num_ports=6, start_up_duration=0)
switch_1.power_on()
network.connect(endpoint_a=router.network_interface[1], endpoint_b=switch_1.network_interface[6])
router.enable_port(1)
switch_2 = Switch(hostname="switch_2", num_ports=6, start_up_duration=0)
switch_2.power_on()
network.connect(endpoint_a=router.network_interface[2], endpoint_b=switch_2.network_interface[6])
router.enable_port(2)
# 1.3: Create and connect computer
client_1 = Computer(
hostname="client_1",
ip_address="10.0.1.2",
subnet_mask="255.255.255.0",
default_gateway="10.0.1.1",
start_up_duration=0,
)
client_1.power_on()
client_1.software_manager.install(software_class=C2Server)
network.connect(
endpoint_a=client_1.network_interface[1],
endpoint_b=switch_1.network_interface[1],
)
client_2 = Computer(
hostname="client_2",
ip_address="10.0.1.3",
subnet_mask="255.255.255.0",
default_gateway="10.0.1.1",
start_up_duration=0,
)
client_2.power_on()
client_2.software_manager.install(software_class=C2Beacon)
network.connect(endpoint_a=client_2.network_interface[1], endpoint_b=switch_2.network_interface[1])
# 1.4: Create and connect servers
server_1 = Server(
hostname="server_1",
ip_address="10.0.2.2",
subnet_mask="255.255.255.0",
default_gateway="10.0.2.1",
start_up_duration=0,
)
server_1.power_on()
network.connect(endpoint_a=server_1.network_interface[1], endpoint_b=switch_2.network_interface[1])
server_2 = Server(
hostname="server_2",
ip_address="10.0.2.3",
subnet_mask="255.255.255.0",
default_gateway="10.0.2.1",
start_up_duration=0,
)
server_2.power_on()
network.connect(endpoint_a=server_2.network_interface[1], endpoint_b=switch_2.network_interface[2])
return network
# TODO: Fix this test: Not sure why this isn't working
def test_c2_suite_acl_block(acl_network):
"""Tests that C2 Beacon disconnects from the C2 Server after blocking ACL rules."""
network: Network = acl_network