#2618 - Added NMAP application, documentation, and tests.

This commit is contained in:
Chris McCarthy
2024-05-29 13:13:42 +01:00
parent 85f03570f7
commit 716f3ece1e
8 changed files with 815 additions and 2 deletions

View File

@@ -43,6 +43,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added support for SQL INSERT command.
- Added ability to log each agent's action choices in each step to a JSON file.
- Removal of Link bandwidth hardcoding. This can now be configured via the network configuraiton yaml. Will default to 100 if not present.
- Added NMAP application to all host and layer-3 network nodes.
### Bug Fixes

View File

@@ -0,0 +1,341 @@
.. only:: comment
© Crown-owned copyright 2023 - 2024, Defence Science and Technology Laboratory UK
.. _NMAP:
NMAP
====
Overview
--------
The `NMAP` is used to simulate network scanning activities. NMAP is a powerful tool that helps in discovering hosts and
services on a network. It provides functionalities such as ping scans to discover active hosts and port scans to detect
open ports on those hosts.
The NMAP application is essential for network administrators and security professionals to map out a network's
structure, identify active devices, and find potential vulnerabilities by discovering open ports and running services.
However, it is also a tool frequently used by attackers during the reconnaissance stage of a cyber attack to gather
information about the target network.
Scan Types
----------
### Ping Scan
A ping scan is used to identify which hosts on a network are active and reachable. This is achieved by sending ICMP
Echo Request packets (ping) to the target IP addresses. If a host responds with an ICMP Echo Reply, it is considered
active. Ping scans are useful for quickly mapping out live hosts in a network.
### Port Scan
A port scan is used to detect open ports on a target host or range of hosts. Open ports can indicate running services
that might be exploitable or require securing. Port scans help in understanding the services available on a network and
identifying potential entry points for attacks. There are three types of port scans based on the scope:
- **Horizontal Port Scan**: This scan targets a specific port across a range of IP addresses. It helps in identifying
which hosts have a particular service running.
- **Vertical Port Scan**: This scan targets multiple ports on a single IP address. It provides detailed information
about the services running on a specific host.
- **Box Scan**: This combines both horizontal and vertical scans, targeting multiple ports across multiple IP addresses.
It gives a comprehensive view of the network's service landscape.
Example Usage
-------------
The network we use for these examples is defined below:
.. code-block:: python
from ipaddress import IPv4Network
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.network.router import Router
from primaite.simulator.network.hardware.nodes.network.switch import Switch
from primaite.simulator.system.applications.nmap import NMAP
from primaite.simulator.system.services.database.database_service import DatabaseService
# Initialize the network
network = Network()
# Set up the router
router = Router(hostname="router", start_up_duration=0)
router.power_on()
router.configure_port(port=1, ip_address="192.168.1.1", subnet_mask="255.255.255.0")
# Set up PC 1
pc_1 = Computer(
hostname="pc_1",
ip_address="192.168.1.11",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
start_up_duration=0
)
pc_1.power_on()
# Set up PC 2
pc_2 = Computer(
hostname="pc_2",
ip_address="192.168.1.12",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
start_up_duration=0
)
pc_2.power_on()
pc_2.software_manager.install(DatabaseService)
pc_2.software_manager.software["DatabaseService"].start() # start the postgres server
# Set up PC 3
pc_3 = Computer(
hostname="pc_3",
ip_address="192.168.1.13",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
start_up_duration=0
)
# Don't power on PC 3
# Set up the switch
switch = Switch(hostname="switch", start_up_duration=0)
switch.power_on()
# Connect devices
network.connect(router.network_interface[1], switch.network_interface[24])
network.connect(switch.network_interface[1], pc_1.network_interface[1])
network.connect(switch.network_interface[2], pc_2.network_interface[1])
network.connect(switch.network_interface[3], pc_3.network_interface[1])
pc_1_nmap: NMAP = pc_1.software_manager.software["NMAP"]
**Ping Scan**
Perform a ping scan to find active hosts in the `192.168.1.0/24` subnet:
.. code-block:: python
:caption: Ping Scan Code
active_hosts = pc_1_nmap.ping_scan(target_ip_address=IPv4Network("192.168.1.0/24"))
.. code-block:: python
:caption: Ping Scan Return Value
[
IPv4Address('192.168.1.11'),
IPv4Address('192.168.1.12'),
IPv4Address('192.168.1.1')
]
.. code-block:: text
:caption: Ping Scan Output
+-------------------------+
| pc_1 NMAP Ping Scan |
+--------------+----------+
| IP Address | Can Ping |
+--------------+----------+
| 192.168.1.1 | True |
| 192.168.1.11 | True |
| 192.168.1.12 | True |
+--------------+----------+
**Horizontal Port Scan**
Perform a horizontal port scan on port 5432 across multiple IP addresses:
.. code-block:: python
:caption: Horizontal Port Scan Code
horizontal_scan_results = pc_1_nmap.port_scan(
target_ip_address=[IPv4Address("192.168.1.12"), IPv4Address("192.168.1.13")],
target_port=Port(5432 )
)
.. code-block:: python
:caption: Horizontal Port Scan Return Value
{
IPv4Address('192.168.1.12'): {
<IPProtocol.TCP: 'tcp'>: [
<Port.POSTGRES_SERVER: 5432>
]
}
}
.. code-block:: text
:caption: Horizontal Port Scan Output
+--------------------------------------------------+
| pc_1 NMAP Port Scan (Horizontal) |
+--------------+------+-----------------+----------+
| IP Address | Port | Name | Protocol |
+--------------+------+-----------------+----------+
| 192.168.1.12 | 5432 | POSTGRES_SERVER | TCP |
+--------------+------+-----------------+----------+
**Vertical Post Scan**
Perform a vertical port scan on multiple ports on a single IP address:
.. code-block:: python
:caption: Vertical Port Scan Code
vertical_scan_results = pc_1_nmap.port_scan(
target_ip_address=[IPv4Address("192.168.1.12")],
target_port=[Port(21), Port(22), Port(80), Port(443)]
)
.. code-block:: python
:caption: Vertical Port Scan Return Value
{
IPv4Address('192.168.1.12'): {
<IPProtocol.TCP: 'tcp'>: [
<Port.FTP: 21>,
<Port.HTTP: 80>
]
}
}
.. code-block:: text
:caption: Vertical Port Scan Output
+---------------------------------------+
| pc_1 NMAP Port Scan (Vertical) |
+--------------+------+------+----------+
| IP Address | Port | Name | Protocol |
+--------------+------+------+----------+
| 192.168.1.12 | 21 | FTP | TCP |
| 192.168.1.12 | 80 | HTTP | TCP |
+--------------+------+------+----------+
**Box Scan**
Perform a box scan on multiple ports across multiple IP addresses:
.. code-block:: python
:caption: Box Port Scan Code
# Power PC 3 on before performing the box scan
pc_3.power_on()
box_scan_results = pc_1_nmap.port_scan(
target_ip_address=[IPv4Address("192.168.1.12"), IPv4Address("192.168.1.13")],
target_port=[Port(21), Port(22), Port(80), Port(443)]
)
.. code-block:: python
:caption: Box Port Scan Return Value
{
IPv4Address('192.168.1.13'): {
<IPProtocol.TCP: 'tcp'>: [
<Port.FTP: 21>,
<Port.HTTP: 80>
]
},
IPv4Address('192.168.1.12'): {
<IPProtocol.TCP: 'tcp'>: [
<Port.FTP: 21>,
<Port.HTTP: 80>
]
}
}
.. code-block:: text
:caption: Box Port Scan Output
+---------------------------------------+
| pc_1 NMAP Port Scan (Box) |
+--------------+------+------+----------+
| IP Address | Port | Name | Protocol |
+--------------+------+------+----------+
| 192.168.1.12 | 21 | FTP | TCP |
| 192.168.1.12 | 80 | HTTP | TCP |
| 192.168.1.13 | 21 | FTP | TCP |
| 192.168.1.13 | 80 | HTTP | TCP |
+--------------+------+------+----------+
**Full Box Scan**
Perform a full box scan on all ports, over both TCP and UDP, on a whole subnet:
.. code-block:: python
:caption: Box Port Scan Code
# Power PC 3 on before performing the full box scan
pc_3.power_on()
full_box_scan_results = pc_1_nmap.port_scan(
target_ip_address=IPv4Network("192.168.1.0/24"),
)
.. code-block:: python
:caption: Box Port Scan Return Value
{
IPv4Address('192.168.1.11'): {
<IPProtocol.UDP: 'udp'>: [
<Port.ARP: 219>
]
},
IPv4Address('192.168.1.1'): {
<IPProtocol.UDP: 'udp'>: [
<Port.ARP: 219>
]
},
IPv4Address('192.168.1.12'): {
<IPProtocol.TCP: 'tcp'>: [
<Port.HTTP: 80>,
<Port.DNS: 53>,
<Port.POSTGRES_SERVER: 5432>,
<Port.FTP: 21>
],
<IPProtocol.UDP: 'udp'>: [
<Port.NTP: 123>,
<Port.ARP: 219>
]
},
IPv4Address('192.168.1.13'): {
<IPProtocol.TCP: 'tcp'>: [
<Port.HTTP: 80>,
<Port.DNS: 53>,
<Port.FTP: 21>
],
<IPProtocol.UDP: 'udp'>: [
<Port.NTP: 123>,
<Port.ARP: 219>
]
}
}
.. code-block:: text
:caption: Box Port Scan Output
+--------------------------------------------------+
| pc_1 NMAP Port Scan (Vertical) |
+--------------+------+-----------------+----------+
| IP Address | Port | Name | Protocol |
+--------------+------+-----------------+----------+
| 192.168.1.1 | 219 | ARP | UDP |
| 192.168.1.11 | 219 | ARP | UDP |
| 192.168.1.12 | 21 | FTP | TCP |
| 192.168.1.12 | 53 | DNS | TCP |
| 192.168.1.12 | 80 | HTTP | TCP |
| 192.168.1.12 | 123 | NTP | UDP |
| 192.168.1.12 | 219 | ARP | UDP |
| 192.168.1.12 | 5432 | POSTGRES_SERVER | TCP |
| 192.168.1.13 | 21 | FTP | TCP |
| 192.168.1.13 | 53 | DNS | TCP |
| 192.168.1.13 | 80 | HTTP | TCP |
| 192.168.1.13 | 123 | NTP | UDP |
| 192.168.1.13 | 219 | ARP | UDP |
+--------------+------+-----------------+----------+

