#2618 - Integrated the NMAP into the action and requests functionality to enable agent usage. added NMAP agents tests.

This commit is contained in:
Chris McCarthy
2024-05-31 13:53:18 +01:00
parent 9c4d47b0b9
commit 5eea5bf4f9
11 changed files with 763 additions and 26 deletions

View File

@@ -1,6 +1,6 @@
.. only:: comment
© Crown-owned copyright 2023 - 2024, Defence Science and Technology Laboratory UK
© Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
.. _NMAP:
@@ -323,12 +323,11 @@ Perform a full box scan on all ports, over both TCP and UDP, on a whole subnet:
}
}
.. code-block:: text
:caption: Box Port Scan Output
+--------------------------------------------------+
| pc_1 NMAP Port Scan (Box) |
| pc_1 NMAP Port Scan (Box) |
+--------------+------+-----------------+----------+
| IP Address | Port | Name | Protocol |
+--------------+------+-----------------+----------+

View File

@@ -10,7 +10,7 @@ AbstractAction. The ActionManager is responsible for:
"""
import itertools
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple, TYPE_CHECKING
from typing import Dict, List, Optional, Tuple, TYPE_CHECKING, Union
from gymnasium import spaces
@@ -870,6 +870,74 @@ class NetworkPortDisableAction(AbstractAction):
return ["network", "node", target_nodename, "network_interface", port_id, "disable"]
class NodeNMAPPingScanAction(AbstractAction):
"""Action which performs an NMAP ping scan."""
def __init__(self, manager: "ActionManager", **kwargs) -> None:
super().__init__(manager=manager)
def form_request(self, source_node: str, target_ip_address: Union[str, List[str]]) -> List[str]: # noqa
return [
"network",
"node",
source_node,
"application",
"NMAP",
"ping_scan",
{"target_ip_address": target_ip_address},
]
class NodeNMAPPortScanAction(AbstractAction):
"""Action which performs an NMAP port scan."""
def __init__(self, manager: "ActionManager", **kwargs) -> None:
super().__init__(manager=manager)
def form_request(
self,
source_node: str,
target_ip_address: Union[str, List[str]],
target_protocol: Optional[Union[str, List[str]]] = None,
target_port: Optional[Union[str, List[str]]] = None,
) -> List[str]: # noqa
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
return [
"network",
"node",
source_node,
"application",
"NMAP",
"port_scan",
{"target_ip_address": target_ip_address, "target_port": target_port, "target_protocol": target_protocol},
]
class NodeNetworkServiceReconAction(AbstractAction):
"""Action which performs an NMAP network service recon (ping scan followed by port scan)."""
def __init__(self, manager: "ActionManager", **kwargs) -> None:
super().__init__(manager=manager)
def form_request(
self,
source_node: str,
target_ip_address: Union[str, List[str]],
target_protocol: Optional[Union[str, List[str]]] = None,
target_port: Optional[Union[str, List[str]]] = None,
) -> List[str]: # noqa
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
return [
"network",
"node",
source_node,
"application",
"NMAP",
"network_service_recon",
{"target_ip_address": target_ip_address, "target_port": target_port, "target_protocol": target_protocol},
]
class ActionManager:
"""Class which manages the action space for an agent."""
@@ -915,6 +983,9 @@ class ActionManager:
"HOST_NIC_DISABLE": HostNICDisableAction,
"NETWORK_PORT_ENABLE": NetworkPortEnableAction,
"NETWORK_PORT_DISABLE": NetworkPortDisableAction,
"NODE_NMAP_PING_SCAN": NodeNMAPPingScanAction,
"NODE_NMAP_PORT_SCAN": NodeNMAPPortScanAction,
"NODE_NMAP_NETWORK_SERVICE_RECON": NodeNetworkServiceReconAction,
}
"""Dictionary which maps action type strings to the corresponding action class."""

View File

@@ -2,7 +2,7 @@ from typing import Dict, ForwardRef, List, Literal, Union
from pydantic import BaseModel, ConfigDict, StrictBool, validate_call
RequestFormat = List[Union[str, int, float]]
RequestFormat = List[Union[str, int, float, Dict]]
RequestResponse = ForwardRef("RequestResponse")
"""This makes it possible to type-hint RequestResponse.from_bool return type."""

View File

@@ -221,7 +221,7 @@ class SimComponent(BaseModel):
return state
@validate_call
def apply_request(self, request: RequestFormat, context: Dict = {}) -> RequestResponse:
def apply_request(self, request: RequestFormat, context: Optional[Dict] = None) -> RequestResponse:
"""
Apply a request to a simulation component. Request data is passed in as a 'namespaced' list of strings.
@@ -239,6 +239,8 @@ class SimComponent(BaseModel):
:param: context: Dict containing context for requests
:type context: Dict
"""
if not context:
context = None
if self._request_manager is None:
return
return self._request_manager(request, context)

View File

@@ -28,6 +28,7 @@ from primaite.simulator.network.nmne import (
NMNE_CAPTURE_KEYWORDS,
)
from primaite.simulator.network.transmission.data_link_layer import Frame
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.core.packet_capture import PacketCapture
from primaite.simulator.system.core.session_manager import SessionManager
@@ -107,10 +108,13 @@ class NetworkInterface(SimComponent, ABC):
nmne: Dict = Field(default_factory=lambda: {})
"A dict containing details of the number of malicious network events captured."
traffic: Dict = Field(default_factory=lambda: {})
def setup_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""
super().setup_for_episode(episode=episode)
self.nmne = {}
self.traffic = {}
if episode and self.pcap and SIM_OUTPUT.save_pcap_logs:
self.pcap.current_episode = episode
self.pcap.setup_logger()
@@ -146,6 +150,7 @@ class NetworkInterface(SimComponent, ABC):
)
if CAPTURE_NMNE:
state.update({"nmne": {k: v for k, v in self.nmne.items()}})
state.update({"traffic": self.traffic})
return state
@abstractmethod
@@ -236,6 +241,47 @@ class NetworkInterface(SimComponent, ABC):
# Increment a generic counter if keyword capturing is not enabled
keyword_level["*"] = keyword_level.get("*", 0) + 1
def _capture_traffic(self, frame: Frame, inbound: bool = True):
"""
Capture traffic statistics at the Network Interface.
:param frame: The network frame containing the traffic data.
:type frame: Frame
:param inbound: Flag indicating if the traffic is inbound or outbound. Defaults to True.
:type inbound: bool
"""
# Determine the direction of the traffic
direction = "inbound" if inbound else "outbound"
# Initialize protocol and port variables
protocol = None
port = None
# Identify the protocol and port from the frame
if frame.tcp:
protocol = IPProtocol.TCP
port = frame.tcp.dst_port
elif frame.udp:
protocol = IPProtocol.UDP
port = frame.udp.dst_port
elif frame.icmp:
protocol = IPProtocol.ICMP
# Ensure the protocol is in the capture dict
if protocol not in self.traffic:
self.traffic[protocol] = {}
# Handle non-ICMP protocols that use ports
if protocol != IPProtocol.ICMP:
if port not in self.traffic[protocol]:
self.traffic[protocol][port] = {"inbound": 0, "outbound": 0}
self.traffic[protocol][port][direction] += frame.size
else:
# Handle ICMP protocol separately (ICMP does not use ports)
if not self.traffic[protocol]:
self.traffic[protocol] = {"inbound": 0, "outbound": 0}
self.traffic[protocol][direction] += frame.size
@abstractmethod
def send_frame(self, frame: Frame) -> bool:
"""
@@ -245,6 +291,7 @@ class NetworkInterface(SimComponent, ABC):
:return: A boolean indicating whether the frame was successfully sent.
"""
self._capture_nmne(frame, inbound=False)
self._capture_traffic(frame, inbound=False)
@abstractmethod
def receive_frame(self, frame: Frame) -> bool:
@@ -255,6 +302,7 @@ class NetworkInterface(SimComponent, ABC):
:return: A boolean indicating whether the frame was successfully received.
"""
self._capture_nmne(frame, inbound=True)
self._capture_traffic(frame, inbound=True)
def __str__(self) -> str:
"""
@@ -766,6 +814,24 @@ class Node(SimComponent):
self.session_manager.software_manager = self.software_manager
self._install_system_software()
def ip_is_network_interface(self, ip_address: IPv4Address, enabled_only: bool = False) -> bool:
"""
Checks if a given IP address belongs to any of the nodes interfaces.
:param ip_address: The IP address to check.
:param enabled_only: If True, only considers enabled network interfaces.
:return: True if the IP address is assigned to one of the nodes interfaces; False otherwise.
"""
for network_interface in self.network_interface.values():
if not hasattr(network_interface, "ip_address"):
continue
if network_interface.ip_address == ip_address:
if enabled_only:
return network_interface.enabled
else:
return True
return False
def setup_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""
super().setup_for_episode(episode=episode)

View File

@@ -315,7 +315,17 @@ class HostNode(Node):
def __init__(self, ip_address: IPV4Address, subnet_mask: IPV4Address, **kwargs):
super().__init__(**kwargs)
self.connect_nic(NIC(ip_address=ip_address, subnet_mask=subnet_mask))
self.connect_nic(NIC(ip_address=ip_address, subnet_mask=subnet_mask)) #
@property
def nmap(self) -> Optional[NMAP]:
"""
Return the NMAP application installed on the Node.
:return: NMAP application installed on the Node.
:rtype: Optional[NMAP]
"""
return self.software_manager.software.get("NMAP")
@property
def arp(self) -> Optional[ARP]:

View File

@@ -4,7 +4,8 @@ from typing import Any, Dict, Final, List, Optional, Tuple, Union
from prettytable import PrettyTable
from pydantic import validate_call
from primaite.simulator.core import SimComponent
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager, RequestType, SimComponent
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.application import Application
@@ -65,6 +66,56 @@ class NMAP(Application):
kwargs["protocol"] = IPProtocol.NONE
super().__init__(**kwargs)
def _init_request_manager(self) -> RequestManager:
def _ping_scan_action(request: List[Any], context: Any) -> RequestResponse:
results = self.ping_scan(target_ip_address=request[0]["target_ip_address"], json_serializable=True)
success = True
if not success:
return RequestResponse.from_bool(False)
return RequestResponse(
status="success",
data={"live_hosts": results},
)
def _port_scan_action(request: List[Any], context: Any) -> RequestResponse:
results = self.port_scan(**request[0], json_serializable=True)
success = True
if not success:
return RequestResponse.from_bool(False)
return RequestResponse(
status="success",
data=results,
)
def _network_service_recon_action(request: List[Any], context: Any) -> RequestResponse:
results = self.network_service_recon(**request[0], json_serializable=True)
success = True
if not success:
return RequestResponse.from_bool(False)
return RequestResponse(
status="success",
data=results,
)
rm = RequestManager()
rm.add_request(
name="ping_scan",
request_type=RequestType(func=_ping_scan_action),
)
rm.add_request(
name="port_scan",
request_type=RequestType(func=_port_scan_action),
)
rm.add_request(
name="network_service_recon",
request_type=RequestType(func=_network_service_recon_action),
)
return rm
def describe_state(self) -> Dict:
"""
Describe the state of the NMAP application.
@@ -80,7 +131,8 @@ class NMAP(Application):
target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]],
show: bool = True,
show_online_only: bool = True,
) -> List[IPV4Address]:
json_serializable: bool = False,
) -> Union[List[IPV4Address], List[str]]:
"""
Perform a ping scan on the target IP address(es).
@@ -90,9 +142,12 @@ class NMAP(Application):
:type show: bool
:param show_online_only: Flag indicating whether to show only the online hosts. Defaults to True.
:type show_online_only: bool
:param json_serializable: Flag indicating whether the return value should be json serializable. Defaults to
False.
:type json_serializable: bool
:return: A list of active IP addresses that responded to the ping.
:rtype: List[IPV4Address]
:rtype: Union[List[IPV4Address], List[str]]
"""
active_nodes = []
if show:
@@ -112,9 +167,12 @@ class NMAP(Application):
else:
ip_addresses.append(ip_address)
for ip_address in set(ip_addresses):
# Prevent ping scan on this node
if self.software_manager.node.ip_is_network_interface(ip_address=ip_address):
continue
can_ping = self.software_manager.icmp.ping(ip_address)
if can_ping:
active_nodes.append(ip_address)
active_nodes.append(ip_address if not json_serializable else str(ip_address))
if show and (can_ping or not show_online_only):
table.add_row([ip_address, can_ping])
if show:
@@ -227,6 +285,7 @@ class NMAP(Application):
target_protocol: Optional[Union[IPProtocol, List[IPProtocol]]] = None,
target_port: Optional[Union[Port, List[Port]]] = None,
show: bool = True,
json_serializable: bool = False,
) -> Dict[IPv4Address, Dict[IPProtocol, List[Port]]]:
"""
Perform a port scan on the target IP address(es).
@@ -239,6 +298,9 @@ class NMAP(Application):
:type target_port: Optional[Union[Port, List[Port]]]
:param show: Flag indicating whether to display the scan results. Defaults to True.
:type show: bool
:param json_serializable: Flag indicating whether the return value should be JSON serializable. Defaults to
False.
:type json_serializable: bool
:return: A dictionary mapping IP addresses to protocols and lists of open ports.
:rtype: Dict[IPv4Address, Dict[IPProtocol, List[Port]]]
@@ -274,24 +336,75 @@ class NMAP(Application):
table.title = f"{self.software_manager.node.hostname} NMAP Port Scan ({scan_type})"
self.sys_log.info(f"{self.name}: Starting port scan")
for ip_address in set(ip_addresses):
# Prevent port scan on this node
if self.software_manager.node.ip_is_network_interface(ip_address=ip_address):
continue
for protocol in target_protocol:
for port in set(target_port):
port_open = self._check_port_open_on_ip_address(ip_address=ip_address, port=port, protocol=protocol)
if port_open:
table.add_row([ip_address, port.value, port.name, protocol.name])
if ip_address not in active_ports:
active_ports[ip_address] = dict()
if protocol not in active_ports[ip_address]:
active_ports[ip_address][protocol] = []
active_ports[ip_address][protocol].append(port)
_ip_address = ip_address if not json_serializable else str(ip_address)
_protocol = protocol if not json_serializable else protocol.value
_port = port if not json_serializable else port.value
if _ip_address not in active_ports:
active_ports[_ip_address] = dict()
if _protocol not in active_ports[_ip_address]:
active_ports[_ip_address][_protocol] = []
active_ports[_ip_address][_protocol].append(_port)
if show:
print(table.get_string(sortby="IP Address"))
return active_ports
def network_service_recon(
self,
target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]],
target_protocol: Optional[Union[IPProtocol, List[IPProtocol]]] = None,
target_port: Optional[Union[Port, List[Port]]] = None,
show: bool = True,
show_online_only: bool = True,
json_serializable: bool = False,
) -> Dict[IPv4Address, Dict[IPProtocol, List[Port]]]:
"""
Perform a network service reconnaissance which includes a ping scan followed by a port scan.
This method combines the functionalities of a ping scan and a port scan to provide a comprehensive
overview of the services on the network. It first identifies active hosts in the target IP range by performing
a ping scan. Once the active hosts are identified, it performs a port scan on these hosts to identify open
ports and running services. This two-step process ensures that the port scan is performed only on live hosts,
optimising the scanning process and providing accurate results.
:param target_ip_address: The target IP address(es) or network(s) for the port scan.
:type target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]]
:param target_protocol: The protocol(s) to use for the port scan. Defaults to None, which includes TCP and UDP.
:type target_protocol: Optional[Union[IPProtocol, List[IPProtocol]]]
:param target_port: The port(s) to scan. Defaults to None, which includes all valid ports.
:type target_port: Optional[Union[Port, List[Port]]]
:param show: Flag indicating whether to display the scan results. Defaults to True.
:type show: bool
:param show_online_only: Flag indicating whether to show only the online hosts. Defaults to True.
:type show_online_only: bool
:param json_serializable: Flag indicating whether the return value should be JSON serializable. Defaults to
False.
:type json_serializable: bool
:return: A dictionary mapping IP addresses to protocols and lists of open ports.
:rtype: Dict[IPv4Address, Dict[IPProtocol, List[Port]]]
"""
ping_scan_results = self.ping_scan(
target_ip_address=target_ip_address, show=show, show_online_only=show_online_only, json_serializable=False
)
return self.port_scan(
target_ip_address=ping_scan_results,
target_protocol=target_protocol,
target_port=target_port,
show=show,
json_serializable=json_serializable,
)
def receive(self, payload: Any, session_id: str, **kwargs) -> bool:
"""
Receive and process a payload.

