Add port and protocol custom validators

This commit is contained in:
Marek Wolan
2024-09-20 11:21:28 +01:00
parent 08f1cf1fbd
commit 695891f55c
77 changed files with 616 additions and 613 deletions

View File

@@ -13,8 +13,7 @@ from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import AccessControlList, ACLAction, Router
from primaite.simulator.network.hardware.nodes.network.switch import Switch
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon
@@ -25,6 +24,7 @@ from primaite.simulator.system.services.dns.dns_server import DNSServer
from primaite.simulator.system.services.ftp.ftp_client import FTPClient
from primaite.simulator.system.services.ftp.ftp_server import FTPServer
from primaite.simulator.system.services.web_server.web_server import WebServer
from primaite.utils.validators import PROTOCOL_LOOKUP
from tests import TEST_ASSETS_ROOT
@@ -227,7 +227,7 @@ def test_c2_suite_acl_block(basic_network):
assert c2_beacon.c2_connection_active == True
# Now we add a HTTP blocking acl (Thus preventing a keep alive)
router.acl.add_rule(action=ACLAction.DENY, src_port=Port["HTTP"], dst_port=Port["HTTP"], position=0)
router.acl.add_rule(action=ACLAction.DENY, src_port=PORT_LOOKUP["HTTP"], dst_port=PORT_LOOKUP["HTTP"], position=0)
c2_beacon.apply_timestep(2)
c2_beacon.apply_timestep(3)
@@ -322,8 +322,8 @@ def test_c2_suite_acl_bypass(basic_network):
################ Confirm Default Setup #########################
# Permitting all HTTP & FTP traffic
router.acl.add_rule(action=ACLAction.PERMIT, src_port=Port["HTTP"], dst_port=Port["HTTP"], position=0)
router.acl.add_rule(action=ACLAction.PERMIT, src_port=Port["FTP"], dst_port=Port["FTP"], position=1)
router.acl.add_rule(action=ACLAction.PERMIT, src_port=PORT_LOOKUP["HTTP"], dst_port=PORT_LOOKUP["HTTP"], position=0)
router.acl.add_rule(action=ACLAction.PERMIT, src_port=PORT_LOOKUP["FTP"], dst_port=PORT_LOOKUP["FTP"], position=1)
c2_beacon.apply_timestep(0)
assert c2_beacon.keep_alive_inactivity == 1
@@ -337,7 +337,7 @@ def test_c2_suite_acl_bypass(basic_network):
################ Denying HTTP Traffic #########################
# Now we add a HTTP blocking acl (Thus preventing a keep alive)
router.acl.add_rule(action=ACLAction.DENY, src_port=Port["HTTP"], dst_port=Port["HTTP"], position=0)
router.acl.add_rule(action=ACLAction.DENY, src_port=PORT_LOOKUP["HTTP"], dst_port=PORT_LOOKUP["HTTP"], position=0)
blocking_acl: AccessControlList = router.acl.acl[0]
# Asserts to show the C2 Suite is unable to maintain connection:
@@ -359,8 +359,8 @@ def test_c2_suite_acl_bypass(basic_network):
c2_beacon.configure(
c2_server_ip_address="192.168.0.2",
keep_alive_frequency=2,
masquerade_port=Port["FTP"],
masquerade_protocol=IPProtocol["TCP"],
masquerade_port=PORT_LOOKUP["FTP"],
masquerade_protocol=PROTOCOL_LOOKUP["TCP"],
)
c2_beacon.establish()
@@ -407,8 +407,8 @@ def test_c2_suite_acl_bypass(basic_network):
################ Denying FTP Traffic & Enable HTTP #########################
# Blocking FTP and re-permitting HTTP:
router.acl.add_rule(action=ACLAction.PERMIT, src_port=Port["HTTP"], dst_port=Port["HTTP"], position=0)
router.acl.add_rule(action=ACLAction.DENY, src_port=Port["FTP"], dst_port=Port["FTP"], position=1)
router.acl.add_rule(action=ACLAction.PERMIT, src_port=PORT_LOOKUP["HTTP"], dst_port=PORT_LOOKUP["HTTP"], position=0)
router.acl.add_rule(action=ACLAction.DENY, src_port=PORT_LOOKUP["FTP"], dst_port=PORT_LOOKUP["FTP"], position=1)
blocking_acl: AccessControlList = router.acl.acl[1]
# Asserts to show the C2 Suite is unable to maintain connection:
@@ -430,8 +430,8 @@ def test_c2_suite_acl_bypass(basic_network):
c2_beacon.configure(
c2_server_ip_address="192.168.0.2",
keep_alive_frequency=2,
masquerade_port=Port["HTTP"],
masquerade_protocol=IPProtocol["TCP"],
masquerade_port=PORT_LOOKUP["HTTP"],
masquerade_protocol=PROTOCOL_LOOKUP["TCP"],
)
c2_beacon.establish()