View File

@@ -163,7 +163,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "venv",
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},

View File

@@ -7,6 +7,8 @@ from primaite import getLogger
from primaite.simulator.network.hardware.base import IPWiredNetworkInterface, Link, Node
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.transmission.data_link_layer import Frame
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.nmap import NMAP
from primaite.simulator.system.applications.web_browser import WebBrowser
from primaite.simulator.system.services.arp.arp import ARP, ARPPacket
from primaite.simulator.system.services.dns.dns_client import DNSClient
@@ -302,6 +304,7 @@ class HostNode(Node):
"DNSClient": DNSClient,
"NTPClient": NTPClient,
"WebBrowser": WebBrowser,
"NMAP": NMAP,
}
"""List of system software that is automatically installed on nodes."""
@@ -365,8 +368,15 @@ class HostNode(Node):
elif frame.udp:
dst_port = frame.udp.dst_port
can_accept_nmap = False
if self.software_manager.software.get("NMAP"):
if self.software_manager.software["NMAP"].operating_state == ApplicationOperatingState.RUNNING:
can_accept_nmap = True
accept_nmap = can_accept_nmap and frame.payload.__class__.__name__ == "PortScanPayload"
accept_frame = False
if frame.icmp or dst_port in self.software_manager.get_open_ports():
if frame.icmp or dst_port in self.software_manager.get_open_ports() or accept_nmap:
# accept the frame as the port is open or if it's an ICMP frame
accept_frame = True