View File

@@ -0,0 +1,137 @@
io_settings:
save_step_metadata: false
save_pcap_logs: true
save_sys_logs: true
sys_log_level: WARNING
game:
max_episode_length: 256
ports:
- ARP
- DNS
- HTTP
- POSTGRES_SERVER
protocols:
- ICMP
- TCP
- UDP
agents:
- ref: client_1_red_nmap
team: RED
type: ProbabilisticAgent
observation_space: null
action_space:
options:
nodes:
- node_name: client_1
applications:
- application_name: NMAP
max_folders_per_node: 1
max_files_per_folder: 1
max_services_per_node: 1
max_applications_per_node: 1
action_list:
- type: NODE_NMAP_NETWORK_SERVICE_RECON
action_map:
0:
action: NODE_NMAP_NETWORK_SERVICE_RECON
options:
source_node: client_1
target_ip_address: 192.168.10.0/24
target_port: 80
target_protocol: tcp
reward_function:
reward_components:
- type: DUMMY
agent_settings:
action_probabilities:
0: 1.0
simulation:
network:
nodes:
- hostname: switch_1
num_ports: 8
type: switch
- hostname: switch_2
num_ports: 8
type: switch
- hostname: router_1
type: router
ports:
1:
ip_address: 192.168.1.1
subnet_mask: 255.255.255.0
2:
ip_address: 192.168.10.1
subnet_mask: 255.255.255.0
acl:
1:
action: PERMIT
- hostname: client_1
type: computer
ip_address: 192.168.10.21
subnet_mask: 255.255.255.0
default_gateway: 192.168.10.1
- hostname: client_2
type: computer
ip_address: 192.168.10.22
subnet_mask: 255.255.255.0
default_gateway: 192.168.10.1
- hostname: server_1
type: server
ip_address: 192.168.1.10
subnet_mask: 255.255.255.0
default_gateway: 192.168.1.1
- hostname: server_2
type: server
ip_address: 192.168.1.14
subnet_mask: 255.255.255.0
default_gateway: 192.168.1.1
links:
- endpoint_a_hostname: router_1
endpoint_a_port: 1
endpoint_b_hostname: switch_1
endpoint_b_port: 8
- endpoint_a_hostname: router_1
endpoint_a_port: 2
endpoint_b_hostname: switch_2
endpoint_b_port: 8
- endpoint_a_hostname: client_1
endpoint_a_port: 1
endpoint_b_hostname: switch_2
endpoint_b_port: 1
- endpoint_a_hostname: client_2
endpoint_a_port: 1
endpoint_b_hostname: switch_2
endpoint_b_port: 2
- endpoint_a_hostname: server_1
endpoint_a_port: 1
endpoint_b_hostname: switch_1
endpoint_b_port: 1
- endpoint_a_hostname: server_2
endpoint_a_port: 1
endpoint_b_hostname: switch_1
endpoint_b_port: 2

