diff --git a/CHANGELOG.md b/CHANGELOG.md index 227cf729..17bf3557 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added support for SQL INSERT command. - Added ability to log each agent's action choices in each step to a JSON file. - Removal of Link bandwidth hardcoding. This can now be configured via the network configuraiton yaml. Will default to 100 if not present. +- Added NMAP application to all host and layer-3 network nodes. ### Bug Fixes diff --git a/docs/source/simulation_components/system/applications/nmap.rst b/docs/source/simulation_components/system/applications/nmap.rst new file mode 100644 index 00000000..9ea0c60e --- /dev/null +++ b/docs/source/simulation_components/system/applications/nmap.rst @@ -0,0 +1,347 @@ +.. only:: comment + + © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK + +.. _NMAP: + +NMAP +==== + +Overview +-------- + +The NMAP application is used to simulate network scanning activities. NMAP is a powerful tool that helps in discovering +hosts and services on a network. It provides functionalities such as ping scans to discover active hosts and port scans +to detect open ports on those hosts. + +The NMAP application is essential for network administrators and security professionals to map out a network's +structure, identify active devices, and find potential vulnerabilities by discovering open ports and running services. +However, it is also a tool frequently used by attackers during the reconnaissance stage of a cyber attack to gather +information about the target network. + +Scan Types +---------- + +Ping Scan +^^^^^^^^^ + +A ping scan is used to identify which hosts on a network are active and reachable. This is achieved by sending ICMP +Echo Request packets (ping) to the target IP addresses. If a host responds with an ICMP Echo Reply, it is considered +active. Ping scans are useful for quickly mapping out live hosts in a network. + +Port Scan +^^^^^^^^^ + +A port scan is used to detect open ports on a target host or range of hosts. Open ports can indicate running services +that might be exploitable or require securing. Port scans help in understanding the services available on a network and +identifying potential entry points for attacks. There are three types of port scans based on the scope: + +- **Horizontal Port Scan**: This scan targets a specific port across a range of IP addresses. It helps in identifying + which hosts have a particular service running. + +- **Vertical Port Scan**: This scan targets multiple ports on a single IP address. It provides detailed information + about the services running on a specific host. + +- **Box Scan**: This combines both horizontal and vertical scans, targeting multiple ports across multiple IP addresses. + It gives a comprehensive view of the network's service landscape. + +Example Usage +------------- + +The network we use for these examples is defined below: + +.. code-block:: python + + from ipaddress import IPv4Network + + from primaite.simulator.network.container import Network + from primaite.simulator.network.hardware.nodes.host.computer import Computer + from primaite.simulator.network.hardware.nodes.network.router import Router + from primaite.simulator.network.hardware.nodes.network.switch import Switch + from primaite.simulator.system.applications.nmap import NMAP + from primaite.simulator.system.services.database.database_service import DatabaseService + + # Initialize the network + network = Network() + + # Set up the router + router = Router(hostname="router", start_up_duration=0) + router.power_on() + router.configure_port(port=1, ip_address="192.168.1.1", subnet_mask="255.255.255.0") + + # Set up PC 1 + pc_1 = Computer( + hostname="pc_1", + ip_address="192.168.1.11", + subnet_mask="255.255.255.0", + default_gateway="192.168.1.1", + start_up_duration=0 + ) + pc_1.power_on() + + # Set up PC 2 + pc_2 = Computer( + hostname="pc_2", + ip_address="192.168.1.12", + subnet_mask="255.255.255.0", + default_gateway="192.168.1.1", + start_up_duration=0 + ) + pc_2.power_on() + pc_2.software_manager.install(DatabaseService) + pc_2.software_manager.software["DatabaseService"].start() # start the postgres server + + # Set up PC 3 + pc_3 = Computer( + hostname="pc_3", + ip_address="192.168.1.13", + subnet_mask="255.255.255.0", + default_gateway="192.168.1.1", + start_up_duration=0 + ) + # Don't power on PC 3 + + # Set up the switch + switch = Switch(hostname="switch", start_up_duration=0) + switch.power_on() + + # Connect devices + network.connect(router.network_interface[1], switch.network_interface[24]) + network.connect(switch.network_interface[1], pc_1.network_interface[1]) + network.connect(switch.network_interface[2], pc_2.network_interface[1]) + network.connect(switch.network_interface[3], pc_3.network_interface[1]) + + + pc_1_nmap: NMAP = pc_1.software_manager.software["NMAP"] + + +Ping Scan +^^^^^^^^^ + +Perform a ping scan to find active hosts in the `192.168.1.0/24` subnet: + +.. code-block:: python + :caption: Ping Scan Code + + active_hosts = pc_1_nmap.ping_scan(target_ip_address=IPv4Network("192.168.1.0/24")) + +.. code-block:: python + :caption: Ping Scan Return Value + + [ + IPv4Address('192.168.1.11'), + IPv4Address('192.168.1.12'), + IPv4Address('192.168.1.1') + ] + +.. code-block:: text + :caption: Ping Scan Output + + +-------------------------+ + | pc_1 NMAP Ping Scan | + +--------------+----------+ + | IP Address | Can Ping | + +--------------+----------+ + | 192.168.1.1 | True | + | 192.168.1.11 | True | + | 192.168.1.12 | True | + +--------------+----------+ + +Horizontal Port Scan +^^^^^^^^^^^^^^^^^^^^ + +Perform a horizontal port scan on port 5432 across multiple IP addresses: + +.. code-block:: python + :caption: Horizontal Port Scan Code + + horizontal_scan_results = pc_1_nmap.port_scan( + target_ip_address=[IPv4Address("192.168.1.12"), IPv4Address("192.168.1.13")], + target_port=Port(5432 ) + ) + +.. code-block:: python + :caption: Horizontal Port Scan Return Value + + { + IPv4Address('192.168.1.12'): { + : [ + + ] + } + } + +.. code-block:: text + :caption: Horizontal Port Scan Output + + +--------------------------------------------------+ + | pc_1 NMAP Port Scan (Horizontal) | + +--------------+------+-----------------+----------+ + | IP Address | Port | Name | Protocol | + +--------------+------+-----------------+----------+ + | 192.168.1.12 | 5432 | POSTGRES_SERVER | TCP | + +--------------+------+-----------------+----------+ + +Vertical Post Scan +^^^^^^^^^^^^^^^^^^ + +Perform a vertical port scan on multiple ports on a single IP address: + +.. code-block:: python + :caption: Vertical Port Scan Code + + vertical_scan_results = pc_1_nmap.port_scan( + target_ip_address=[IPv4Address("192.168.1.12")], + target_port=[Port(21), Port(22), Port(80), Port(443)] + ) + +.. code-block:: python + :caption: Vertical Port Scan Return Value + + { + IPv4Address('192.168.1.12'): { + : [ + , + + ] + } + } + +.. code-block:: text + :caption: Vertical Port Scan Output + + +---------------------------------------+ + | pc_1 NMAP Port Scan (Vertical) | + +--------------+------+------+----------+ + | IP Address | Port | Name | Protocol | + +--------------+------+------+----------+ + | 192.168.1.12 | 21 | FTP | TCP | + | 192.168.1.12 | 80 | HTTP | TCP | + +--------------+------+------+----------+ + +Box Scan +^^^^^^^^ + +Perform a box scan on multiple ports across multiple IP addresses: + +.. code-block:: python + :caption: Box Port Scan Code + + # Power PC 3 on before performing the box scan + pc_3.power_on() + + + box_scan_results = pc_1_nmap.port_scan( + target_ip_address=[IPv4Address("192.168.1.12"), IPv4Address("192.168.1.13")], + target_port=[Port(21), Port(22), Port(80), Port(443)] + ) + +.. code-block:: python + :caption: Box Port Scan Return Value + + { + IPv4Address('192.168.1.13'): { + : [ + , + + ] + }, + IPv4Address('192.168.1.12'): { + : [ + , + + ] + } + } + +.. code-block:: text + :caption: Box Port Scan Output + + +---------------------------------------+ + | pc_1 NMAP Port Scan (Box) | + +--------------+------+------+----------+ + | IP Address | Port | Name | Protocol | + +--------------+------+------+----------+ + | 192.168.1.12 | 21 | FTP | TCP | + | 192.168.1.12 | 80 | HTTP | TCP | + | 192.168.1.13 | 21 | FTP | TCP | + | 192.168.1.13 | 80 | HTTP | TCP | + +--------------+------+------+----------+ + +Full Box Scan +^^^^^^^^^^^^^ + +Perform a full box scan on all ports, over both TCP and UDP, on a whole subnet: + +.. code-block:: python + :caption: Box Port Scan Code + + # Power PC 3 on before performing the full box scan + pc_3.power_on() + + + full_box_scan_results = pc_1_nmap.port_scan( + target_ip_address=IPv4Network("192.168.1.0/24"), + ) + +.. code-block:: python + :caption: Box Port Scan Return Value + + { + IPv4Address('192.168.1.11'): { + : [ + + ] + }, + IPv4Address('192.168.1.1'): { + : [ + + ] + }, + IPv4Address('192.168.1.12'): { + : [ + , + , + , + + ], + : [ + , + + ] + }, + IPv4Address('192.168.1.13'): { + : [ + , + , + + ], + : [ + , + + ] + } + } + +.. code-block:: text + :caption: Box Port Scan Output + + +--------------------------------------------------+ + | pc_1 NMAP Port Scan (Box) | + +--------------+------+-----------------+----------+ + | IP Address | Port | Name | Protocol | + +--------------+------+-----------------+----------+ + | 192.168.1.1 | 219 | ARP | UDP | + | 192.168.1.11 | 219 | ARP | UDP | + | 192.168.1.12 | 21 | FTP | TCP | + | 192.168.1.12 | 53 | DNS | TCP | + | 192.168.1.12 | 80 | HTTP | TCP | + | 192.168.1.12 | 123 | NTP | UDP | + | 192.168.1.12 | 219 | ARP | UDP | + | 192.168.1.12 | 5432 | POSTGRES_SERVER | TCP | + | 192.168.1.13 | 21 | FTP | TCP | + | 192.168.1.13 | 53 | DNS | TCP | + | 192.168.1.13 | 80 | HTTP | TCP | + | 192.168.1.13 | 123 | NTP | UDP | + | 192.168.1.13 | 219 | ARP | UDP | + +--------------+------+-----------------+----------+ diff --git a/src/primaite/game/agent/actions.py b/src/primaite/game/agent/actions.py index 7707df2b..a3e61296 100644 --- a/src/primaite/game/agent/actions.py +++ b/src/primaite/game/agent/actions.py @@ -10,7 +10,7 @@ AbstractAction. The ActionManager is responsible for: """ import itertools from abc import ABC, abstractmethod -from typing import Dict, List, Optional, Tuple, TYPE_CHECKING +from typing import Dict, List, Optional, Tuple, TYPE_CHECKING, Union from gymnasium import spaces @@ -870,6 +870,74 @@ class NetworkPortDisableAction(AbstractAction): return ["network", "node", target_nodename, "network_interface", port_id, "disable"] +class NodeNMAPPingScanAction(AbstractAction): + """Action which performs an NMAP ping scan.""" + + def __init__(self, manager: "ActionManager", **kwargs) -> None: + super().__init__(manager=manager) + + def form_request(self, source_node: str, target_ip_address: Union[str, List[str]]) -> List[str]: # noqa + return [ + "network", + "node", + source_node, + "application", + "NMAP", + "ping_scan", + {"target_ip_address": target_ip_address}, + ] + + +class NodeNMAPPortScanAction(AbstractAction): + """Action which performs an NMAP port scan.""" + + def __init__(self, manager: "ActionManager", **kwargs) -> None: + super().__init__(manager=manager) + + def form_request( + self, + source_node: str, + target_ip_address: Union[str, List[str]], + target_protocol: Optional[Union[str, List[str]]] = None, + target_port: Optional[Union[str, List[str]]] = None, + ) -> List[str]: # noqa + """Return the action formatted as a request which can be ingested by the PrimAITE simulation.""" + return [ + "network", + "node", + source_node, + "application", + "NMAP", + "port_scan", + {"target_ip_address": target_ip_address, "target_port": target_port, "target_protocol": target_protocol}, + ] + + +class NodeNetworkServiceReconAction(AbstractAction): + """Action which performs an NMAP network service recon (ping scan followed by port scan).""" + + def __init__(self, manager: "ActionManager", **kwargs) -> None: + super().__init__(manager=manager) + + def form_request( + self, + source_node: str, + target_ip_address: Union[str, List[str]], + target_protocol: Optional[Union[str, List[str]]] = None, + target_port: Optional[Union[str, List[str]]] = None, + ) -> List[str]: # noqa + """Return the action formatted as a request which can be ingested by the PrimAITE simulation.""" + return [ + "network", + "node", + source_node, + "application", + "NMAP", + "network_service_recon", + {"target_ip_address": target_ip_address, "target_port": target_port, "target_protocol": target_protocol}, + ] + + class ActionManager: """Class which manages the action space for an agent.""" @@ -915,6 +983,9 @@ class ActionManager: "HOST_NIC_DISABLE": HostNICDisableAction, "NETWORK_PORT_ENABLE": NetworkPortEnableAction, "NETWORK_PORT_DISABLE": NetworkPortDisableAction, + "NODE_NMAP_PING_SCAN": NodeNMAPPingScanAction, + "NODE_NMAP_PORT_SCAN": NodeNMAPPortScanAction, + "NODE_NMAP_NETWORK_SERVICE_RECON": NodeNetworkServiceReconAction, } """Dictionary which maps action type strings to the corresponding action class.""" diff --git a/src/primaite/interface/request.py b/src/primaite/interface/request.py index bc076599..cacb7cd0 100644 --- a/src/primaite/interface/request.py +++ b/src/primaite/interface/request.py @@ -2,7 +2,7 @@ from typing import Dict, ForwardRef, List, Literal, Union from pydantic import BaseModel, ConfigDict, StrictBool, validate_call -RequestFormat = List[Union[str, int, float]] +RequestFormat = List[Union[str, int, float, Dict]] RequestResponse = ForwardRef("RequestResponse") """This makes it possible to type-hint RequestResponse.from_bool return type.""" diff --git a/src/primaite/notebooks/Training-an-SB3-Agent.ipynb b/src/primaite/notebooks/Training-an-SB3-Agent.ipynb index 9faf5820..e7e6b65e 100644 --- a/src/primaite/notebooks/Training-an-SB3-Agent.ipynb +++ b/src/primaite/notebooks/Training-an-SB3-Agent.ipynb @@ -163,7 +163,7 @@ ], "metadata": { "kernelspec": { - "display_name": "venv", + "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, diff --git a/src/primaite/simulator/core.py b/src/primaite/simulator/core.py index 835f24fe..0e496592 100644 --- a/src/primaite/simulator/core.py +++ b/src/primaite/simulator/core.py @@ -221,7 +221,7 @@ class SimComponent(BaseModel): return state @validate_call - def apply_request(self, request: RequestFormat, context: Dict = {}) -> RequestResponse: + def apply_request(self, request: RequestFormat, context: Optional[Dict] = None) -> RequestResponse: """ Apply a request to a simulation component. Request data is passed in as a 'namespaced' list of strings. @@ -239,6 +239,8 @@ class SimComponent(BaseModel): :param: context: Dict containing context for requests :type context: Dict """ + if not context: + context = None if self._request_manager is None: return return self._request_manager(request, context) diff --git a/src/primaite/simulator/network/hardware/base.py b/src/primaite/simulator/network/hardware/base.py index a515ce58..e9560b91 100644 --- a/src/primaite/simulator/network/hardware/base.py +++ b/src/primaite/simulator/network/hardware/base.py @@ -28,6 +28,7 @@ from primaite.simulator.network.nmne import ( NMNE_CAPTURE_KEYWORDS, ) from primaite.simulator.network.transmission.data_link_layer import Frame +from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.system.applications.application import Application from primaite.simulator.system.core.packet_capture import PacketCapture from primaite.simulator.system.core.session_manager import SessionManager @@ -36,6 +37,7 @@ from primaite.simulator.system.core.sys_log import SysLog from primaite.simulator.system.processes.process import Process from primaite.simulator.system.services.service import Service from primaite.simulator.system.software import IOSoftware +from primaite.utils.converters import convert_dict_enum_keys_to_enum_values from primaite.utils.validators import IPV4Address IOSoftwareClass = TypeVar("IOSoftwareClass", bound=IOSoftware) @@ -107,10 +109,14 @@ class NetworkInterface(SimComponent, ABC): nmne: Dict = Field(default_factory=lambda: {}) "A dict containing details of the number of malicious network events captured." + traffic: Dict = Field(default_factory=lambda: {}) + "A dict containing details of the inbound and outbound traffic by port and protocol." + def setup_for_episode(self, episode: int): """Reset the original state of the SimComponent.""" super().setup_for_episode(episode=episode) self.nmne = {} + self.traffic = {} if episode and self.pcap and SIM_OUTPUT.save_pcap_logs: self.pcap.current_episode = episode self.pcap.setup_logger() @@ -146,6 +152,7 @@ class NetworkInterface(SimComponent, ABC): ) if CAPTURE_NMNE: state.update({"nmne": {k: v for k, v in self.nmne.items()}}) + state.update({"traffic": convert_dict_enum_keys_to_enum_values(self.traffic)}) return state @abstractmethod @@ -236,6 +243,47 @@ class NetworkInterface(SimComponent, ABC): # Increment a generic counter if keyword capturing is not enabled keyword_level["*"] = keyword_level.get("*", 0) + 1 + def _capture_traffic(self, frame: Frame, inbound: bool = True): + """ + Capture traffic statistics at the Network Interface. + + :param frame: The network frame containing the traffic data. + :type frame: Frame + :param inbound: Flag indicating if the traffic is inbound or outbound. Defaults to True. + :type inbound: bool + """ + # Determine the direction of the traffic + direction = "inbound" if inbound else "outbound" + + # Initialize protocol and port variables + protocol = None + port = None + + # Identify the protocol and port from the frame + if frame.tcp: + protocol = IPProtocol.TCP + port = frame.tcp.dst_port + elif frame.udp: + protocol = IPProtocol.UDP + port = frame.udp.dst_port + elif frame.icmp: + protocol = IPProtocol.ICMP + + # Ensure the protocol is in the capture dict + if protocol not in self.traffic: + self.traffic[protocol] = {} + + # Handle non-ICMP protocols that use ports + if protocol != IPProtocol.ICMP: + if port not in self.traffic[protocol]: + self.traffic[protocol][port] = {"inbound": 0, "outbound": 0} + self.traffic[protocol][port][direction] += frame.size + else: + # Handle ICMP protocol separately (ICMP does not use ports) + if not self.traffic[protocol]: + self.traffic[protocol] = {"inbound": 0, "outbound": 0} + self.traffic[protocol][direction] += frame.size + @abstractmethod def send_frame(self, frame: Frame) -> bool: """ @@ -245,6 +293,7 @@ class NetworkInterface(SimComponent, ABC): :return: A boolean indicating whether the frame was successfully sent. """ self._capture_nmne(frame, inbound=False) + self._capture_traffic(frame, inbound=False) @abstractmethod def receive_frame(self, frame: Frame) -> bool: @@ -255,6 +304,7 @@ class NetworkInterface(SimComponent, ABC): :return: A boolean indicating whether the frame was successfully received. """ self._capture_nmne(frame, inbound=True) + self._capture_traffic(frame, inbound=True) def __str__(self) -> str: """ @@ -766,6 +816,24 @@ class Node(SimComponent): self.session_manager.software_manager = self.software_manager self._install_system_software() + def ip_is_network_interface(self, ip_address: IPv4Address, enabled_only: bool = False) -> bool: + """ + Checks if a given IP address belongs to any of the nodes interfaces. + + :param ip_address: The IP address to check. + :param enabled_only: If True, only considers enabled network interfaces. + :return: True if the IP address is assigned to one of the nodes interfaces; False otherwise. + """ + for network_interface in self.network_interface.values(): + if not hasattr(network_interface, "ip_address"): + continue + if network_interface.ip_address == ip_address: + if enabled_only: + return network_interface.enabled + else: + return True + return False + def setup_for_episode(self, episode: int): """Reset the original state of the SimComponent.""" super().setup_for_episode(episode=episode) diff --git a/src/primaite/simulator/network/hardware/nodes/host/host_node.py b/src/primaite/simulator/network/hardware/nodes/host/host_node.py index caea2dd7..1ea8366c 100644 --- a/src/primaite/simulator/network/hardware/nodes/host/host_node.py +++ b/src/primaite/simulator/network/hardware/nodes/host/host_node.py @@ -7,6 +7,8 @@ from primaite import getLogger from primaite.simulator.network.hardware.base import IPWiredNetworkInterface, Link, Node from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState from primaite.simulator.network.transmission.data_link_layer import Frame +from primaite.simulator.system.applications.application import ApplicationOperatingState +from primaite.simulator.system.applications.nmap import NMAP from primaite.simulator.system.applications.web_browser import WebBrowser from primaite.simulator.system.services.arp.arp import ARP, ARPPacket from primaite.simulator.system.services.dns.dns_client import DNSClient @@ -302,6 +304,7 @@ class HostNode(Node): "DNSClient": DNSClient, "NTPClient": NTPClient, "WebBrowser": WebBrowser, + "NMAP": NMAP, } """List of system software that is automatically installed on nodes.""" @@ -314,6 +317,16 @@ class HostNode(Node): super().__init__(**kwargs) self.connect_nic(NIC(ip_address=ip_address, subnet_mask=subnet_mask)) + @property + def nmap(self) -> Optional[NMAP]: + """ + Return the NMAP application installed on the Node. + + :return: NMAP application installed on the Node. + :rtype: Optional[NMAP] + """ + return self.software_manager.software.get("NMAP") + @property def arp(self) -> Optional[ARP]: """ @@ -365,8 +378,15 @@ class HostNode(Node): elif frame.udp: dst_port = frame.udp.dst_port + can_accept_nmap = False + if self.software_manager.software.get("NMAP"): + if self.software_manager.software["NMAP"].operating_state == ApplicationOperatingState.RUNNING: + can_accept_nmap = True + + accept_nmap = can_accept_nmap and frame.payload.__class__.__name__ == "PortScanPayload" + accept_frame = False - if frame.icmp or dst_port in self.software_manager.get_open_ports(): + if frame.icmp or dst_port in self.software_manager.get_open_ports() or accept_nmap: # accept the frame as the port is open or if it's an ICMP frame accept_frame = True diff --git a/src/primaite/simulator/network/hardware/nodes/network/router.py b/src/primaite/simulator/network/hardware/nodes/network/router.py index 53bb4827..11ed1c76 100644 --- a/src/primaite/simulator/network/hardware/nodes/network/router.py +++ b/src/primaite/simulator/network/hardware/nodes/network/router.py @@ -18,6 +18,7 @@ from primaite.simulator.network.protocols.icmp import ICMPPacket, ICMPType from primaite.simulator.network.transmission.data_link_layer import Frame from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.network.transmission.transport_layer import Port +from primaite.simulator.system.applications.nmap import NMAP from primaite.simulator.system.core.session_manager import SessionManager from primaite.simulator.system.core.sys_log import SysLog from primaite.simulator.system.services.arp.arp import ARP @@ -1238,6 +1239,7 @@ class Router(NetworkNode): icmp.router = self self.software_manager.install(RouterARP) self.arp.router = self + self.software_manager.install(NMAP) def _set_default_acl(self): """ diff --git a/src/primaite/simulator/system/applications/nmap.py b/src/primaite/simulator/system/applications/nmap.py new file mode 100644 index 00000000..80c45553 --- /dev/null +++ b/src/primaite/simulator/system/applications/nmap.py @@ -0,0 +1,451 @@ +from ipaddress import IPv4Address, IPv4Network +from typing import Any, Dict, Final, List, Optional, Set, Tuple, Union + +from prettytable import PrettyTable +from pydantic import validate_call + +from primaite.interface.request import RequestResponse +from primaite.simulator.core import RequestManager, RequestType, SimComponent +from primaite.simulator.network.transmission.network_layer import IPProtocol +from primaite.simulator.network.transmission.transport_layer import Port +from primaite.simulator.system.applications.application import Application +from primaite.utils.validators import IPV4Address + + +class PortScanPayload(SimComponent): + """ + A class representing the payload for a port scan. + + :ivar ip_address: The target IP address for the port scan. + :ivar port: The target port for the port scan. + :ivar protocol: The protocol used for the port scan. + :ivar request:Flag to indicate whether this is a request or not. + """ + + ip_address: IPV4Address + port: Port + protocol: IPProtocol + request: bool = True + + def describe_state(self) -> Dict: + """ + Describe the state of the port scan payload. + + :return: A dictionary representation of the port scan payload state. + :rtype: Dict + """ + state = super().describe_state() + state["ip_address"] = str(self.ip_address) + state["port"] = self.port.value + state["protocol"] = self.protocol.value + state["request"] = self.request + + return state + + +class NMAP(Application): + """ + A class representing the NMAP application for network scanning. + + NMAP is a network scanning tool used to discover hosts and services on a network. It provides functionalities such + as ping scans to discover active hosts and port scans to detect open ports on those hosts. + """ + + _active_port_scans: Dict[str, PortScanPayload] = {} + _port_scan_responses: Dict[str, PortScanPayload] = {} + + _PORT_SCAN_TYPE_MAP: Final[Dict[Tuple[bool, bool], str]] = { + (True, True): "Box", + (True, False): "Horizontal", + (False, True): "Vertical", + (False, False): "Port", + } + + def __init__(self, **kwargs): + kwargs["name"] = "NMAP" + kwargs["port"] = Port.NONE + kwargs["protocol"] = IPProtocol.NONE + super().__init__(**kwargs) + + def _can_perform_network_action(self) -> bool: + """ + Checks if the NMAP application can perform outbound network actions. + + This is done by checking the parent application can_per_action functionality. Then checking if there is an + enabled NIC that can be used for outbound traffic. + + :return: True if outbound network actions can be performed, otherwise False. + """ + if not super()._can_perform_action(): + return False + + for nic in self.software_manager.node.network_interface.values(): + if nic.enabled: + return True + return False + + def _init_request_manager(self) -> RequestManager: + def _ping_scan_action(request: List[Any], context: Any) -> RequestResponse: + results = self.ping_scan(target_ip_address=request[0]["target_ip_address"], json_serializable=True) + if not self._can_perform_network_action(): + return RequestResponse.from_bool(False) + return RequestResponse( + status="success", + data={"live_hosts": results}, + ) + + def _port_scan_action(request: List[Any], context: Any) -> RequestResponse: + results = self.port_scan(**request[0], json_serializable=True) + if not self._can_perform_network_action(): + return RequestResponse.from_bool(False) + return RequestResponse( + status="success", + data=results, + ) + + def _network_service_recon_action(request: List[Any], context: Any) -> RequestResponse: + results = self.network_service_recon(**request[0], json_serializable=True) + if not self._can_perform_network_action(): + return RequestResponse.from_bool(False) + return RequestResponse( + status="success", + data=results, + ) + + rm = RequestManager() + + rm.add_request( + name="ping_scan", + request_type=RequestType(func=_ping_scan_action), + ) + + rm.add_request( + name="port_scan", + request_type=RequestType(func=_port_scan_action), + ) + + rm.add_request( + name="network_service_recon", + request_type=RequestType(func=_network_service_recon_action), + ) + + return rm + + def describe_state(self) -> Dict: + """ + Describe the state of the NMAP application. + + :return: A dictionary representation of the NMAP application's state. + :rtype: Dict + """ + return super().describe_state() + + @staticmethod + def _explode_ip_address_network_array( + target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]] + ) -> Set[IPv4Address]: + """ + Explode a mixed array of IP addresses and networks into a set of individual IP addresses. + + This method takes a combination of single and lists of IPv4 addresses and IPv4 networks, expands any networks + into their constituent subnet useable IP addresses, and returns a set of unique IP addresses. Broadcast and + network addresses are excluded from the result. + + :param target_ip_address: A single or list of IPv4 addresses and networks. + :type target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]] + :return: A set of unique IPv4 addresses expanded from the input. + :rtype: Set[IPv4Address] + """ + if isinstance(target_ip_address, IPv4Address) or isinstance(target_ip_address, IPv4Network): + target_ip_address = [target_ip_address] + ip_addresses: List[IPV4Address] = [] + for ip_address in target_ip_address: + if isinstance(ip_address, IPv4Network): + ip_addresses += [ + ip + for ip in ip_address.hosts() + if not ip == ip_address.broadcast_address and not ip == ip_address.network_address + ] + else: + ip_addresses.append(ip_address) + return set(ip_addresses) + + @validate_call() + def ping_scan( + self, + target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]], + show: bool = True, + show_online_only: bool = True, + json_serializable: bool = False, + ) -> Union[List[IPV4Address], List[str]]: + """ + Perform a ping scan on the target IP address(es). + + :param target_ip_address: The target IP address(es) or network(s) for the ping scan. + :type target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]] + :param show: Flag indicating whether to display the scan results. Defaults to True. + :type show: bool + :param show_online_only: Flag indicating whether to show only the online hosts. Defaults to True. + :type show_online_only: bool + :param json_serializable: Flag indicating whether the return value should be json serializable. Defaults to + False. + :type json_serializable: bool + + :return: A list of active IP addresses that responded to the ping. + :rtype: Union[List[IPV4Address], List[str]] + """ + active_nodes = [] + if show: + table = PrettyTable(["IP Address", "Can Ping"]) + table.align = "l" + table.title = f"{self.software_manager.node.hostname} NMAP Ping Scan" + + ip_addresses = self._explode_ip_address_network_array(target_ip_address) + + for ip_address in ip_addresses: + # Prevent ping scan on this node + if self.software_manager.node.ip_is_network_interface(ip_address=ip_address): + continue + can_ping = self.software_manager.icmp.ping(ip_address) + if can_ping: + active_nodes.append(ip_address if not json_serializable else str(ip_address)) + if show and (can_ping or not show_online_only): + table.add_row([ip_address, can_ping]) + if show: + print(table.get_string(sortby="IP Address")) + return active_nodes + + def _determine_port_scan_type(self, target_ip_addresses: List[IPV4Address], target_ports: List[Port]) -> str: + """ + Determine the type of port scan based on the number of target IP addresses and ports. + + :param target_ip_addresses: The list of target IP addresses. + :type target_ip_addresses: List[IPV4Address] + :param target_ports: The list of target ports. + :type target_ports: List[Port] + + :return: The type of port scan. + :rtype: str + """ + vertical_scan = len(target_ports) > 1 + horizontal_scan = len(target_ip_addresses) > 1 + + return self._PORT_SCAN_TYPE_MAP[horizontal_scan, vertical_scan] + + def _check_port_open_on_ip_address( + self, + ip_address: IPv4Address, + port: Port, + protocol: IPProtocol, + is_re_attempt: bool = False, + port_scan_uuid: Optional[str] = None, + ) -> bool: + """ + Check if a port is open on a specific IP address. + + :param ip_address: The target IP address. + :type ip_address: IPv4Address + :param port: The target port. + :type port: Port + :param protocol: The protocol used for the port scan. + :type protocol: IPProtocol + :param is_re_attempt: Flag indicating if this is a reattempt. Defaults to False. + :type is_re_attempt: bool + :param port_scan_uuid: The UUID of the port scan payload. Defaults to None. + :type port_scan_uuid: Optional[str] + + :return: True if the port is open, False otherwise. + :rtype: bool + """ + # The recursive base case + if is_re_attempt: + # Return True if a response has been received, otherwise return False + if port_scan_uuid in self._port_scan_responses: + self._port_scan_responses.pop(port_scan_uuid) + return True + return False + + # Send the port scan request + payload = PortScanPayload(ip_address=ip_address, port=port, protocol=protocol) + self._active_port_scans[payload.uuid] = payload + self.sys_log.info( + f"{self.name}: Sending port scan request over {payload.protocol.name} on port {payload.port.value} " + f"({payload.port.name}) to {payload.ip_address}" + ) + self.software_manager.send_payload_to_session_manager( + payload=payload, dest_ip_address=ip_address, src_port=port, dest_port=port, ip_protocol=protocol + ) + + # Recursively call this function with as a reattempt + return self._check_port_open_on_ip_address( + ip_address=ip_address, port=port, protocol=protocol, is_re_attempt=True, port_scan_uuid=payload.uuid + ) + + def _process_port_scan_response(self, payload: PortScanPayload): + """ + Process the response to a port scan request. + + :param payload: The port scan payload received in response. + :type payload: PortScanPayload + """ + if payload.uuid in self._active_port_scans: + self._active_port_scans.pop(payload.uuid) + self._port_scan_responses[payload.uuid] = payload + self.sys_log.info( + f"{self.name}: Received port scan response from {payload.ip_address} on port {payload.port.value} " + f"({payload.port.name}) over {payload.protocol.name}" + ) + + def _process_port_scan_request(self, payload: PortScanPayload, session_id: str) -> None: + """ + Process a port scan request. + + :param payload: The port scan payload received in the request. + :type payload: PortScanPayload + :param session_id: The session ID for the port scan request. + :type session_id: str + """ + if self.software_manager.check_port_is_open(port=payload.port, protocol=payload.protocol): + payload.request = False + self.sys_log.info( + f"{self.name}: Responding to port scan request for port {payload.port.value} " + f"({payload.port.name}) over {payload.protocol.name}", + True, + ) + self.software_manager.send_payload_to_session_manager(payload=payload, session_id=session_id) + + @validate_call() + def port_scan( + self, + target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]], + target_protocol: Optional[Union[IPProtocol, List[IPProtocol]]] = None, + target_port: Optional[Union[Port, List[Port]]] = None, + show: bool = True, + json_serializable: bool = False, + ) -> Dict[IPv4Address, Dict[IPProtocol, List[Port]]]: + """ + Perform a port scan on the target IP address(es). + + :param target_ip_address: The target IP address(es) or network(s) for the port scan. + :type target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]] + :param target_protocol: The protocol(s) to use for the port scan. Defaults to None, which includes TCP and UDP. + :type target_protocol: Optional[Union[IPProtocol, List[IPProtocol]]] + :param target_port: The port(s) to scan. Defaults to None, which includes all valid ports. + :type target_port: Optional[Union[Port, List[Port]]] + :param show: Flag indicating whether to display the scan results. Defaults to True. + :type show: bool + :param json_serializable: Flag indicating whether the return value should be JSON serializable. Defaults to + False. + :type json_serializable: bool + + :return: A dictionary mapping IP addresses to protocols and lists of open ports. + :rtype: Dict[IPv4Address, Dict[IPProtocol, List[Port]]] + """ + ip_addresses = self._explode_ip_address_network_array(target_ip_address) + + if isinstance(target_port, Port): + target_port = [target_port] + elif target_port is None: + target_port = [port for port in Port if port not in {Port.NONE, Port.UNUSED}] + + if isinstance(target_protocol, IPProtocol): + target_protocol = [target_protocol] + elif target_protocol is None: + target_protocol = [IPProtocol.TCP, IPProtocol.UDP] + + scan_type = self._determine_port_scan_type(list(ip_addresses), target_port) + active_ports = {} + if show: + table = PrettyTable(["IP Address", "Port", "Name", "Protocol"]) + table.align = "l" + table.title = f"{self.software_manager.node.hostname} NMAP Port Scan ({scan_type})" + self.sys_log.info(f"{self.name}: Starting port scan") + for ip_address in ip_addresses: + # Prevent port scan on this node + if self.software_manager.node.ip_is_network_interface(ip_address=ip_address): + continue + for protocol in target_protocol: + for port in set(target_port): + port_open = self._check_port_open_on_ip_address(ip_address=ip_address, port=port, protocol=protocol) + + if port_open: + table.add_row([ip_address, port.value, port.name, protocol.name]) + _ip_address = ip_address if not json_serializable else str(ip_address) + _protocol = protocol if not json_serializable else protocol.value + _port = port if not json_serializable else port.value + if _ip_address not in active_ports: + active_ports[_ip_address] = dict() + if _protocol not in active_ports[_ip_address]: + active_ports[_ip_address][_protocol] = [] + active_ports[_ip_address][_protocol].append(_port) + + if show: + print(table.get_string(sortby="IP Address")) + + return active_ports + + def network_service_recon( + self, + target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]], + target_protocol: Optional[Union[IPProtocol, List[IPProtocol]]] = None, + target_port: Optional[Union[Port, List[Port]]] = None, + show: bool = True, + show_online_only: bool = True, + json_serializable: bool = False, + ) -> Dict[IPv4Address, Dict[IPProtocol, List[Port]]]: + """ + Perform a network service reconnaissance which includes a ping scan followed by a port scan. + + This method combines the functionalities of a ping scan and a port scan to provide a comprehensive + overview of the services on the network. It first identifies active hosts in the target IP range by performing + a ping scan. Once the active hosts are identified, it performs a port scan on these hosts to identify open + ports and running services. This two-step process ensures that the port scan is performed only on live hosts, + optimising the scanning process and providing accurate results. + + :param target_ip_address: The target IP address(es) or network(s) for the port scan. + :type target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]] + :param target_protocol: The protocol(s) to use for the port scan. Defaults to None, which includes TCP and UDP. + :type target_protocol: Optional[Union[IPProtocol, List[IPProtocol]]] + :param target_port: The port(s) to scan. Defaults to None, which includes all valid ports. + :type target_port: Optional[Union[Port, List[Port]]] + :param show: Flag indicating whether to display the scan results. Defaults to True. + :type show: bool + :param show_online_only: Flag indicating whether to show only the online hosts. Defaults to True. + :type show_online_only: bool + :param json_serializable: Flag indicating whether the return value should be JSON serializable. Defaults to + False. + :type json_serializable: bool + + :return: A dictionary mapping IP addresses to protocols and lists of open ports. + :rtype: Dict[IPv4Address, Dict[IPProtocol, List[Port]]] + """ + ping_scan_results = self.ping_scan( + target_ip_address=target_ip_address, show=show, show_online_only=show_online_only, json_serializable=False + ) + return self.port_scan( + target_ip_address=ping_scan_results, + target_protocol=target_protocol, + target_port=target_port, + show=show, + json_serializable=json_serializable, + ) + + def receive(self, payload: Any, session_id: str, **kwargs) -> bool: + """ + Receive and process a payload. + + :param payload: The payload to be processed. + :type payload: Any + :param session_id: The session ID associated with the payload. + :type session_id: str + + :return: True if the payload was successfully processed, False otherwise. + :rtype: bool + """ + if isinstance(payload, PortScanPayload): + if payload.request: + self._process_port_scan_request(payload=payload, session_id=session_id) + else: + self._process_port_scan_response(payload=payload) + + return True diff --git a/src/primaite/simulator/system/core/software_manager.py b/src/primaite/simulator/system/core/software_manager.py index 0487cb7b..dad60adb 100644 --- a/src/primaite/simulator/system/core/software_manager.py +++ b/src/primaite/simulator/system/core/software_manager.py @@ -78,6 +78,31 @@ class SoftwareManager: open_ports.append(software.port) return open_ports + def check_port_is_open(self, port: Port, protocol: IPProtocol) -> bool: + """ + Check if a specific port is open and running a service using the specified protocol. + + This method iterates through all installed software on the node and checks if any of them + are using the specified port and protocol and are currently in a running state. It returns True if any software + is found running on the specified port and protocol, otherwise False. + + + :param port: The port to check. + :type port: Port + :param protocol: The protocol to check (e.g., TCP, UDP). + :type protocol: IPProtocol + :return: True if the port is open and a service is running on it using the specified protocol, False otherwise. + :rtype: bool + """ + for software in self.software.values(): + if ( + software.port == port + and software.protocol == protocol + and software.operating_state in {ApplicationOperatingState.RUNNING, ServiceOperatingState.RUNNING} + ): + return True + return False + def install(self, software_class: Type[IOSoftwareClass]): """ Install an Application or Service. @@ -150,6 +175,7 @@ class SoftwareManager: self, payload: Any, dest_ip_address: Optional[Union[IPv4Address, IPv4Network]] = None, + src_port: Optional[Port] = None, dest_port: Optional[Port] = None, ip_protocol: IPProtocol = IPProtocol.TCP, session_id: Optional[str] = None, @@ -170,6 +196,7 @@ class SoftwareManager: return self.session_manager.receive_payload_from_software_manager( payload=payload, dst_ip_address=dest_ip_address, + src_port=src_port, dst_port=dest_port, ip_protocol=ip_protocol, session_id=session_id, @@ -190,6 +217,9 @@ class SoftwareManager: :param payload: The payload being received. :param session: The transport session the payload originates from. """ + if payload.__class__.__name__ == "PortScanPayload": + self.software.get("NMAP").receive(payload=payload, session_id=session_id) + return receiver: Optional[Union[Service, Application]] = self.port_protocol_mapping.get((port, protocol), None) if receiver: receiver.receive( diff --git a/src/primaite/utils/converters.py b/src/primaite/utils/converters.py new file mode 100644 index 00000000..506ef31f --- /dev/null +++ b/src/primaite/utils/converters.py @@ -0,0 +1,25 @@ +from enum import Enum +from typing import Any, Dict + + +def convert_dict_enum_keys_to_enum_values(d: Dict[Any, Any]) -> Dict[Any, Any]: + """ + Convert dictionary keys from enums to their corresponding values. + + :param d: dict + The dictionary with enum keys to be converted. + :return: dict + The dictionary with enum values as keys. + """ + result = {} + for key, value in d.items(): + if isinstance(key, Enum): + new_key = key.value + else: + new_key = key + + if isinstance(value, dict): + result[new_key] = convert_dict_enum_keys_to_enum_values(value) + else: + result[new_key] = value + return result diff --git a/tests/assets/configs/nmap_network_service_recon_red_agent_config.yaml b/tests/assets/configs/nmap_network_service_recon_red_agent_config.yaml new file mode 100644 index 00000000..c5508f13 --- /dev/null +++ b/tests/assets/configs/nmap_network_service_recon_red_agent_config.yaml @@ -0,0 +1,137 @@ +io_settings: + save_step_metadata: false + save_pcap_logs: true + save_sys_logs: true + sys_log_level: WARNING + + + +game: + max_episode_length: 256 + ports: + - ARP + - DNS + - HTTP + - POSTGRES_SERVER + protocols: + - ICMP + - TCP + - UDP + +agents: + - ref: client_1_red_nmap + team: RED + type: ProbabilisticAgent + observation_space: null + action_space: + options: + nodes: + - node_name: client_1 + applications: + - application_name: NMAP + max_folders_per_node: 1 + max_files_per_folder: 1 + max_services_per_node: 1 + max_applications_per_node: 1 + action_list: + - type: NODE_NMAP_NETWORK_SERVICE_RECON + action_map: + 0: + action: NODE_NMAP_NETWORK_SERVICE_RECON + options: + source_node: client_1 + target_ip_address: 192.168.10.0/24 + target_port: 80 + target_protocol: tcp + + reward_function: + reward_components: + - type: DUMMY + + agent_settings: + action_probabilities: + 0: 1.0 + + + +simulation: + network: + nodes: + - hostname: switch_1 + num_ports: 8 + type: switch + + - hostname: switch_2 + num_ports: 8 + type: switch + + - hostname: router_1 + type: router + ports: + 1: + ip_address: 192.168.1.1 + subnet_mask: 255.255.255.0 + 2: + ip_address: 192.168.10.1 + subnet_mask: 255.255.255.0 + acl: + 1: + action: PERMIT + + - hostname: client_1 + type: computer + ip_address: 192.168.10.21 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.10.1 + + - hostname: client_2 + type: computer + ip_address: 192.168.10.22 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.10.1 + + - hostname: server_1 + type: server + ip_address: 192.168.1.10 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.1.1 + + - hostname: server_2 + type: server + ip_address: 192.168.1.14 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.1.1 + + + + + links: + - endpoint_a_hostname: router_1 + endpoint_a_port: 1 + endpoint_b_hostname: switch_1 + endpoint_b_port: 8 + + - endpoint_a_hostname: router_1 + endpoint_a_port: 2 + endpoint_b_hostname: switch_2 + endpoint_b_port: 8 + + - endpoint_a_hostname: client_1 + endpoint_a_port: 1 + endpoint_b_hostname: switch_2 + endpoint_b_port: 1 + + - endpoint_a_hostname: client_2 + endpoint_a_port: 1 + endpoint_b_hostname: switch_2 + endpoint_b_port: 2 + + - endpoint_a_hostname: server_1 + endpoint_a_port: 1 + endpoint_b_hostname: switch_1 + endpoint_b_port: 1 + + - endpoint_a_hostname: server_2 + endpoint_a_port: 1 + endpoint_b_hostname: switch_1 + endpoint_b_port: 2 diff --git a/tests/assets/configs/nmap_ping_scan_red_agent_config.yaml b/tests/assets/configs/nmap_ping_scan_red_agent_config.yaml new file mode 100644 index 00000000..33ba3d19 --- /dev/null +++ b/tests/assets/configs/nmap_ping_scan_red_agent_config.yaml @@ -0,0 +1,135 @@ +io_settings: + save_step_metadata: false + save_pcap_logs: true + save_sys_logs: true + sys_log_level: WARNING + + + +game: + max_episode_length: 256 + ports: + - ARP + - DNS + - HTTP + - POSTGRES_SERVER + protocols: + - ICMP + - TCP + - UDP + +agents: + - ref: client_1_red_nmap + team: RED + type: ProbabilisticAgent + observation_space: null + action_space: + options: + nodes: + - node_name: client_1 + applications: + - application_name: NMAP + max_folders_per_node: 1 + max_files_per_folder: 1 + max_services_per_node: 1 + max_applications_per_node: 1 + action_list: + - type: NODE_NMAP_PING_SCAN + action_map: + 0: + action: NODE_NMAP_PING_SCAN + options: + source_node: client_1 + target_ip_address: 192.168.1.0/24 + + reward_function: + reward_components: + - type: DUMMY + + agent_settings: + action_probabilities: + 0: 1.0 + + + +simulation: + network: + nodes: + - hostname: switch_1 + num_ports: 8 + type: switch + + - hostname: switch_2 + num_ports: 8 + type: switch + + - hostname: router_1 + type: router + ports: + 1: + ip_address: 192.168.1.1 + subnet_mask: 255.255.255.0 + 2: + ip_address: 192.168.10.1 + subnet_mask: 255.255.255.0 + acl: + 1: + action: PERMIT + + - hostname: client_1 + type: computer + ip_address: 192.168.10.21 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.10.1 + + - hostname: client_2 + type: computer + ip_address: 192.168.10.22 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.10.1 + + - hostname: server_1 + type: server + ip_address: 192.168.1.10 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.1.1 + + - hostname: server_2 + type: server + ip_address: 192.168.1.14 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.1.1 + + + + + links: + - endpoint_a_hostname: router_1 + endpoint_a_port: 1 + endpoint_b_hostname: switch_1 + endpoint_b_port: 8 + + - endpoint_a_hostname: router_1 + endpoint_a_port: 2 + endpoint_b_hostname: switch_2 + endpoint_b_port: 8 + + - endpoint_a_hostname: client_1 + endpoint_a_port: 1 + endpoint_b_hostname: switch_2 + endpoint_b_port: 1 + + - endpoint_a_hostname: client_2 + endpoint_a_port: 1 + endpoint_b_hostname: switch_2 + endpoint_b_port: 2 + + - endpoint_a_hostname: server_1 + endpoint_a_port: 1 + endpoint_b_hostname: switch_1 + endpoint_b_port: 1 + + - endpoint_a_hostname: server_2 + endpoint_a_port: 1 + endpoint_b_hostname: switch_1 + endpoint_b_port: 2 diff --git a/tests/assets/configs/nmap_port_scan_red_agent_config.yaml b/tests/assets/configs/nmap_port_scan_red_agent_config.yaml new file mode 100644 index 00000000..08944ee5 --- /dev/null +++ b/tests/assets/configs/nmap_port_scan_red_agent_config.yaml @@ -0,0 +1,135 @@ +io_settings: + save_step_metadata: false + save_pcap_logs: true + save_sys_logs: true + sys_log_level: WARNING + + + +game: + max_episode_length: 256 + ports: + - ARP + - DNS + - HTTP + - POSTGRES_SERVER + protocols: + - ICMP + - TCP + - UDP + +agents: + - ref: client_1_red_nmap + team: RED + type: ProbabilisticAgent + observation_space: null + action_space: + options: + nodes: + - node_name: client_1 + applications: + - application_name: NMAP + max_folders_per_node: 1 + max_files_per_folder: 1 + max_services_per_node: 1 + max_applications_per_node: 1 + action_list: + - type: NODE_NMAP_PORT_SCAN + action_map: + 0: + action: NODE_NMAP_PORT_SCAN + options: + source_node: client_1 + target_ip_address: 192.168.10.0/24 + + reward_function: + reward_components: + - type: DUMMY + + agent_settings: + action_probabilities: + 0: 1.0 + + + +simulation: + network: + nodes: + - hostname: switch_1 + num_ports: 8 + type: switch + + - hostname: switch_2 + num_ports: 8 + type: switch + + - hostname: router_1 + type: router + ports: + 1: + ip_address: 192.168.1.1 + subnet_mask: 255.255.255.0 + 2: + ip_address: 192.168.10.1 + subnet_mask: 255.255.255.0 + acl: + 1: + action: PERMIT + + - hostname: client_1 + type: computer + ip_address: 192.168.10.21 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.10.1 + + - hostname: client_2 + type: computer + ip_address: 192.168.10.22 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.10.1 + + - hostname: server_1 + type: server + ip_address: 192.168.1.10 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.1.1 + + - hostname: server_2 + type: server + ip_address: 192.168.1.14 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.1.1 + + + + + links: + - endpoint_a_hostname: router_1 + endpoint_a_port: 1 + endpoint_b_hostname: switch_1 + endpoint_b_port: 8 + + - endpoint_a_hostname: router_1 + endpoint_a_port: 2 + endpoint_b_hostname: switch_2 + endpoint_b_port: 8 + + - endpoint_a_hostname: client_1 + endpoint_a_port: 1 + endpoint_b_hostname: switch_2 + endpoint_b_port: 1 + + - endpoint_a_hostname: client_2 + endpoint_a_port: 1 + endpoint_b_hostname: switch_2 + endpoint_b_port: 2 + + - endpoint_a_hostname: server_1 + endpoint_a_port: 1 + endpoint_b_hostname: switch_1 + endpoint_b_port: 1 + + - endpoint_a_hostname: server_2 + endpoint_a_port: 1 + endpoint_b_hostname: switch_1 + endpoint_b_port: 2 diff --git a/tests/integration_tests/system/test_nmap.py b/tests/integration_tests/system/test_nmap.py new file mode 100644 index 00000000..7cad6a98 --- /dev/null +++ b/tests/integration_tests/system/test_nmap.py @@ -0,0 +1,185 @@ +from enum import Enum +from ipaddress import IPv4Address, IPv4Network + +import yaml + +from primaite.game.game import PrimaiteGame +from primaite.simulator.network.transmission.network_layer import IPProtocol +from primaite.simulator.network.transmission.transport_layer import Port +from primaite.simulator.system.applications.nmap import NMAP +from tests import TEST_ASSETS_ROOT + + +def test_ping_scan_all_on(example_network): + network = example_network + + client_1 = network.get_node_by_hostname("client_1") + client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa + + expected_result = [IPv4Address("192.168.1.10"), IPv4Address("192.168.1.14")] + actual_result = client_1_nmap.ping_scan(target_ip_address=["192.168.1.10", "192.168.1.14"]) + + assert sorted(actual_result) == sorted(expected_result) + + +def test_ping_scan_all_on_full_network(example_network): + network = example_network + + client_1 = network.get_node_by_hostname("client_1") + client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa + + expected_result = [IPv4Address("192.168.1.1"), IPv4Address("192.168.1.10"), IPv4Address("192.168.1.14")] + actual_result = client_1_nmap.ping_scan(target_ip_address=IPv4Network("192.168.1.0/24")) + + assert sorted(actual_result) == sorted(expected_result) + + +def test_ping_scan_some_on(example_network): + network = example_network + + client_1 = network.get_node_by_hostname("client_1") + client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa + + network.get_node_by_hostname("server_2").power_off() + + expected_result = [IPv4Address("192.168.1.1"), IPv4Address("192.168.1.10")] + actual_result = client_1_nmap.ping_scan(target_ip_address=IPv4Network("192.168.1.0/24")) + + assert sorted(actual_result) == sorted(expected_result) + + +def test_ping_scan_all_off(example_network): + network = example_network + + client_1 = network.get_node_by_hostname("client_1") + client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa + + network.get_node_by_hostname("server_1").power_off() + network.get_node_by_hostname("server_2").power_off() + + expected_result = [] + actual_result = client_1_nmap.ping_scan(target_ip_address=["192.168.1.10", "192.168.1.14"]) + + assert sorted(actual_result) == sorted(expected_result) + + +def test_port_scan_one_node_one_port(example_network): + network = example_network + + client_1 = network.get_node_by_hostname("client_1") + client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa + + client_2 = network.get_node_by_hostname("client_2") + + actual_result = client_1_nmap.port_scan( + target_ip_address=client_2.network_interface[1].ip_address, target_port=Port.DNS, target_protocol=IPProtocol.TCP + ) + + expected_result = {IPv4Address("192.168.10.22"): {IPProtocol.TCP: [Port.DNS]}} + + assert actual_result == expected_result + + +def sort_dict(d): + """Recursively sorts a dictionary.""" + if isinstance(d, dict): + return {k: sort_dict(v) for k, v in sorted(d.items(), key=lambda item: str(item[0]))} + elif isinstance(d, list): + return sorted(d, key=lambda item: str(item) if isinstance(item, Enum) else item) + elif isinstance(d, Enum): + return str(d) + else: + return d + + +def test_port_scan_full_subnet_all_ports_and_protocols(example_network): + network = example_network + + client_1 = network.get_node_by_hostname("client_1") + client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa + + actual_result = client_1_nmap.port_scan( + target_ip_address=IPv4Network("192.168.10.0/24"), + ) + + expected_result = { + IPv4Address("192.168.10.1"): {IPProtocol.UDP: [Port.ARP]}, + IPv4Address("192.168.10.22"): { + IPProtocol.TCP: [Port.HTTP, Port.FTP, Port.DNS], + IPProtocol.UDP: [Port.ARP, Port.NTP], + }, + } + + assert sort_dict(actual_result) == sort_dict(expected_result) + + +def test_network_service_recon_all_ports_and_protocols(example_network): + network = example_network + + client_1 = network.get_node_by_hostname("client_1") + client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa + + actual_result = client_1_nmap.network_service_recon( + target_ip_address=IPv4Network("192.168.10.0/24"), target_port=Port.HTTP, target_protocol=IPProtocol.TCP + ) + + expected_result = {IPv4Address("192.168.10.22"): {IPProtocol.TCP: [Port.HTTP]}} + + assert sort_dict(actual_result) == sort_dict(expected_result) + + +def test_ping_scan_red_agent(): + with open(TEST_ASSETS_ROOT / "configs/nmap_ping_scan_red_agent_config.yaml", "r") as file: + cfg = yaml.safe_load(file) + + game = PrimaiteGame.from_config(cfg) + + game.step() + + expected_result = ["192.168.1.1", "192.168.1.10", "192.168.1.14"] + + action_history = game.agents["client_1_red_nmap"].action_history + assert len(action_history) == 1 + actual_result = action_history[0].response.data["live_hosts"] + + assert sorted(actual_result) == sorted(expected_result) + + +def test_port_scan_red_agent(): + with open(TEST_ASSETS_ROOT / "configs/nmap_port_scan_red_agent_config.yaml", "r") as file: + cfg = yaml.safe_load(file) + + game = PrimaiteGame.from_config(cfg) + + game.step() + + expected_result = { + "192.168.10.1": {"udp": [219]}, + "192.168.10.22": { + "tcp": [80, 21, 53], + "udp": [219, 123], + }, + } + + action_history = game.agents["client_1_red_nmap"].action_history + assert len(action_history) == 1 + actual_result = action_history[0].response.data + + assert sorted(actual_result) == sorted(expected_result) + + +def test_network_service_recon_red_agent(): + with open(TEST_ASSETS_ROOT / "configs/nmap_network_service_recon_red_agent_config.yaml", "r") as file: + cfg = yaml.safe_load(file) + + game = PrimaiteGame.from_config(cfg) + + game.step() + + expected_result = {"192.168.10.22": {"tcp": [80]}} + + action_history = game.agents["client_1_red_nmap"].action_history + assert len(action_history) == 1 + actual_result = action_history[0].response.data + + assert sorted(actual_result) == sorted(expected_result) diff --git a/tests/unit_tests/_primaite/_utils/__init__.py b/tests/unit_tests/_primaite/_utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit_tests/_primaite/_utils/test_dict_enum_keys_conversion.py b/tests/unit_tests/_primaite/_utils/test_dict_enum_keys_conversion.py new file mode 100644 index 00000000..3dcc8be5 --- /dev/null +++ b/tests/unit_tests/_primaite/_utils/test_dict_enum_keys_conversion.py @@ -0,0 +1,83 @@ +from primaite.simulator.network.transmission.network_layer import IPProtocol +from primaite.simulator.network.transmission.transport_layer import Port +from primaite.utils.converters import convert_dict_enum_keys_to_enum_values + + +def test_simple_conversion(): + """ + Test conversion of a simple dictionary with enum keys to enum values. + + The original dictionary contains one level of nested dictionary with enums as keys. + The expected output should have string values of enums as keys. + """ + original_dict = {IPProtocol.UDP: {Port.ARP: {"inbound": 0, "outbound": 1016.0}}} + expected_dict = {"udp": {219: {"inbound": 0, "outbound": 1016.0}}} + assert convert_dict_enum_keys_to_enum_values(original_dict) == expected_dict + + +def test_no_enums(): + """ + Test conversion of a dictionary with no enum keys. + + The original dictionary contains only string keys. + The expected output should be identical to the original dictionary. + """ + original_dict = {"protocol": {"port": {"inbound": 0, "outbound": 1016.0}}} + expected_dict = {"protocol": {"port": {"inbound": 0, "outbound": 1016.0}}} + assert convert_dict_enum_keys_to_enum_values(original_dict) == expected_dict + + +def test_mixed_keys(): + """ + Test conversion of a dictionary with a mix of enum and string keys. + + The original dictionary contains both enums and strings as keys. + The expected output should have string values of enums and original string keys. + """ + original_dict = { + IPProtocol.TCP: {"port": {"inbound": 0, "outbound": 1016.0}}, + "protocol": {Port.HTTP: {"inbound": 10, "outbound": 2020.0}}, + } + expected_dict = { + "tcp": {"port": {"inbound": 0, "outbound": 1016.0}}, + "protocol": {80: {"inbound": 10, "outbound": 2020.0}}, + } + assert convert_dict_enum_keys_to_enum_values(original_dict) == expected_dict + + +def test_empty_dict(): + """ + Test conversion of an empty dictionary. + + The original dictionary is empty. + The expected output should also be an empty dictionary. + """ + original_dict = {} + expected_dict = {} + assert convert_dict_enum_keys_to_enum_values(original_dict) == expected_dict + + +def test_nested_dicts(): + """ + Test conversion of a nested dictionary with multiple levels of nested dictionaries and enums as keys. + + The original dictionary contains nested dictionaries with enums as keys at different levels. + The expected output should have string values of enums as keys at all levels. + """ + original_dict = { + IPProtocol.UDP: {Port.ARP: {"inbound": 0, "outbound": 1016.0, "details": {IPProtocol.TCP: {"latency": "low"}}}} + } + expected_dict = {"udp": {219: {"inbound": 0, "outbound": 1016.0, "details": {"tcp": {"latency": "low"}}}}} + assert convert_dict_enum_keys_to_enum_values(original_dict) == expected_dict + + +def test_non_dict_values(): + """ + Test conversion of a dictionary where some values are not dictionaries. + + The original dictionary contains lists and tuples as values. + The expected output should preserve these non-dictionary values while converting enum keys to string values. + """ + original_dict = {IPProtocol.UDP: [Port.ARP, Port.HTTP], "protocols": (IPProtocol.TCP, IPProtocol.UDP)} + expected_dict = {"udp": [Port.ARP, Port.HTTP], "protocols": (IPProtocol.TCP, IPProtocol.UDP)} + assert convert_dict_enum_keys_to_enum_values(original_dict) == expected_dict