View File

@@ -9,7 +9,7 @@ from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.database_client import DatabaseClient, DatabaseClientConnection
from primaite.simulator.system.applications.red_applications.data_manipulation_bot import (
@@ -52,7 +52,10 @@ def data_manipulation_db_server_green_client(example_network) -> Network:
router_1: Router = example_network.get_node_by_hostname("router_1")
router_1.acl.add_rule(
action=ACLAction.PERMIT, src_port=Port["POSTGRES_SERVER"], dst_port=Port["POSTGRES_SERVER"], position=0
action=ACLAction.PERMIT,
src_port=PORT_LOOKUP["POSTGRES_SERVER"],
dst_port=PORT_LOOKUP["POSTGRES_SERVER"],
position=0,
)
client_1: Computer = network.get_node_by_hostname("client_1")

View File

@@ -8,7 +8,7 @@ from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.applications.red_applications.dos_bot import DoSAttackStage, DoSBot
@@ -26,7 +26,7 @@ def dos_bot_and_db_server(client_server) -> Tuple[DoSBot, Computer, DatabaseServ
dos_bot: DoSBot = computer.software_manager.software.get("DoSBot")
dos_bot.configure(
target_ip_address=IPv4Address(server.network_interface[1].ip_address),
target_port=Port["POSTGRES_SERVER"],
target_port=PORT_LOOKUP["POSTGRES_SERVER"],
)
# Install DB Server service on server
@@ -43,7 +43,10 @@ def dos_bot_db_server_green_client(example_network) -> Network:
router_1: Router = example_network.get_node_by_hostname("router_1")
router_1.acl.add_rule(
action=ACLAction.PERMIT, src_port=Port["POSTGRES_SERVER"], dst_port=Port["POSTGRES_SERVER"], position=0
action=ACLAction.PERMIT,
src_port=PORT_LOOKUP["POSTGRES_SERVER"],
dst_port=PORT_LOOKUP["POSTGRES_SERVER"],
position=0,
)
client_1: Computer = network.get_node_by_hostname("client_1")
@@ -56,7 +59,7 @@ def dos_bot_db_server_green_client(example_network) -> Network:
dos_bot: DoSBot = client_1.software_manager.software.get("DoSBot")
dos_bot.configure(
target_ip_address=IPv4Address(server.network_interface[1].ip_address),
target_port=Port["POSTGRES_SERVER"],
target_port=PORT_LOOKUP["POSTGRES_SERVER"],
)
# install db server service on server

View File

@@ -9,7 +9,7 @@ from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.database_client import DatabaseClient, DatabaseClientConnection
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript
from primaite.simulator.system.services.database.database_service import DatabaseService
@@ -47,7 +47,10 @@ def ransomware_script_db_server_green_client(example_network) -> Network:
router_1: Router = example_network.get_node_by_hostname("router_1")
router_1.acl.add_rule(
action=ACLAction.PERMIT, src_port=Port["POSTGRES_SERVER"], dst_port=Port["POSTGRES_SERVER"], position=0
action=ACLAction.PERMIT,
src_port=PORT_LOOKUP["POSTGRES_SERVER"],
dst_port=PORT_LOOKUP["POSTGRES_SERVER"],
position=0,
)
client_1: Computer = network.get_node_by_hostname("client_1")

View File

@@ -5,9 +5,9 @@ from ipaddress import IPv4Address, IPv4Network
import yaml
from primaite.game.game import PrimaiteGame
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.nmap import NMAP
from primaite.utils.validators import PROTOCOL_LOOKUP
from tests import TEST_ASSETS_ROOT
@@ -74,11 +74,11 @@ def test_port_scan_one_node_one_port(example_network):
actual_result = client_1_nmap.port_scan(
target_ip_address=client_2.network_interface[1].ip_address,
target_port=Port["DNS"],
target_protocol=IPProtocol["TCP"],
target_port=PORT_LOOKUP["DNS"],
target_protocol=PROTOCOL_LOOKUP["TCP"],
)
expected_result = {IPv4Address("192.168.10.22"): {IPProtocol["TCP"]: [Port["DNS"]]}}
expected_result = {IPv4Address("192.168.10.22"): {PROTOCOL_LOOKUP["TCP"]: [PORT_LOOKUP["DNS"]]}}
assert actual_result == expected_result
@@ -103,14 +103,20 @@ def test_port_scan_full_subnet_all_ports_and_protocols(example_network):
actual_result = client_1_nmap.port_scan(
target_ip_address=IPv4Network("192.168.10.0/24"),
target_port=[Port["ARP"], Port["HTTP"], Port["FTP"], Port["DNS"], Port["NTP"]],
target_port=[
PORT_LOOKUP["ARP"],
PORT_LOOKUP["HTTP"],
PORT_LOOKUP["FTP"],
PORT_LOOKUP["DNS"],
PORT_LOOKUP["NTP"],
],
)
expected_result = {
IPv4Address("192.168.10.1"): {IPProtocol["UDP"]: [Port["ARP"]]},
IPv4Address("192.168.10.1"): {PROTOCOL_LOOKUP["UDP"]: [PORT_LOOKUP["ARP"]]},
IPv4Address("192.168.10.22"): {
IPProtocol["TCP"]: [Port["HTTP"], Port["FTP"], Port["DNS"]],
IPProtocol["UDP"]: [Port["ARP"], Port["NTP"]],
PROTOCOL_LOOKUP["TCP"]: [PORT_LOOKUP["HTTP"], PORT_LOOKUP["FTP"], PORT_LOOKUP["DNS"]],
PROTOCOL_LOOKUP["UDP"]: [PORT_LOOKUP["ARP"], PORT_LOOKUP["NTP"]],
},
}
@@ -124,10 +130,12 @@ def test_network_service_recon_all_ports_and_protocols(example_network):
client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa
actual_result = client_1_nmap.network_service_recon(
target_ip_address=IPv4Network("192.168.10.0/24"), target_port=Port["HTTP"], target_protocol=IPProtocol["TCP"]
target_ip_address=IPv4Network("192.168.10.0/24"),
target_port=PORT_LOOKUP["HTTP"],
target_protocol=PROTOCOL_LOOKUP["TCP"],
)
expected_result = {IPv4Address("192.168.10.22"): {IPProtocol["TCP"]: [Port["HTTP"]]}}
expected_result = {IPv4Address("192.168.10.22"): {PROTOCOL_LOOKUP["TCP"]: [PORT_LOOKUP["HTTP"]]}}
assert sort_dict(actual_result) == sort_dict(expected_result)

View File

@@ -6,19 +6,19 @@ from pydantic import Field
from primaite.game.game import PrimaiteGame
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.services.database.database_service import DatabaseService
from primaite.simulator.system.services.service import Service
from primaite.utils.validators import PROTOCOL_LOOKUP
from tests import TEST_ASSETS_ROOT
class _DatabaseListener(Service):
name: str = "DatabaseListener"
protocol: str = IPProtocol["TCP"]
port: int = Port["NONE"]
listen_on_ports: Set[int] = {Port["POSTGRES_SERVER"]}
protocol: str = PROTOCOL_LOOKUP["TCP"]
port: int = PORT_LOOKUP["NONE"]
listen_on_ports: Set[int] = {PORT_LOOKUP["POSTGRES_SERVER"]}
payloads_received: List[Any] = Field(default_factory=list)
def receive(self, payload: Any, session_id: str, **kwargs) -> bool:
@@ -51,8 +51,8 @@ def test_http_listener(client_server):
computer.session_manager.receive_payload_from_software_manager(
payload="masquerade as Database traffic",
dst_ip_address=server.network_interface[1].ip_address,
dst_port=Port["POSTGRES_SERVER"],
ip_protocol=IPProtocol["TCP"],
dst_port=PORT_LOOKUP["POSTGRES_SERVER"],
ip_protocol=PROTOCOL_LOOKUP["TCP"],
)
assert len(server_db_listener.payloads_received) == 1
@@ -76,9 +76,9 @@ def test_set_listen_on_ports_from_config():
network = PrimaiteGame.from_config(cfg=config_dict).simulation.network
client: Computer = network.get_node_by_hostname("client")
assert Port["SMB"] in client.software_manager.get_open_ports()
assert Port["IPP"] in client.software_manager.get_open_ports()
assert PORT_LOOKUP["SMB"] in client.software_manager.get_open_ports()
assert PORT_LOOKUP["IPP"] in client.software_manager.get_open_ports()
web_browser = client.software_manager.software["WebBrowser"]
assert not web_browser.listen_on_ports.difference({Port["SMB"], Port["IPP"]})
assert not web_browser.listen_on_ports.difference({PORT_LOOKUP["SMB"], PORT_LOOKUP["IPP"]})

View File

@@ -9,7 +9,7 @@ from primaite.simulator.network.hardware.base import Link
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.applications.web_browser import WebBrowser
from primaite.simulator.system.services.database.database_service import DatabaseService
@@ -24,17 +24,22 @@ def web_client_web_server_database(example_network) -> Tuple[Network, Computer,
# add rules to network router
router_1: Router = example_network.get_node_by_hostname("router_1")
router_1.acl.add_rule(
action=ACLAction.PERMIT, src_port=Port["POSTGRES_SERVER"], dst_port=Port["POSTGRES_SERVER"], position=0
action=ACLAction.PERMIT,
src_port=PORT_LOOKUP["POSTGRES_SERVER"],
dst_port=PORT_LOOKUP["POSTGRES_SERVER"],
position=0,
)
# Allow DNS requests
router_1.acl.add_rule(action=ACLAction.PERMIT, src_port=Port["DNS"], dst_port=Port["DNS"], position=1)
router_1.acl.add_rule(action=ACLAction.PERMIT, src_port=PORT_LOOKUP["DNS"], dst_port=PORT_LOOKUP["DNS"], position=1)
# Allow FTP requests
router_1.acl.add_rule(action=ACLAction.PERMIT, src_port=Port["FTP"], dst_port=Port["FTP"], position=2)
router_1.acl.add_rule(action=ACLAction.PERMIT, src_port=PORT_LOOKUP["FTP"], dst_port=PORT_LOOKUP["FTP"], position=2)
# Open port 80 for web server
router_1.acl.add_rule(action=ACLAction.PERMIT, src_port=Port["HTTP"], dst_port=Port["HTTP"], position=3)
router_1.acl.add_rule(
action=ACLAction.PERMIT, src_port=PORT_LOOKUP["HTTP"], dst_port=PORT_LOOKUP["HTTP"], position=3
)
# Create Computer
computer: Computer = example_network.get_node_by_hostname("client_1")
@@ -148,7 +153,9 @@ class TestWebBrowserHistory:
assert web_browser.history[-1].response_code == 200
router = network.get_node_by_hostname("router_1")
router.acl.add_rule(action=ACLAction.DENY, src_port=Port["HTTP"], dst_port=Port["HTTP"], position=0)
router.acl.add_rule(
action=ACLAction.DENY, src_port=PORT_LOOKUP["HTTP"], dst_port=PORT_LOOKUP["HTTP"], position=0
)
assert not web_browser.get_webpage()
assert len(web_browser.history) == 3
# with current NIC behaviour, even if you block communication, you won't get SERVER_UNREACHABLE because
@@ -166,7 +173,9 @@ class TestWebBrowserHistory:
web_browser.get_webpage()
router = network.get_node_by_hostname("router_1")
router.acl.add_rule(action=ACLAction.DENY, src_port=Port["HTTP"], dst_port=Port["HTTP"], position=0)
router.acl.add_rule(
action=ACLAction.DENY, src_port=PORT_LOOKUP["HTTP"], dst_port=PORT_LOOKUP["HTTP"], position=0
)
web_browser.get_webpage()
state = computer.describe_state()