From 716f3ece1eea17bf99f2ab5403772baec4faac3d Mon Sep 17 00:00:00 2001 From: Chris McCarthy Date: Wed, 29 May 2024 13:13:42 +0100 Subject: [PATCH] #2618 - Added NMAP application, documentation, and tests. --- CHANGELOG.md | 1 + .../system/applications/nmap.rst | 341 ++++++++++++++++++ .../notebooks/Training-an-SB3-Agent.ipynb | 2 +- .../network/hardware/nodes/host/host_node.py | 12 +- .../network/hardware/nodes/network/router.py | 2 + .../simulator/system/applications/nmap.py | 313 ++++++++++++++++ .../simulator/system/core/software_manager.py | 30 ++ tests/integration_tests/system/test_nmap.py | 116 ++++++ 8 files changed, 815 insertions(+), 2 deletions(-) create mode 100644 docs/source/simulation_components/system/applications/nmap.rst create mode 100644 src/primaite/simulator/system/applications/nmap.py create mode 100644 tests/integration_tests/system/test_nmap.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 227cf729..17bf3557 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added support for SQL INSERT command. - Added ability to log each agent's action choices in each step to a JSON file. - Removal of Link bandwidth hardcoding. This can now be configured via the network configuraiton yaml. Will default to 100 if not present. +- Added NMAP application to all host and layer-3 network nodes. ### Bug Fixes diff --git a/docs/source/simulation_components/system/applications/nmap.rst b/docs/source/simulation_components/system/applications/nmap.rst new file mode 100644 index 00000000..272c7b1c --- /dev/null +++ b/docs/source/simulation_components/system/applications/nmap.rst @@ -0,0 +1,341 @@ +.. only:: comment + + © Crown-owned copyright 2023 - 2024, Defence Science and Technology Laboratory UK + +.. _NMAP: + +NMAP +==== + +Overview +-------- + +The `NMAP` is used to simulate network scanning activities. NMAP is a powerful tool that helps in discovering hosts and +services on a network. It provides functionalities such as ping scans to discover active hosts and port scans to detect +open ports on those hosts. + +The NMAP application is essential for network administrators and security professionals to map out a network's +structure, identify active devices, and find potential vulnerabilities by discovering open ports and running services. +However, it is also a tool frequently used by attackers during the reconnaissance stage of a cyber attack to gather +information about the target network. + +Scan Types +---------- + +### Ping Scan + +A ping scan is used to identify which hosts on a network are active and reachable. This is achieved by sending ICMP +Echo Request packets (ping) to the target IP addresses. If a host responds with an ICMP Echo Reply, it is considered +active. Ping scans are useful for quickly mapping out live hosts in a network. + +### Port Scan + +A port scan is used to detect open ports on a target host or range of hosts. Open ports can indicate running services +that might be exploitable or require securing. Port scans help in understanding the services available on a network and +identifying potential entry points for attacks. There are three types of port scans based on the scope: + +- **Horizontal Port Scan**: This scan targets a specific port across a range of IP addresses. It helps in identifying + which hosts have a particular service running. + +- **Vertical Port Scan**: This scan targets multiple ports on a single IP address. It provides detailed information + about the services running on a specific host. + +- **Box Scan**: This combines both horizontal and vertical scans, targeting multiple ports across multiple IP addresses. + It gives a comprehensive view of the network's service landscape. + +Example Usage +------------- + +The network we use for these examples is defined below: + +.. code-block:: python + + from ipaddress import IPv4Network + + from primaite.simulator.network.container import Network + from primaite.simulator.network.hardware.nodes.host.computer import Computer + from primaite.simulator.network.hardware.nodes.network.router import Router + from primaite.simulator.network.hardware.nodes.network.switch import Switch + from primaite.simulator.system.applications.nmap import NMAP + from primaite.simulator.system.services.database.database_service import DatabaseService + + # Initialize the network + network = Network() + + # Set up the router + router = Router(hostname="router", start_up_duration=0) + router.power_on() + router.configure_port(port=1, ip_address="192.168.1.1", subnet_mask="255.255.255.0") + + # Set up PC 1 + pc_1 = Computer( + hostname="pc_1", + ip_address="192.168.1.11", + subnet_mask="255.255.255.0", + default_gateway="192.168.1.1", + start_up_duration=0 + ) + pc_1.power_on() + + # Set up PC 2 + pc_2 = Computer( + hostname="pc_2", + ip_address="192.168.1.12", + subnet_mask="255.255.255.0", + default_gateway="192.168.1.1", + start_up_duration=0 + ) + pc_2.power_on() + pc_2.software_manager.install(DatabaseService) + pc_2.software_manager.software["DatabaseService"].start() # start the postgres server + + # Set up PC 3 + pc_3 = Computer( + hostname="pc_3", + ip_address="192.168.1.13", + subnet_mask="255.255.255.0", + default_gateway="192.168.1.1", + start_up_duration=0 + ) + # Don't power on PC 3 + + # Set up the switch + switch = Switch(hostname="switch", start_up_duration=0) + switch.power_on() + + # Connect devices + network.connect(router.network_interface[1], switch.network_interface[24]) + network.connect(switch.network_interface[1], pc_1.network_interface[1]) + network.connect(switch.network_interface[2], pc_2.network_interface[1]) + network.connect(switch.network_interface[3], pc_3.network_interface[1]) + + + pc_1_nmap: NMAP = pc_1.software_manager.software["NMAP"] + + +**Ping Scan** + +Perform a ping scan to find active hosts in the `192.168.1.0/24` subnet: + +.. code-block:: python + :caption: Ping Scan Code + + active_hosts = pc_1_nmap.ping_scan(target_ip_address=IPv4Network("192.168.1.0/24")) + +.. code-block:: python + :caption: Ping Scan Return Value + + [ + IPv4Address('192.168.1.11'), + IPv4Address('192.168.1.12'), + IPv4Address('192.168.1.1') + ] + +.. code-block:: text + :caption: Ping Scan Output + + +-------------------------+ + | pc_1 NMAP Ping Scan | + +--------------+----------+ + | IP Address | Can Ping | + +--------------+----------+ + | 192.168.1.1 | True | + | 192.168.1.11 | True | + | 192.168.1.12 | True | + +--------------+----------+ + +**Horizontal Port Scan** + +Perform a horizontal port scan on port 5432 across multiple IP addresses: + +.. code-block:: python + :caption: Horizontal Port Scan Code + + horizontal_scan_results = pc_1_nmap.port_scan( + target_ip_address=[IPv4Address("192.168.1.12"), IPv4Address("192.168.1.13")], + target_port=Port(5432 ) + ) + +.. code-block:: python + :caption: Horizontal Port Scan Return Value + + { + IPv4Address('192.168.1.12'): { + : [ + + ] + } + } + +.. code-block:: text + :caption: Horizontal Port Scan Output + + +--------------------------------------------------+ + | pc_1 NMAP Port Scan (Horizontal) | + +--------------+------+-----------------+----------+ + | IP Address | Port | Name | Protocol | + +--------------+------+-----------------+----------+ + | 192.168.1.12 | 5432 | POSTGRES_SERVER | TCP | + +--------------+------+-----------------+----------+ + +**Vertical Post Scan** + +Perform a vertical port scan on multiple ports on a single IP address: + +.. code-block:: python + :caption: Vertical Port Scan Code + + vertical_scan_results = pc_1_nmap.port_scan( + target_ip_address=[IPv4Address("192.168.1.12")], + target_port=[Port(21), Port(22), Port(80), Port(443)] + ) + +.. code-block:: python + :caption: Vertical Port Scan Return Value + + { + IPv4Address('192.168.1.12'): { + : [ + , + + ] + } + } + +.. code-block:: text + :caption: Vertical Port Scan Output + + +---------------------------------------+ + | pc_1 NMAP Port Scan (Vertical) | + +--------------+------+------+----------+ + | IP Address | Port | Name | Protocol | + +--------------+------+------+----------+ + | 192.168.1.12 | 21 | FTP | TCP | + | 192.168.1.12 | 80 | HTTP | TCP | + +--------------+------+------+----------+ + +**Box Scan** + +Perform a box scan on multiple ports across multiple IP addresses: + +.. code-block:: python + :caption: Box Port Scan Code + + # Power PC 3 on before performing the box scan + pc_3.power_on() + + + box_scan_results = pc_1_nmap.port_scan( + target_ip_address=[IPv4Address("192.168.1.12"), IPv4Address("192.168.1.13")], + target_port=[Port(21), Port(22), Port(80), Port(443)] + ) + +.. code-block:: python + :caption: Box Port Scan Return Value + + { + IPv4Address('192.168.1.13'): { + : [ + , + + ] + }, + IPv4Address('192.168.1.12'): { + : [ + , + + ] + } + } + +.. code-block:: text + :caption: Box Port Scan Output + + +---------------------------------------+ + | pc_1 NMAP Port Scan (Box) | + +--------------+------+------+----------+ + | IP Address | Port | Name | Protocol | + +--------------+------+------+----------+ + | 192.168.1.12 | 21 | FTP | TCP | + | 192.168.1.12 | 80 | HTTP | TCP | + | 192.168.1.13 | 21 | FTP | TCP | + | 192.168.1.13 | 80 | HTTP | TCP | + +--------------+------+------+----------+ + +**Full Box Scan** + +Perform a full box scan on all ports, over both TCP and UDP, on a whole subnet: + +.. code-block:: python + :caption: Box Port Scan Code + + # Power PC 3 on before performing the full box scan + pc_3.power_on() + + + full_box_scan_results = pc_1_nmap.port_scan( + target_ip_address=IPv4Network("192.168.1.0/24"), + ) + +.. code-block:: python + :caption: Box Port Scan Return Value + + { + IPv4Address('192.168.1.11'): { + : [ + + ] + }, + IPv4Address('192.168.1.1'): { + : [ + + ] + }, + IPv4Address('192.168.1.12'): { + : [ + , + , + , + + ], + : [ + , + + ] + }, + IPv4Address('192.168.1.13'): { + : [ + , + , + + ], + : [ + , + + ] + } + } + + +.. code-block:: text + :caption: Box Port Scan Output + + +--------------------------------------------------+ + | pc_1 NMAP Port Scan (Vertical) | + +--------------+------+-----------------+----------+ + | IP Address | Port | Name | Protocol | + +--------------+------+-----------------+----------+ + | 192.168.1.1 | 219 | ARP | UDP | + | 192.168.1.11 | 219 | ARP | UDP | + | 192.168.1.12 | 21 | FTP | TCP | + | 192.168.1.12 | 53 | DNS | TCP | + | 192.168.1.12 | 80 | HTTP | TCP | + | 192.168.1.12 | 123 | NTP | UDP | + | 192.168.1.12 | 219 | ARP | UDP | + | 192.168.1.12 | 5432 | POSTGRES_SERVER | TCP | + | 192.168.1.13 | 21 | FTP | TCP | + | 192.168.1.13 | 53 | DNS | TCP | + | 192.168.1.13 | 80 | HTTP | TCP | + | 192.168.1.13 | 123 | NTP | UDP | + | 192.168.1.13 | 219 | ARP | UDP | + +--------------+------+-----------------+----------+ diff --git a/src/primaite/notebooks/Training-an-SB3-Agent.ipynb b/src/primaite/notebooks/Training-an-SB3-Agent.ipynb index 9faf5820..e7e6b65e 100644 --- a/src/primaite/notebooks/Training-an-SB3-Agent.ipynb +++ b/src/primaite/notebooks/Training-an-SB3-Agent.ipynb @@ -163,7 +163,7 @@ ], "metadata": { "kernelspec": { - "display_name": "venv", + "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, diff --git a/src/primaite/simulator/network/hardware/nodes/host/host_node.py b/src/primaite/simulator/network/hardware/nodes/host/host_node.py index caea2dd7..25c65dfd 100644 --- a/src/primaite/simulator/network/hardware/nodes/host/host_node.py +++ b/src/primaite/simulator/network/hardware/nodes/host/host_node.py @@ -7,6 +7,8 @@ from primaite import getLogger from primaite.simulator.network.hardware.base import IPWiredNetworkInterface, Link, Node from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState from primaite.simulator.network.transmission.data_link_layer import Frame +from primaite.simulator.system.applications.application import ApplicationOperatingState +from primaite.simulator.system.applications.nmap import NMAP from primaite.simulator.system.applications.web_browser import WebBrowser from primaite.simulator.system.services.arp.arp import ARP, ARPPacket from primaite.simulator.system.services.dns.dns_client import DNSClient @@ -302,6 +304,7 @@ class HostNode(Node): "DNSClient": DNSClient, "NTPClient": NTPClient, "WebBrowser": WebBrowser, + "NMAP": NMAP, } """List of system software that is automatically installed on nodes.""" @@ -365,8 +368,15 @@ class HostNode(Node): elif frame.udp: dst_port = frame.udp.dst_port + can_accept_nmap = False + if self.software_manager.software.get("NMAP"): + if self.software_manager.software["NMAP"].operating_state == ApplicationOperatingState.RUNNING: + can_accept_nmap = True + + accept_nmap = can_accept_nmap and frame.payload.__class__.__name__ == "PortScanPayload" + accept_frame = False - if frame.icmp or dst_port in self.software_manager.get_open_ports(): + if frame.icmp or dst_port in self.software_manager.get_open_ports() or accept_nmap: # accept the frame as the port is open or if it's an ICMP frame accept_frame = True diff --git a/src/primaite/simulator/network/hardware/nodes/network/router.py b/src/primaite/simulator/network/hardware/nodes/network/router.py index 53bb4827..11ed1c76 100644 --- a/src/primaite/simulator/network/hardware/nodes/network/router.py +++ b/src/primaite/simulator/network/hardware/nodes/network/router.py @@ -18,6 +18,7 @@ from primaite.simulator.network.protocols.icmp import ICMPPacket, ICMPType from primaite.simulator.network.transmission.data_link_layer import Frame from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.network.transmission.transport_layer import Port +from primaite.simulator.system.applications.nmap import NMAP from primaite.simulator.system.core.session_manager import SessionManager from primaite.simulator.system.core.sys_log import SysLog from primaite.simulator.system.services.arp.arp import ARP @@ -1238,6 +1239,7 @@ class Router(NetworkNode): icmp.router = self self.software_manager.install(RouterARP) self.arp.router = self + self.software_manager.install(NMAP) def _set_default_acl(self): """ diff --git a/src/primaite/simulator/system/applications/nmap.py b/src/primaite/simulator/system/applications/nmap.py new file mode 100644 index 00000000..f73bf6a6 --- /dev/null +++ b/src/primaite/simulator/system/applications/nmap.py @@ -0,0 +1,313 @@ +from ipaddress import IPv4Address, IPv4Network +from typing import Any, Dict, Final, List, Optional, Tuple, Union + +from prettytable import PrettyTable +from pydantic import validate_call + +from primaite.simulator.core import SimComponent +from primaite.simulator.network.transmission.network_layer import IPProtocol +from primaite.simulator.network.transmission.transport_layer import Port +from primaite.simulator.system.applications.application import Application +from primaite.utils.validators import IPV4Address + + +class PortScanPayload(SimComponent): + """ + A class representing the payload for a port scan. + + :ivar ip_address: The target IP address for the port scan. + :ivar port: The target port for the port scan. + :ivar protocol: The protocol used for the port scan. + """ + + ip_address: IPV4Address + port: Port + protocol: IPProtocol + request: bool = True + + def describe_state(self) -> Dict: + """ + Describe the state of the port scan payload. + + :return: A dictionary representation of the port scan payload state. + :rtype: Dict + """ + state = super().describe_state() + state["ip_address"] = str(self.ip_address) + state["port"] = self.port.value + state["protocol"] = self.protocol.value + state["request"] = self.request + + return state + + +class NMAP(Application): + """ + A class representing the NMAP application for network scanning. + + NMAP is a network scanning tool used to discover hosts and services on a network. It provides functionalities such + as ping scans to discover active hosts and port scans to detect open ports on those hosts. + """ + + _active_port_scans: Dict[str, PortScanPayload] = {} + _port_scan_responses: Dict[str, PortScanPayload] = {} + + _PORT_SCAN_TYPE_MAP: Final[Dict[Tuple[bool, bool], str]] = { + (True, True): "Box", + (True, False): "Horizontal", + (False, True): "Vertical", + (False, False): "Port", + } + + def __init__(self, **kwargs): + kwargs["name"] = "NMAP" + kwargs["port"] = Port.NONE + kwargs["protocol"] = IPProtocol.NONE + super().__init__(**kwargs) + + def describe_state(self) -> Dict: + """ + Describe the state of the NMAP application. + + :return: A dictionary representation of the NMAP application's state. + :rtype: Dict + """ + return super().describe_state() + + @validate_call() + def ping_scan( + self, + target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]], + show: bool = True, + show_online_only: bool = True, + ) -> List[IPV4Address]: + """ + Perform a ping scan on the target IP address(es). + + :param target_ip_address: The target IP address(es) or network(s) for the ping scan. + :type target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]] + :param show: Flag indicating whether to display the scan results. Defaults to True. + :type show: bool + :param show_online_only: Flag indicating whether to show only the online hosts. Defaults to True. + :type show_online_only: bool + + :return: A list of active IP addresses that responded to the ping. + :rtype: List[IPV4Address] + """ + active_nodes = [] + if show: + table = PrettyTable(["IP Address", "Can Ping"]) + table.align = "l" + table.title = f"{self.software_manager.node.hostname} NMAP Ping Scan" + if isinstance(target_ip_address, IPv4Address) or isinstance(target_ip_address, IPv4Network): + target_ip_address = [target_ip_address] + ip_addresses = [] + for ip_address in target_ip_address: + if isinstance(ip_address, IPv4Network): + ip_addresses += [ + ip + for ip in ip_address.hosts() + if not ip == ip_address.broadcast_address and not ip == ip_address.network_address + ] + else: + ip_addresses.append(ip_address) + for ip_address in set(ip_addresses): + can_ping = self.software_manager.icmp.ping(ip_address) + if can_ping: + active_nodes.append(ip_address) + if show and (can_ping or not show_online_only): + table.add_row([ip_address, can_ping]) + if show: + print(table.get_string(sortby="IP Address")) + return active_nodes + + def _determine_port_scan_type(self, target_ip_addresses: List[IPV4Address], target_ports: List[Port]) -> str: + """ + Determine the type of port scan based on the number of target IP addresses and ports. + + :param target_ip_addresses: The list of target IP addresses. + :type target_ip_addresses: List[IPV4Address] + :param target_ports: The list of target ports. + :type target_ports: List[Port] + + :return: The type of port scan. + :rtype: str + """ + vertical_scan = len(target_ports) > 1 + horizontal_scan = len(target_ip_addresses) > 1 + + return self._PORT_SCAN_TYPE_MAP[horizontal_scan, vertical_scan] + + def _check_port_open_on_ip_address( + self, + ip_address: IPv4Address, + port: Port, + protocol: IPProtocol, + is_re_attempt: bool = False, + port_scan_uuid: Optional[str] = None, + ) -> bool: + """ + Check if a port is open on a specific IP address. + + :param ip_address: The target IP address. + :type ip_address: IPv4Address + :param port: The target port. + :type port: Port + :param protocol: The protocol used for the port scan. + :type protocol: IPProtocol + :param is_re_attempt: Flag indicating if this is a reattempt. Defaults to False. + :type is_re_attempt: bool + :param port_scan_uuid: The UUID of the port scan payload. Defaults to None. + :type port_scan_uuid: Optional[str] + + :return: True if the port is open, False otherwise. + :rtype: bool + """ + # The recursive base case + if is_re_attempt: + # Return True if a response has been received, otherwise return False + if port_scan_uuid in self._port_scan_responses: + self._port_scan_responses.pop(port_scan_uuid) + return True + return False + + # Send the port scan request + payload = PortScanPayload(ip_address=ip_address, port=port, protocol=protocol) + self._active_port_scans[payload.uuid] = payload + self.sys_log.info( + f"{self.name}: Sending port scan request over {payload.protocol.name} on port {payload.port.value} " + f"({payload.port.name}) to {payload.ip_address}" + ) + self.software_manager.send_payload_to_session_manager( + payload=payload, dest_ip_address=ip_address, src_port=port, dest_port=port, ip_protocol=protocol + ) + + # Recursively call this function with as a reattempt + return self._check_port_open_on_ip_address( + ip_address=ip_address, port=port, protocol=protocol, is_re_attempt=True, port_scan_uuid=payload.uuid + ) + + def _process_port_scan_response(self, payload: PortScanPayload): + """ + Process the response to a port scan request. + + :param payload: The port scan payload received in response. + :type payload: PortScanPayload + """ + if payload.uuid in self._active_port_scans: + self._active_port_scans.pop(payload.uuid) + self._port_scan_responses[payload.uuid] = payload + self.sys_log.info( + f"{self.name}: Received port scan response from {payload.ip_address} on port {payload.port.value} " + f"({payload.port.name}) over {payload.protocol.name}" + ) + + def _process_port_scan_request(self, payload: PortScanPayload, session_id: str) -> None: + """ + Process a port scan request. + + :param payload: The port scan payload received in the request. + :type payload: PortScanPayload + :param session_id: The session ID for the port scan request. + :type session_id: str + """ + if self.software_manager.check_port_is_open(port=payload.port, protocol=payload.protocol): + payload.request = False + self.sys_log.info( + f"{self.name}: Responding to port scan request for port {payload.port.value} " + f"({payload.port.name}) over {payload.protocol.name}", + True, + ) + self.software_manager.send_payload_to_session_manager(payload=payload, session_id=session_id) + + @validate_call() + def port_scan( + self, + target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]], + target_protocol: Optional[Union[IPProtocol, List[IPProtocol]]] = None, + target_port: Optional[Union[Port, List[Port]]] = None, + show: bool = True, + ) -> Dict[IPv4Address, Dict[IPProtocol, List[Port]]]: + """ + Perform a port scan on the target IP address(es). + + :param target_ip_address: The target IP address(es) or network(s) for the port scan. + :type target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]] + :param target_protocol: The protocol(s) to use for the port scan. Defaults to None, which includes TCP and UDP. + :type target_protocol: Optional[Union[IPProtocol, List[IPProtocol]]] + :param target_port: The port(s) to scan. Defaults to None, which includes all valid ports. + :type target_port: Optional[Union[Port, List[Port]]] + :param show: Flag indicating whether to display the scan results. Defaults to True. + :type show: bool + + :return: A dictionary mapping IP addresses to protocols and lists of open ports. + :rtype: Dict[IPv4Address, Dict[IPProtocol, List[Port]]] + """ + if isinstance(target_ip_address, IPv4Address) or isinstance(target_ip_address, IPv4Network): + target_ip_address = [target_ip_address] + ip_addresses = [] + for ip_address in target_ip_address: + if isinstance(ip_address, IPv4Network): + ip_addresses += [ + ip + for ip in ip_address.hosts() + if not ip == ip_address.broadcast_address and not ip == ip_address.network_address + ] + else: + ip_addresses.append(ip_address) + + if isinstance(target_port, Port): + target_port = [target_port] + elif target_port is None: + target_port = [port for port in Port if port not in {Port.NONE, Port.UNUSED}] + + if isinstance(target_protocol, IPProtocol): + target_protocol = [target_protocol] + elif target_protocol is None: + target_protocol = [IPProtocol.TCP, IPProtocol.UDP] + + scan_type = self._determine_port_scan_type(target_ip_address, target_port) + active_ports = {} + if show: + table = PrettyTable(["IP Address", "Port", "Name", "Protocol"]) + table.align = "l" + table.title = f"{self.software_manager.node.hostname} NMAP Port Scan ({scan_type})" + self.sys_log.info(f"{self.name}: Starting port scan") + for ip_address in set(ip_addresses): + for protocol in target_protocol: + for port in set(target_port): + port_open = self._check_port_open_on_ip_address(ip_address=ip_address, port=port, protocol=protocol) + + if port_open: + table.add_row([ip_address, port.value, port.name, protocol.name]) + + if ip_address not in active_ports: + active_ports[ip_address] = dict() + if protocol not in active_ports[ip_address]: + active_ports[ip_address][protocol] = [] + active_ports[ip_address][protocol].append(port) + + if show: + print(table.get_string(sortby="IP Address")) + + return active_ports + + def receive(self, payload: Any, session_id: str, **kwargs) -> bool: + """ + Receive and process a payload. + + :param payload: The payload to be processed. + :type payload: Any + :param session_id: The session ID associated with the payload. + :type session_id: str + + :return: True if the payload was successfully processed, False otherwise. + :rtype: bool + """ + if isinstance(payload, PortScanPayload): + if payload.request: + self._process_port_scan_request(payload=payload, session_id=session_id) + else: + self._process_port_scan_response(payload=payload) + + return True diff --git a/src/primaite/simulator/system/core/software_manager.py b/src/primaite/simulator/system/core/software_manager.py index 0487cb7b..dad60adb 100644 --- a/src/primaite/simulator/system/core/software_manager.py +++ b/src/primaite/simulator/system/core/software_manager.py @@ -78,6 +78,31 @@ class SoftwareManager: open_ports.append(software.port) return open_ports + def check_port_is_open(self, port: Port, protocol: IPProtocol) -> bool: + """ + Check if a specific port is open and running a service using the specified protocol. + + This method iterates through all installed software on the node and checks if any of them + are using the specified port and protocol and are currently in a running state. It returns True if any software + is found running on the specified port and protocol, otherwise False. + + + :param port: The port to check. + :type port: Port + :param protocol: The protocol to check (e.g., TCP, UDP). + :type protocol: IPProtocol + :return: True if the port is open and a service is running on it using the specified protocol, False otherwise. + :rtype: bool + """ + for software in self.software.values(): + if ( + software.port == port + and software.protocol == protocol + and software.operating_state in {ApplicationOperatingState.RUNNING, ServiceOperatingState.RUNNING} + ): + return True + return False + def install(self, software_class: Type[IOSoftwareClass]): """ Install an Application or Service. @@ -150,6 +175,7 @@ class SoftwareManager: self, payload: Any, dest_ip_address: Optional[Union[IPv4Address, IPv4Network]] = None, + src_port: Optional[Port] = None, dest_port: Optional[Port] = None, ip_protocol: IPProtocol = IPProtocol.TCP, session_id: Optional[str] = None, @@ -170,6 +196,7 @@ class SoftwareManager: return self.session_manager.receive_payload_from_software_manager( payload=payload, dst_ip_address=dest_ip_address, + src_port=src_port, dst_port=dest_port, ip_protocol=ip_protocol, session_id=session_id, @@ -190,6 +217,9 @@ class SoftwareManager: :param payload: The payload being received. :param session: The transport session the payload originates from. """ + if payload.__class__.__name__ == "PortScanPayload": + self.software.get("NMAP").receive(payload=payload, session_id=session_id) + return receiver: Optional[Union[Service, Application]] = self.port_protocol_mapping.get((port, protocol), None) if receiver: receiver.receive( diff --git a/tests/integration_tests/system/test_nmap.py b/tests/integration_tests/system/test_nmap.py new file mode 100644 index 00000000..39cfce98 --- /dev/null +++ b/tests/integration_tests/system/test_nmap.py @@ -0,0 +1,116 @@ +from enum import Enum +from ipaddress import IPv4Address, IPv4Network + +from primaite.simulator.network.transmission.network_layer import IPProtocol +from primaite.simulator.network.transmission.transport_layer import Port +from primaite.simulator.system.applications.nmap import NMAP + + +def test_ping_scan_all_on(example_network): + network = example_network + + client_1 = network.get_node_by_hostname("client_1") + client_1.software_manager.install(NMAP) + client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa + + expected_result = [IPv4Address("192.168.1.10"), IPv4Address("192.168.1.14")] + actual_result = client_1_nmap.ping_scan(target_ip_address=["192.168.1.10", "192.168.1.14"]) + + assert sorted(actual_result) == sorted(expected_result) + + +def test_ping_scan_all_on_full_network(example_network): + network = example_network + + client_1 = network.get_node_by_hostname("client_1") + client_1.software_manager.install(NMAP) + client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa + + expected_result = [IPv4Address("192.168.1.1"), IPv4Address("192.168.1.10"), IPv4Address("192.168.1.14")] + actual_result = client_1_nmap.ping_scan(target_ip_address=IPv4Network("192.168.1.0/24")) + + assert sorted(actual_result) == sorted(expected_result) + + +def test_ping_scan_some_on(example_network): + network = example_network + + client_1 = network.get_node_by_hostname("client_1") + client_1.software_manager.install(NMAP) + client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa + + network.get_node_by_hostname("server_2").power_off() + + expected_result = [IPv4Address("192.168.1.10")] + actual_result = client_1_nmap.ping_scan(target_ip_address=["192.168.1.10", "192.168.1.14"]) + + assert sorted(actual_result) == sorted(expected_result) + + +def test_ping_scan_all_off(example_network): + network = example_network + + client_1 = network.get_node_by_hostname("client_1") + client_1.software_manager.install(NMAP) + client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa + + network.get_node_by_hostname("server_1").power_off() + network.get_node_by_hostname("server_2").power_off() + + expected_result = [] + actual_result = client_1_nmap.ping_scan(target_ip_address=["192.168.1.10", "192.168.1.14"]) + + assert sorted(actual_result) == sorted(expected_result) + + +def test_port_scan_one_node_one_port(example_network): + network = example_network + + client_1 = network.get_node_by_hostname("client_1") + client_1.software_manager.install(NMAP) + client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa + + client_2 = network.get_node_by_hostname("client_2") + + actual_result = client_1_nmap.port_scan( + target_ip_address=client_2.network_interface[1].ip_address, target_port=Port.DNS, target_protocol=IPProtocol.TCP + ) + + expected_result = {IPv4Address("192.168.10.22"): {IPProtocol.TCP: [Port.DNS]}} + + assert actual_result == expected_result + + +def sort_dict(d): + """Recursively sorts a dictionary.""" + if isinstance(d, dict): + return {k: sort_dict(v) for k, v in sorted(d.items(), key=lambda item: str(item[0]))} + elif isinstance(d, list): + return sorted(d, key=lambda item: str(item) if isinstance(item, Enum) else item) + elif isinstance(d, Enum): + return str(d) + else: + return d + + +def test_port_scan_full_subnet_all_ports_and_protocols(example_network): + network = example_network + + client_1 = network.get_node_by_hostname("client_1") + client_1.software_manager.install(NMAP) + client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa + + actual_result = client_1_nmap.port_scan( + target_ip_address=IPv4Network("192.168.10.0/24"), + ) + + expected_result = { + IPv4Address("192.168.10.21"): {IPProtocol.UDP: [Port.ARP]}, + IPv4Address("192.168.10.1"): {IPProtocol.UDP: [Port.ARP]}, + IPv4Address("192.168.10.22"): { + IPProtocol.TCP: [Port.HTTP, Port.FTP, Port.DNS], + IPProtocol.UDP: [Port.ARP, Port.NTP], + }, + } + + assert sort_dict(actual_result) == sort_dict(expected_result)