View File

@@ -0,0 +1,135 @@
io_settings:
save_step_metadata: false
save_pcap_logs: true
save_sys_logs: true
sys_log_level: WARNING
game:
max_episode_length: 256
ports:
- ARP
- DNS
- HTTP
- POSTGRES_SERVER
protocols:
- ICMP
- TCP
- UDP
agents:
- ref: client_1_red_nmap
team: RED
type: ProbabilisticAgent
observation_space: null
action_space:
options:
nodes:
- node_name: client_1
applications:
- application_name: NMAP
max_folders_per_node: 1
max_files_per_folder: 1
max_services_per_node: 1
max_applications_per_node: 1
action_list:
- type: NODE_NMAP_PING_SCAN
action_map:
0:
action: NODE_NMAP_PING_SCAN
options:
source_node: client_1
target_ip_address: 192.168.1.0/24
reward_function:
reward_components:
- type: DUMMY
agent_settings:
action_probabilities:
0: 1.0
simulation:
network:
nodes:
- hostname: switch_1
num_ports: 8
type: switch
- hostname: switch_2
num_ports: 8
type: switch
- hostname: router_1
type: router
ports:
1:
ip_address: 192.168.1.1
subnet_mask: 255.255.255.0
2:
ip_address: 192.168.10.1
subnet_mask: 255.255.255.0
acl:
1:
action: PERMIT
- hostname: client_1
type: computer
ip_address: 192.168.10.21
subnet_mask: 255.255.255.0
default_gateway: 192.168.10.1
- hostname: client_2
type: computer
ip_address: 192.168.10.22
subnet_mask: 255.255.255.0
default_gateway: 192.168.10.1
- hostname: server_1
type: server
ip_address: 192.168.1.10
subnet_mask: 255.255.255.0
default_gateway: 192.168.1.1
- hostname: server_2
type: server
ip_address: 192.168.1.14
subnet_mask: 255.255.255.0
default_gateway: 192.168.1.1
links:
- endpoint_a_hostname: router_1
endpoint_a_port: 1
endpoint_b_hostname: switch_1
endpoint_b_port: 8
- endpoint_a_hostname: router_1
endpoint_a_port: 2
endpoint_b_hostname: switch_2
endpoint_b_port: 8
- endpoint_a_hostname: client_1
endpoint_a_port: 1
endpoint_b_hostname: switch_2
endpoint_b_port: 1
- endpoint_a_hostname: client_2
endpoint_a_port: 1
endpoint_b_hostname: switch_2
endpoint_b_port: 2
- endpoint_a_hostname: server_1
endpoint_a_port: 1
endpoint_b_hostname: switch_1
endpoint_b_port: 1
- endpoint_a_hostname: server_2
endpoint_a_port: 1
endpoint_b_hostname: switch_1
endpoint_b_port: 2