View File

@@ -18,6 +18,7 @@ from primaite.simulator.network.protocols.icmp import ICMPPacket, ICMPType
from primaite.simulator.network.transmission.data_link_layer import Frame
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.nmap import NMAP
from primaite.simulator.system.core.session_manager import SessionManager
from primaite.simulator.system.core.sys_log import SysLog
from primaite.simulator.system.services.arp.arp import ARP
@@ -1238,6 +1239,7 @@ class Router(NetworkNode):
icmp.router = self
self.software_manager.install(RouterARP)
self.arp.router = self
self.software_manager.install(NMAP)
def _set_default_acl(self):
"""

View File

@@ -0,0 +1,313 @@
from ipaddress import IPv4Address, IPv4Network
from typing import Any, Dict, Final, List, Optional, Tuple, Union
from prettytable import PrettyTable
from pydantic import validate_call
from primaite.simulator.core import SimComponent
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.application import Application
from primaite.utils.validators import IPV4Address
class PortScanPayload(SimComponent):
"""
A class representing the payload for a port scan.
:ivar ip_address: The target IP address for the port scan.
:ivar port: The target port for the port scan.
:ivar protocol: The protocol used for the port scan.
"""
ip_address: IPV4Address
port: Port
protocol: IPProtocol
request: bool = True
def describe_state(self) -> Dict:
"""
Describe the state of the port scan payload.
:return: A dictionary representation of the port scan payload state.
:rtype: Dict
"""
state = super().describe_state()
state["ip_address"] = str(self.ip_address)
state["port"] = self.port.value
state["protocol"] = self.protocol.value
state["request"] = self.request
return state
class NMAP(Application):
"""
A class representing the NMAP application for network scanning.
NMAP is a network scanning tool used to discover hosts and services on a network. It provides functionalities such
as ping scans to discover active hosts and port scans to detect open ports on those hosts.
"""
_active_port_scans: Dict[str, PortScanPayload] = {}
_port_scan_responses: Dict[str, PortScanPayload] = {}
_PORT_SCAN_TYPE_MAP: Final[Dict[Tuple[bool, bool], str]] = {
(True, True): "Box",
(True, False): "Horizontal",
(False, True): "Vertical",
(False, False): "Port",
}
def __init__(self, **kwargs):
kwargs["name"] = "NMAP"
kwargs["port"] = Port.NONE
kwargs["protocol"] = IPProtocol.NONE
super().__init__(**kwargs)
def describe_state(self) -> Dict:
"""
Describe the state of the NMAP application.
:return: A dictionary representation of the NMAP application's state.
:rtype: Dict
"""
return super().describe_state()
@validate_call()
def ping_scan(
self,
target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]],
show: bool = True,
show_online_only: bool = True,
) -> List[IPV4Address]:
"""
Perform a ping scan on the target IP address(es).
:param target_ip_address: The target IP address(es) or network(s) for the ping scan.
:type target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]]
:param show: Flag indicating whether to display the scan results. Defaults to True.
:type show: bool
:param show_online_only: Flag indicating whether to show only the online hosts. Defaults to True.
:type show_online_only: bool
:return: A list of active IP addresses that responded to the ping.
:rtype: List[IPV4Address]
"""
active_nodes = []
if show:
table = PrettyTable(["IP Address", "Can Ping"])
table.align = "l"
table.title = f"{self.software_manager.node.hostname} NMAP Ping Scan"
if isinstance(target_ip_address, IPv4Address) or isinstance(target_ip_address, IPv4Network):
target_ip_address = [target_ip_address]
ip_addresses = []
for ip_address in target_ip_address:
if isinstance(ip_address, IPv4Network):
ip_addresses += [
ip
for ip in ip_address.hosts()
if not ip == ip_address.broadcast_address and not ip == ip_address.network_address
]
else:
ip_addresses.append(ip_address)
for ip_address in set(ip_addresses):
can_ping = self.software_manager.icmp.ping(ip_address)
if can_ping:
active_nodes.append(ip_address)
if show and (can_ping or not show_online_only):
table.add_row([ip_address, can_ping])
if show:
print(table.get_string(sortby="IP Address"))
return active_nodes
def _determine_port_scan_type(self, target_ip_addresses: List[IPV4Address], target_ports: List[Port]) -> str:
"""
Determine the type of port scan based on the number of target IP addresses and ports.
:param target_ip_addresses: The list of target IP addresses.
:type target_ip_addresses: List[IPV4Address]
:param target_ports: The list of target ports.
:type target_ports: List[Port]
:return: The type of port scan.
:rtype: str
"""
vertical_scan = len(target_ports) > 1
horizontal_scan = len(target_ip_addresses) > 1
return self._PORT_SCAN_TYPE_MAP[horizontal_scan, vertical_scan]
def _check_port_open_on_ip_address(
self,
ip_address: IPv4Address,
port: Port,
protocol: IPProtocol,
is_re_attempt: bool = False,
port_scan_uuid: Optional[str] = None,
) -> bool:
"""
Check if a port is open on a specific IP address.
:param ip_address: The target IP address.
:type ip_address: IPv4Address
:param port: The target port.
:type port: Port
:param protocol: The protocol used for the port scan.
:type protocol: IPProtocol
:param is_re_attempt: Flag indicating if this is a reattempt. Defaults to False.
:type is_re_attempt: bool
:param port_scan_uuid: The UUID of the port scan payload. Defaults to None.
:type port_scan_uuid: Optional[str]
:return: True if the port is open, False otherwise.
:rtype: bool
"""
# The recursive base case
if is_re_attempt:
# Return True if a response has been received, otherwise return False
if port_scan_uuid in self._port_scan_responses:
self._port_scan_responses.pop(port_scan_uuid)
return True
return False
# Send the port scan request
payload = PortScanPayload(ip_address=ip_address, port=port, protocol=protocol)
self._active_port_scans[payload.uuid] = payload
self.sys_log.info(
f"{self.name}: Sending port scan request over {payload.protocol.name} on port {payload.port.value} "
f"({payload.port.name}) to {payload.ip_address}"
)
self.software_manager.send_payload_to_session_manager(
payload=payload, dest_ip_address=ip_address, src_port=port, dest_port=port, ip_protocol=protocol
)
# Recursively call this function with as a reattempt
return self._check_port_open_on_ip_address(
ip_address=ip_address, port=port, protocol=protocol, is_re_attempt=True, port_scan_uuid=payload.uuid
)
def _process_port_scan_response(self, payload: PortScanPayload):
"""
Process the response to a port scan request.
:param payload: The port scan payload received in response.
:type payload: PortScanPayload
"""
if payload.uuid in self._active_port_scans:
self._active_port_scans.pop(payload.uuid)
self._port_scan_responses[payload.uuid] = payload
self.sys_log.info(
f"{self.name}: Received port scan response from {payload.ip_address} on port {payload.port.value} "
f"({payload.port.name}) over {payload.protocol.name}"
)
def _process_port_scan_request(self, payload: PortScanPayload, session_id: str) -> None:
"""
Process a port scan request.
:param payload: The port scan payload received in the request.
:type payload: PortScanPayload
:param session_id: The session ID for the port scan request.
:type session_id: str
"""
if self.software_manager.check_port_is_open(port=payload.port, protocol=payload.protocol):
payload.request = False
self.sys_log.info(
f"{self.name}: Responding to port scan request for port {payload.port.value} "
f"({payload.port.name}) over {payload.protocol.name}",
True,
)
self.software_manager.send_payload_to_session_manager(payload=payload, session_id=session_id)
@validate_call()
def port_scan(
self,
target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]],
target_protocol: Optional[Union[IPProtocol, List[IPProtocol]]] = None,
target_port: Optional[Union[Port, List[Port]]] = None,
show: bool = True,
) -> Dict[IPv4Address, Dict[IPProtocol, List[Port]]]:
"""
Perform a port scan on the target IP address(es).
:param target_ip_address: The target IP address(es) or network(s) for the port scan.
:type target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]]
:param target_protocol: The protocol(s) to use for the port scan. Defaults to None, which includes TCP and UDP.
:type target_protocol: Optional[Union[IPProtocol, List[IPProtocol]]]
:param target_port: The port(s) to scan. Defaults to None, which includes all valid ports.
:type target_port: Optional[Union[Port, List[Port]]]
:param show: Flag indicating whether to display the scan results. Defaults to True.
:type show: bool
:return: A dictionary mapping IP addresses to protocols and lists of open ports.
:rtype: Dict[IPv4Address, Dict[IPProtocol, List[Port]]]
"""
if isinstance(target_ip_address, IPv4Address) or isinstance(target_ip_address, IPv4Network):
target_ip_address = [target_ip_address]
ip_addresses = []
for ip_address in target_ip_address:
if isinstance(ip_address, IPv4Network):
ip_addresses += [
ip
for ip in ip_address.hosts()
if not ip == ip_address.broadcast_address and not ip == ip_address.network_address
]
else:
ip_addresses.append(ip_address)
if isinstance(target_port, Port):
target_port = [target_port]
elif target_port is None:
target_port = [port for port in Port if port not in {Port.NONE, Port.UNUSED}]
if isinstance(target_protocol, IPProtocol):
target_protocol = [target_protocol]
elif target_protocol is None:
target_protocol = [IPProtocol.TCP, IPProtocol.UDP]
scan_type = self._determine_port_scan_type(target_ip_address, target_port)
active_ports = {}
if show:
table = PrettyTable(["IP Address", "Port", "Name", "Protocol"])
table.align = "l"
table.title = f"{self.software_manager.node.hostname} NMAP Port Scan ({scan_type})"
self.sys_log.info(f"{self.name}: Starting port scan")
for ip_address in set(ip_addresses):
for protocol in target_protocol:
for port in set(target_port):
port_open = self._check_port_open_on_ip_address(ip_address=ip_address, port=port, protocol=protocol)
if port_open:
table.add_row([ip_address, port.value, port.name, protocol.name])
if ip_address not in active_ports:
active_ports[ip_address] = dict()
if protocol not in active_ports[ip_address]:
active_ports[ip_address][protocol] = []
active_ports[ip_address][protocol].append(port)
if show:
print(table.get_string(sortby="IP Address"))
return active_ports
def receive(self, payload: Any, session_id: str, **kwargs) -> bool:
"""
Receive and process a payload.
:param payload: The payload to be processed.
:type payload: Any
:param session_id: The session ID associated with the payload.
:type session_id: str
:return: True if the payload was successfully processed, False otherwise.
:rtype: bool
"""
if isinstance(payload, PortScanPayload):
if payload.request:
self._process_port_scan_request(payload=payload, session_id=session_id)
else:
self._process_port_scan_response(payload=payload)
return True

View File

@@ -78,6 +78,31 @@ class SoftwareManager:
open_ports.append(software.port)
return open_ports
def check_port_is_open(self, port: Port, protocol: IPProtocol) -> bool:
"""
Check if a specific port is open and running a service using the specified protocol.
This method iterates through all installed software on the node and checks if any of them
are using the specified port and protocol and are currently in a running state. It returns True if any software
is found running on the specified port and protocol, otherwise False.
:param port: The port to check.
:type port: Port
:param protocol: The protocol to check (e.g., TCP, UDP).
:type protocol: IPProtocol
:return: True if the port is open and a service is running on it using the specified protocol, False otherwise.
:rtype: bool
"""
for software in self.software.values():
if (
software.port == port
and software.protocol == protocol
and software.operating_state in {ApplicationOperatingState.RUNNING, ServiceOperatingState.RUNNING}
):
return True
return False
def install(self, software_class: Type[IOSoftwareClass]):
"""
Install an Application or Service.
@@ -150,6 +175,7 @@ class SoftwareManager:
self,
payload: Any,
dest_ip_address: Optional[Union[IPv4Address, IPv4Network]] = None,
src_port: Optional[Port] = None,
dest_port: Optional[Port] = None,
ip_protocol: IPProtocol = IPProtocol.TCP,
session_id: Optional[str] = None,
@@ -170,6 +196,7 @@ class SoftwareManager:
return self.session_manager.receive_payload_from_software_manager(
payload=payload,
dst_ip_address=dest_ip_address,
src_port=src_port,
dst_port=dest_port,
ip_protocol=ip_protocol,
session_id=session_id,
@@ -190,6 +217,9 @@ class SoftwareManager:
:param payload: The payload being received.
:param session: The transport session the payload originates from.
"""
if payload.__class__.__name__ == "PortScanPayload":
self.software.get("NMAP").receive(payload=payload, session_id=session_id)
return
receiver: Optional[Union[Service, Application]] = self.port_protocol_mapping.get((port, protocol), None)
if receiver:
receiver.receive(

View File

@@ -0,0 +1,116 @@
from enum import Enum
from ipaddress import IPv4Address, IPv4Network
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.nmap import NMAP
def test_ping_scan_all_on(example_network):
network = example_network
client_1 = network.get_node_by_hostname("client_1")
client_1.software_manager.install(NMAP)
client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa
expected_result = [IPv4Address("192.168.1.10"), IPv4Address("192.168.1.14")]
actual_result = client_1_nmap.ping_scan(target_ip_address=["192.168.1.10", "192.168.1.14"])
assert sorted(actual_result) == sorted(expected_result)
def test_ping_scan_all_on_full_network(example_network):
network = example_network
client_1 = network.get_node_by_hostname("client_1")
client_1.software_manager.install(NMAP)
client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa
expected_result = [IPv4Address("192.168.1.1"), IPv4Address("192.168.1.10"), IPv4Address("192.168.1.14")]
actual_result = client_1_nmap.ping_scan(target_ip_address=IPv4Network("192.168.1.0/24"))
assert sorted(actual_result) == sorted(expected_result)
def test_ping_scan_some_on(example_network):
network = example_network
client_1 = network.get_node_by_hostname("client_1")
client_1.software_manager.install(NMAP)
client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa
network.get_node_by_hostname("server_2").power_off()
expected_result = [IPv4Address("192.168.1.10")]
actual_result = client_1_nmap.ping_scan(target_ip_address=["192.168.1.10", "192.168.1.14"])
assert sorted(actual_result) == sorted(expected_result)
def test_ping_scan_all_off(example_network):
network = example_network
client_1 = network.get_node_by_hostname("client_1")
client_1.software_manager.install(NMAP)
client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa
network.get_node_by_hostname("server_1").power_off()
network.get_node_by_hostname("server_2").power_off()
expected_result = []
actual_result = client_1_nmap.ping_scan(target_ip_address=["192.168.1.10", "192.168.1.14"])
assert sorted(actual_result) == sorted(expected_result)
def test_port_scan_one_node_one_port(example_network):
network = example_network
client_1 = network.get_node_by_hostname("client_1")
client_1.software_manager.install(NMAP)
client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa
client_2 = network.get_node_by_hostname("client_2")
actual_result = client_1_nmap.port_scan(
target_ip_address=client_2.network_interface[1].ip_address, target_port=Port.DNS, target_protocol=IPProtocol.TCP
)
expected_result = {IPv4Address("192.168.10.22"): {IPProtocol.TCP: [Port.DNS]}}
assert actual_result == expected_result
def sort_dict(d):
"""Recursively sorts a dictionary."""
if isinstance(d, dict):
return {k: sort_dict(v) for k, v in sorted(d.items(), key=lambda item: str(item[0]))}
elif isinstance(d, list):
return sorted(d, key=lambda item: str(item) if isinstance(item, Enum) else item)
elif isinstance(d, Enum):
return str(d)
else:
return d
def test_port_scan_full_subnet_all_ports_and_protocols(example_network):
network = example_network
client_1 = network.get_node_by_hostname("client_1")
client_1.software_manager.install(NMAP)
client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa
actual_result = client_1_nmap.port_scan(
target_ip_address=IPv4Network("192.168.10.0/24"),
)
expected_result = {
IPv4Address("192.168.10.21"): {IPProtocol.UDP: [Port.ARP]},
IPv4Address("192.168.10.1"): {IPProtocol.UDP: [Port.ARP]},
IPv4Address("192.168.10.22"): {
IPProtocol.TCP: [Port.HTTP, Port.FTP, Port.DNS],
IPProtocol.UDP: [Port.ARP, Port.NTP],
},
}
assert sort_dict(actual_result) == sort_dict(expected_result)