View File

@@ -0,0 +1,135 @@
io_settings:
save_step_metadata: false
save_pcap_logs: true
save_sys_logs: true
sys_log_level: WARNING
game:
max_episode_length: 256
ports:
- ARP
- DNS
- HTTP
- POSTGRES_SERVER
protocols:
- ICMP
- TCP
- UDP
agents:
- ref: client_1_red_nmap
team: RED
type: ProbabilisticAgent
observation_space: null
action_space:
options:
nodes:
- node_name: client_1
applications:
- application_name: NMAP
max_folders_per_node: 1
max_files_per_folder: 1
max_services_per_node: 1
max_applications_per_node: 1
action_list:
- type: NODE_NMAP_PORT_SCAN
action_map:
0:
action: NODE_NMAP_PORT_SCAN
options:
source_node: client_1
target_ip_address: 192.168.10.0/24
reward_function:
reward_components:
- type: DUMMY
agent_settings:
action_probabilities:
0: 1.0
simulation:
network:
nodes:
- hostname: switch_1
num_ports: 8
type: switch
- hostname: switch_2
num_ports: 8
type: switch
- hostname: router_1
type: router
ports:
1:
ip_address: 192.168.1.1
subnet_mask: 255.255.255.0
2:
ip_address: 192.168.10.1
subnet_mask: 255.255.255.0
acl:
1:
action: PERMIT
- hostname: client_1
type: computer
ip_address: 192.168.10.21
subnet_mask: 255.255.255.0
default_gateway: 192.168.10.1
- hostname: client_2
type: computer
ip_address: 192.168.10.22
subnet_mask: 255.255.255.0
default_gateway: 192.168.10.1
- hostname: server_1
type: server
ip_address: 192.168.1.10
subnet_mask: 255.255.255.0
default_gateway: 192.168.1.1
- hostname: server_2
type: server
ip_address: 192.168.1.14
subnet_mask: 255.255.255.0
default_gateway: 192.168.1.1
links:
- endpoint_a_hostname: router_1
endpoint_a_port: 1
endpoint_b_hostname: switch_1
endpoint_b_port: 8
- endpoint_a_hostname: router_1
endpoint_a_port: 2
endpoint_b_hostname: switch_2
endpoint_b_port: 8
- endpoint_a_hostname: client_1
endpoint_a_port: 1
endpoint_b_hostname: switch_2
endpoint_b_port: 1
- endpoint_a_hostname: client_2
endpoint_a_port: 1
endpoint_b_hostname: switch_2
endpoint_b_port: 2
- endpoint_a_hostname: server_1
endpoint_a_port: 1
endpoint_b_hostname: switch_1
endpoint_b_port: 1
- endpoint_a_hostname: server_2
endpoint_a_port: 1
endpoint_b_hostname: switch_1
endpoint_b_port: 2

View File

@@ -1,16 +1,19 @@
from enum import Enum
from ipaddress import IPv4Address, IPv4Network
import yaml
from primaite.game.game import PrimaiteGame
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.nmap import NMAP
from tests import TEST_ASSETS_ROOT
def test_ping_scan_all_on(example_network):
network = example_network
client_1 = network.get_node_by_hostname("client_1")
client_1.software_manager.install(NMAP)
client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa
expected_result = [IPv4Address("192.168.1.10"), IPv4Address("192.168.1.14")]
@@ -23,7 +26,6 @@ def test_ping_scan_all_on_full_network(example_network):
network = example_network
client_1 = network.get_node_by_hostname("client_1")
client_1.software_manager.install(NMAP)
client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa
expected_result = [IPv4Address("192.168.1.1"), IPv4Address("192.168.1.10"), IPv4Address("192.168.1.14")]
@@ -36,13 +38,12 @@ def test_ping_scan_some_on(example_network):
network = example_network
client_1 = network.get_node_by_hostname("client_1")
client_1.software_manager.install(NMAP)
client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa
network.get_node_by_hostname("server_2").power_off()
expected_result = [IPv4Address("192.168.1.10")]
actual_result = client_1_nmap.ping_scan(target_ip_address=["192.168.1.10", "192.168.1.14"])
expected_result = [IPv4Address("192.168.1.1"), IPv4Address("192.168.1.10")]
actual_result = client_1_nmap.ping_scan(target_ip_address=IPv4Network("192.168.1.0/24"))
assert sorted(actual_result) == sorted(expected_result)
@@ -51,7 +52,6 @@ def test_ping_scan_all_off(example_network):
network = example_network
client_1 = network.get_node_by_hostname("client_1")
client_1.software_manager.install(NMAP)
client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa
network.get_node_by_hostname("server_1").power_off()
@@ -67,7 +67,6 @@ def test_port_scan_one_node_one_port(example_network):
network = example_network
client_1 = network.get_node_by_hostname("client_1")
client_1.software_manager.install(NMAP)
client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa
client_2 = network.get_node_by_hostname("client_2")
@@ -97,7 +96,6 @@ def test_port_scan_full_subnet_all_ports_and_protocols(example_network):
network = example_network
client_1 = network.get_node_by_hostname("client_1")
client_1.software_manager.install(NMAP)
client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa
actual_result = client_1_nmap.port_scan(
@@ -105,7 +103,6 @@ def test_port_scan_full_subnet_all_ports_and_protocols(example_network):
)
expected_result = {
IPv4Address("192.168.10.21"): {IPProtocol.UDP: [Port.ARP]},
IPv4Address("192.168.10.1"): {IPProtocol.UDP: [Port.ARP]},
IPv4Address("192.168.10.22"): {
IPProtocol.TCP: [Port.HTTP, Port.FTP, Port.DNS],
@@ -114,3 +111,75 @@ def test_port_scan_full_subnet_all_ports_and_protocols(example_network):
}
assert sort_dict(actual_result) == sort_dict(expected_result)
def test_network_service_recon_all_ports_and_protocols(example_network):
network = example_network
client_1 = network.get_node_by_hostname("client_1")
client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa
actual_result = client_1_nmap.network_service_recon(
target_ip_address=IPv4Network("192.168.10.0/24"), target_port=Port.HTTP, target_protocol=IPProtocol.TCP
)
expected_result = {IPv4Address("192.168.10.22"): {IPProtocol.TCP: [Port.HTTP]}}
assert sort_dict(actual_result) == sort_dict(expected_result)
def test_ping_scan_red_agent():
with open(TEST_ASSETS_ROOT / "configs/nmap_ping_scan_red_agent_config.yaml", "r") as file:
cfg = yaml.safe_load(file)
game = PrimaiteGame.from_config(cfg)
game.step()
expected_result = ["192.168.1.1", "192.168.1.10", "192.168.1.14"]
action_history = game.agents["client_1_red_nmap"].action_history
assert len(action_history) == 1
actual_result = action_history[0].response.data["live_hosts"]
assert sorted(actual_result) == sorted(expected_result)
def test_port_scan_red_agent():
with open(TEST_ASSETS_ROOT / "configs/nmap_port_scan_red_agent_config.yaml", "r") as file:
cfg = yaml.safe_load(file)
game = PrimaiteGame.from_config(cfg)
game.step()
expected_result = {
"192.168.10.1": {"udp": [219]},
"192.168.10.22": {
"tcp": [80, 21, 53],
"udp": [219, 123],
},
}
action_history = game.agents["client_1_red_nmap"].action_history
assert len(action_history) == 1
actual_result = action_history[0].response.data
assert sorted(actual_result) == sorted(expected_result)
def test_network_service_recon_red_agent():
with open(TEST_ASSETS_ROOT / "configs/nmap_network_service_recon_red_agent_config.yaml", "r") as file:
cfg = yaml.safe_load(file)
game = PrimaiteGame.from_config(cfg)
game.step()
expected_result = {"192.168.10.22": {"tcp": [80]}}
action_history = game.agents["client_1_red_nmap"].action_history
assert len(action_history) == 1
actual_result = action_history[0].response.data
assert sorted(actual_result) == sorted(expected_result)