Merged PR 412: NMAP application
## Summary NMAP application added which will be utilised by red agents ## Test process *How have you tested this (if applicable)?* ## Checklist - [ ] PR is linked to a **work item** - [ ] **acceptance criteria** of linked ticket are met - [ ] performed **self-review** of the code - [ ] written **tests** for any new functionality added with this PR - [ ] updated the **documentation** if this PR changes or adds functionality - [ ] written/updated **design docs** if this PR implements new functionality - [ ] updated the **change log** - [ ] ran **pre-commit** checks for code style - [ ] attended to any **TO-DOs** left in the code Related work items: #1847, #2618, #2628
This commit is contained in:
@@ -43,6 +43,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
- Added support for SQL INSERT command.
|
||||
- Added ability to log each agent's action choices in each step to a JSON file.
|
||||
- Removal of Link bandwidth hardcoding. This can now be configured via the network configuraiton yaml. Will default to 100 if not present.
|
||||
- Added NMAP application to all host and layer-3 network nodes.
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
|
||||
347
docs/source/simulation_components/system/applications/nmap.rst
Normal file
347
docs/source/simulation_components/system/applications/nmap.rst
Normal file
@@ -0,0 +1,347 @@
|
||||
.. only:: comment
|
||||
|
||||
© Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
|
||||
|
||||
.. _NMAP:
|
||||
|
||||
NMAP
|
||||
====
|
||||
|
||||
Overview
|
||||
--------
|
||||
|
||||
The NMAP application is used to simulate network scanning activities. NMAP is a powerful tool that helps in discovering
|
||||
hosts and services on a network. It provides functionalities such as ping scans to discover active hosts and port scans
|
||||
to detect open ports on those hosts.
|
||||
|
||||
The NMAP application is essential for network administrators and security professionals to map out a network's
|
||||
structure, identify active devices, and find potential vulnerabilities by discovering open ports and running services.
|
||||
However, it is also a tool frequently used by attackers during the reconnaissance stage of a cyber attack to gather
|
||||
information about the target network.
|
||||
|
||||
Scan Types
|
||||
----------
|
||||
|
||||
Ping Scan
|
||||
^^^^^^^^^
|
||||
|
||||
A ping scan is used to identify which hosts on a network are active and reachable. This is achieved by sending ICMP
|
||||
Echo Request packets (ping) to the target IP addresses. If a host responds with an ICMP Echo Reply, it is considered
|
||||
active. Ping scans are useful for quickly mapping out live hosts in a network.
|
||||
|
||||
Port Scan
|
||||
^^^^^^^^^
|
||||
|
||||
A port scan is used to detect open ports on a target host or range of hosts. Open ports can indicate running services
|
||||
that might be exploitable or require securing. Port scans help in understanding the services available on a network and
|
||||
identifying potential entry points for attacks. There are three types of port scans based on the scope:
|
||||
|
||||
- **Horizontal Port Scan**: This scan targets a specific port across a range of IP addresses. It helps in identifying
|
||||
which hosts have a particular service running.
|
||||
|
||||
- **Vertical Port Scan**: This scan targets multiple ports on a single IP address. It provides detailed information
|
||||
about the services running on a specific host.
|
||||
|
||||
- **Box Scan**: This combines both horizontal and vertical scans, targeting multiple ports across multiple IP addresses.
|
||||
It gives a comprehensive view of the network's service landscape.
|
||||
|
||||
Example Usage
|
||||
-------------
|
||||
|
||||
The network we use for these examples is defined below:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
from ipaddress import IPv4Network
|
||||
|
||||
from primaite.simulator.network.container import Network
|
||||
from primaite.simulator.network.hardware.nodes.host.computer import Computer
|
||||
from primaite.simulator.network.hardware.nodes.network.router import Router
|
||||
from primaite.simulator.network.hardware.nodes.network.switch import Switch
|
||||
from primaite.simulator.system.applications.nmap import NMAP
|
||||
from primaite.simulator.system.services.database.database_service import DatabaseService
|
||||
|
||||
# Initialize the network
|
||||
network = Network()
|
||||
|
||||
# Set up the router
|
||||
router = Router(hostname="router", start_up_duration=0)
|
||||
router.power_on()
|
||||
router.configure_port(port=1, ip_address="192.168.1.1", subnet_mask="255.255.255.0")
|
||||
|
||||
# Set up PC 1
|
||||
pc_1 = Computer(
|
||||
hostname="pc_1",
|
||||
ip_address="192.168.1.11",
|
||||
subnet_mask="255.255.255.0",
|
||||
default_gateway="192.168.1.1",
|
||||
start_up_duration=0
|
||||
)
|
||||
pc_1.power_on()
|
||||
|
||||
# Set up PC 2
|
||||
pc_2 = Computer(
|
||||
hostname="pc_2",
|
||||
ip_address="192.168.1.12",
|
||||
subnet_mask="255.255.255.0",
|
||||
default_gateway="192.168.1.1",
|
||||
start_up_duration=0
|
||||
)
|
||||
pc_2.power_on()
|
||||
pc_2.software_manager.install(DatabaseService)
|
||||
pc_2.software_manager.software["DatabaseService"].start() # start the postgres server
|
||||
|
||||
# Set up PC 3
|
||||
pc_3 = Computer(
|
||||
hostname="pc_3",
|
||||
ip_address="192.168.1.13",
|
||||
subnet_mask="255.255.255.0",
|
||||
default_gateway="192.168.1.1",
|
||||
start_up_duration=0
|
||||
)
|
||||
# Don't power on PC 3
|
||||
|
||||
# Set up the switch
|
||||
switch = Switch(hostname="switch", start_up_duration=0)
|
||||
switch.power_on()
|
||||
|
||||
# Connect devices
|
||||
network.connect(router.network_interface[1], switch.network_interface[24])
|
||||
network.connect(switch.network_interface[1], pc_1.network_interface[1])
|
||||
network.connect(switch.network_interface[2], pc_2.network_interface[1])
|
||||
network.connect(switch.network_interface[3], pc_3.network_interface[1])
|
||||
|
||||
|
||||
pc_1_nmap: NMAP = pc_1.software_manager.software["NMAP"]
|
||||
|
||||
|
||||
Ping Scan
|
||||
^^^^^^^^^
|
||||
|
||||
Perform a ping scan to find active hosts in the `192.168.1.0/24` subnet:
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Ping Scan Code
|
||||
|
||||
active_hosts = pc_1_nmap.ping_scan(target_ip_address=IPv4Network("192.168.1.0/24"))
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Ping Scan Return Value
|
||||
|
||||
[
|
||||
IPv4Address('192.168.1.11'),
|
||||
IPv4Address('192.168.1.12'),
|
||||
IPv4Address('192.168.1.1')
|
||||
]
|
||||
|
||||
.. code-block:: text
|
||||
:caption: Ping Scan Output
|
||||
|
||||
+-------------------------+
|
||||
| pc_1 NMAP Ping Scan |
|
||||
+--------------+----------+
|
||||
| IP Address | Can Ping |
|
||||
+--------------+----------+
|
||||
| 192.168.1.1 | True |
|
||||
| 192.168.1.11 | True |
|
||||
| 192.168.1.12 | True |
|
||||
+--------------+----------+
|
||||
|
||||
Horizontal Port Scan
|
||||
^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Perform a horizontal port scan on port 5432 across multiple IP addresses:
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Horizontal Port Scan Code
|
||||
|
||||
horizontal_scan_results = pc_1_nmap.port_scan(
|
||||
target_ip_address=[IPv4Address("192.168.1.12"), IPv4Address("192.168.1.13")],
|
||||
target_port=Port(5432 )
|
||||
)
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Horizontal Port Scan Return Value
|
||||
|
||||
{
|
||||
IPv4Address('192.168.1.12'): {
|
||||
<IPProtocol.TCP: 'tcp'>: [
|
||||
<Port.POSTGRES_SERVER: 5432>
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
.. code-block:: text
|
||||
:caption: Horizontal Port Scan Output
|
||||
|
||||
+--------------------------------------------------+
|
||||
| pc_1 NMAP Port Scan (Horizontal) |
|
||||
+--------------+------+-----------------+----------+
|
||||
| IP Address | Port | Name | Protocol |
|
||||
+--------------+------+-----------------+----------+
|
||||
| 192.168.1.12 | 5432 | POSTGRES_SERVER | TCP |
|
||||
+--------------+------+-----------------+----------+
|
||||
|
||||
Vertical Post Scan
|
||||
^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Perform a vertical port scan on multiple ports on a single IP address:
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Vertical Port Scan Code
|
||||
|
||||
vertical_scan_results = pc_1_nmap.port_scan(
|
||||
target_ip_address=[IPv4Address("192.168.1.12")],
|
||||
target_port=[Port(21), Port(22), Port(80), Port(443)]
|
||||
)
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Vertical Port Scan Return Value
|
||||
|
||||
{
|
||||
IPv4Address('192.168.1.12'): {
|
||||
<IPProtocol.TCP: 'tcp'>: [
|
||||
<Port.FTP: 21>,
|
||||
<Port.HTTP: 80>
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
.. code-block:: text
|
||||
:caption: Vertical Port Scan Output
|
||||
|
||||
+---------------------------------------+
|
||||
| pc_1 NMAP Port Scan (Vertical) |
|
||||
+--------------+------+------+----------+
|
||||
| IP Address | Port | Name | Protocol |
|
||||
+--------------+------+------+----------+
|
||||
| 192.168.1.12 | 21 | FTP | TCP |
|
||||
| 192.168.1.12 | 80 | HTTP | TCP |
|
||||
+--------------+------+------+----------+
|
||||
|
||||
Box Scan
|
||||
^^^^^^^^
|
||||
|
||||
Perform a box scan on multiple ports across multiple IP addresses:
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Box Port Scan Code
|
||||
|
||||
# Power PC 3 on before performing the box scan
|
||||
pc_3.power_on()
|
||||
|
||||
|
||||
box_scan_results = pc_1_nmap.port_scan(
|
||||
target_ip_address=[IPv4Address("192.168.1.12"), IPv4Address("192.168.1.13")],
|
||||
target_port=[Port(21), Port(22), Port(80), Port(443)]
|
||||
)
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Box Port Scan Return Value
|
||||
|
||||
{
|
||||
IPv4Address('192.168.1.13'): {
|
||||
<IPProtocol.TCP: 'tcp'>: [
|
||||
<Port.FTP: 21>,
|
||||
<Port.HTTP: 80>
|
||||
]
|
||||
},
|
||||
IPv4Address('192.168.1.12'): {
|
||||
<IPProtocol.TCP: 'tcp'>: [
|
||||
<Port.FTP: 21>,
|
||||
<Port.HTTP: 80>
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
.. code-block:: text
|
||||
:caption: Box Port Scan Output
|
||||
|
||||
+---------------------------------------+
|
||||
| pc_1 NMAP Port Scan (Box) |
|
||||
+--------------+------+------+----------+
|
||||
| IP Address | Port | Name | Protocol |
|
||||
+--------------+------+------+----------+
|
||||
| 192.168.1.12 | 21 | FTP | TCP |
|
||||
| 192.168.1.12 | 80 | HTTP | TCP |
|
||||
| 192.168.1.13 | 21 | FTP | TCP |
|
||||
| 192.168.1.13 | 80 | HTTP | TCP |
|
||||
+--------------+------+------+----------+
|
||||
|
||||
Full Box Scan
|
||||
^^^^^^^^^^^^^
|
||||
|
||||
Perform a full box scan on all ports, over both TCP and UDP, on a whole subnet:
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Box Port Scan Code
|
||||
|
||||
# Power PC 3 on before performing the full box scan
|
||||
pc_3.power_on()
|
||||
|
||||
|
||||
full_box_scan_results = pc_1_nmap.port_scan(
|
||||
target_ip_address=IPv4Network("192.168.1.0/24"),
|
||||
)
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Box Port Scan Return Value
|
||||
|
||||
{
|
||||
IPv4Address('192.168.1.11'): {
|
||||
<IPProtocol.UDP: 'udp'>: [
|
||||
<Port.ARP: 219>
|
||||
]
|
||||
},
|
||||
IPv4Address('192.168.1.1'): {
|
||||
<IPProtocol.UDP: 'udp'>: [
|
||||
<Port.ARP: 219>
|
||||
]
|
||||
},
|
||||
IPv4Address('192.168.1.12'): {
|
||||
<IPProtocol.TCP: 'tcp'>: [
|
||||
<Port.HTTP: 80>,
|
||||
<Port.DNS: 53>,
|
||||
<Port.POSTGRES_SERVER: 5432>,
|
||||
<Port.FTP: 21>
|
||||
],
|
||||
<IPProtocol.UDP: 'udp'>: [
|
||||
<Port.NTP: 123>,
|
||||
<Port.ARP: 219>
|
||||
]
|
||||
},
|
||||
IPv4Address('192.168.1.13'): {
|
||||
<IPProtocol.TCP: 'tcp'>: [
|
||||
<Port.HTTP: 80>,
|
||||
<Port.DNS: 53>,
|
||||
<Port.FTP: 21>
|
||||
],
|
||||
<IPProtocol.UDP: 'udp'>: [
|
||||
<Port.NTP: 123>,
|
||||
<Port.ARP: 219>
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
.. code-block:: text
|
||||
:caption: Box Port Scan Output
|
||||
|
||||
+--------------------------------------------------+
|
||||
| pc_1 NMAP Port Scan (Box) |
|
||||
+--------------+------+-----------------+----------+
|
||||
| IP Address | Port | Name | Protocol |
|
||||
+--------------+------+-----------------+----------+
|
||||
| 192.168.1.1 | 219 | ARP | UDP |
|
||||
| 192.168.1.11 | 219 | ARP | UDP |
|
||||
| 192.168.1.12 | 21 | FTP | TCP |
|
||||
| 192.168.1.12 | 53 | DNS | TCP |
|
||||
| 192.168.1.12 | 80 | HTTP | TCP |
|
||||
| 192.168.1.12 | 123 | NTP | UDP |
|
||||
| 192.168.1.12 | 219 | ARP | UDP |
|
||||
| 192.168.1.12 | 5432 | POSTGRES_SERVER | TCP |
|
||||
| 192.168.1.13 | 21 | FTP | TCP |
|
||||
| 192.168.1.13 | 53 | DNS | TCP |
|
||||
| 192.168.1.13 | 80 | HTTP | TCP |
|
||||
| 192.168.1.13 | 123 | NTP | UDP |
|
||||
| 192.168.1.13 | 219 | ARP | UDP |
|
||||
+--------------+------+-----------------+----------+
|
||||
@@ -10,7 +10,7 @@ AbstractAction. The ActionManager is responsible for:
|
||||
"""
|
||||
import itertools
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Dict, List, Optional, Tuple, TYPE_CHECKING
|
||||
from typing import Dict, List, Optional, Tuple, TYPE_CHECKING, Union
|
||||
|
||||
from gymnasium import spaces
|
||||
|
||||
@@ -870,6 +870,74 @@ class NetworkPortDisableAction(AbstractAction):
|
||||
return ["network", "node", target_nodename, "network_interface", port_id, "disable"]
|
||||
|
||||
|
||||
class NodeNMAPPingScanAction(AbstractAction):
|
||||
"""Action which performs an NMAP ping scan."""
|
||||
|
||||
def __init__(self, manager: "ActionManager", **kwargs) -> None:
|
||||
super().__init__(manager=manager)
|
||||
|
||||
def form_request(self, source_node: str, target_ip_address: Union[str, List[str]]) -> List[str]: # noqa
|
||||
return [
|
||||
"network",
|
||||
"node",
|
||||
source_node,
|
||||
"application",
|
||||
"NMAP",
|
||||
"ping_scan",
|
||||
{"target_ip_address": target_ip_address},
|
||||
]
|
||||
|
||||
|
||||
class NodeNMAPPortScanAction(AbstractAction):
|
||||
"""Action which performs an NMAP port scan."""
|
||||
|
||||
def __init__(self, manager: "ActionManager", **kwargs) -> None:
|
||||
super().__init__(manager=manager)
|
||||
|
||||
def form_request(
|
||||
self,
|
||||
source_node: str,
|
||||
target_ip_address: Union[str, List[str]],
|
||||
target_protocol: Optional[Union[str, List[str]]] = None,
|
||||
target_port: Optional[Union[str, List[str]]] = None,
|
||||
) -> List[str]: # noqa
|
||||
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
|
||||
return [
|
||||
"network",
|
||||
"node",
|
||||
source_node,
|
||||
"application",
|
||||
"NMAP",
|
||||
"port_scan",
|
||||
{"target_ip_address": target_ip_address, "target_port": target_port, "target_protocol": target_protocol},
|
||||
]
|
||||
|
||||
|
||||
class NodeNetworkServiceReconAction(AbstractAction):
|
||||
"""Action which performs an NMAP network service recon (ping scan followed by port scan)."""
|
||||
|
||||
def __init__(self, manager: "ActionManager", **kwargs) -> None:
|
||||
super().__init__(manager=manager)
|
||||
|
||||
def form_request(
|
||||
self,
|
||||
source_node: str,
|
||||
target_ip_address: Union[str, List[str]],
|
||||
target_protocol: Optional[Union[str, List[str]]] = None,
|
||||
target_port: Optional[Union[str, List[str]]] = None,
|
||||
) -> List[str]: # noqa
|
||||
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
|
||||
return [
|
||||
"network",
|
||||
"node",
|
||||
source_node,
|
||||
"application",
|
||||
"NMAP",
|
||||
"network_service_recon",
|
||||
{"target_ip_address": target_ip_address, "target_port": target_port, "target_protocol": target_protocol},
|
||||
]
|
||||
|
||||
|
||||
class ActionManager:
|
||||
"""Class which manages the action space for an agent."""
|
||||
|
||||
@@ -915,6 +983,9 @@ class ActionManager:
|
||||
"HOST_NIC_DISABLE": HostNICDisableAction,
|
||||
"NETWORK_PORT_ENABLE": NetworkPortEnableAction,
|
||||
"NETWORK_PORT_DISABLE": NetworkPortDisableAction,
|
||||
"NODE_NMAP_PING_SCAN": NodeNMAPPingScanAction,
|
||||
"NODE_NMAP_PORT_SCAN": NodeNMAPPortScanAction,
|
||||
"NODE_NMAP_NETWORK_SERVICE_RECON": NodeNetworkServiceReconAction,
|
||||
}
|
||||
"""Dictionary which maps action type strings to the corresponding action class."""
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@ from typing import Dict, ForwardRef, List, Literal, Union
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, StrictBool, validate_call
|
||||
|
||||
RequestFormat = List[Union[str, int, float]]
|
||||
RequestFormat = List[Union[str, int, float, Dict]]
|
||||
|
||||
RequestResponse = ForwardRef("RequestResponse")
|
||||
"""This makes it possible to type-hint RequestResponse.from_bool return type."""
|
||||
|
||||
@@ -168,7 +168,7 @@
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "venv",
|
||||
"display_name": "Python 3 (ipykernel)",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
|
||||
@@ -221,7 +221,7 @@ class SimComponent(BaseModel):
|
||||
return state
|
||||
|
||||
@validate_call
|
||||
def apply_request(self, request: RequestFormat, context: Dict = {}) -> RequestResponse:
|
||||
def apply_request(self, request: RequestFormat, context: Optional[Dict] = None) -> RequestResponse:
|
||||
"""
|
||||
Apply a request to a simulation component. Request data is passed in as a 'namespaced' list of strings.
|
||||
|
||||
@@ -239,6 +239,8 @@ class SimComponent(BaseModel):
|
||||
:param: context: Dict containing context for requests
|
||||
:type context: Dict
|
||||
"""
|
||||
if not context:
|
||||
context = None
|
||||
if self._request_manager is None:
|
||||
return
|
||||
return self._request_manager(request, context)
|
||||
|
||||
@@ -28,6 +28,7 @@ from primaite.simulator.network.nmne import (
|
||||
NMNE_CAPTURE_KEYWORDS,
|
||||
)
|
||||
from primaite.simulator.network.transmission.data_link_layer import Frame
|
||||
from primaite.simulator.network.transmission.network_layer import IPProtocol
|
||||
from primaite.simulator.system.applications.application import Application
|
||||
from primaite.simulator.system.core.packet_capture import PacketCapture
|
||||
from primaite.simulator.system.core.session_manager import SessionManager
|
||||
@@ -36,6 +37,7 @@ from primaite.simulator.system.core.sys_log import SysLog
|
||||
from primaite.simulator.system.processes.process import Process
|
||||
from primaite.simulator.system.services.service import Service
|
||||
from primaite.simulator.system.software import IOSoftware
|
||||
from primaite.utils.converters import convert_dict_enum_keys_to_enum_values
|
||||
from primaite.utils.validators import IPV4Address
|
||||
|
||||
IOSoftwareClass = TypeVar("IOSoftwareClass", bound=IOSoftware)
|
||||
@@ -107,10 +109,14 @@ class NetworkInterface(SimComponent, ABC):
|
||||
nmne: Dict = Field(default_factory=lambda: {})
|
||||
"A dict containing details of the number of malicious network events captured."
|
||||
|
||||
traffic: Dict = Field(default_factory=lambda: {})
|
||||
"A dict containing details of the inbound and outbound traffic by port and protocol."
|
||||
|
||||
def setup_for_episode(self, episode: int):
|
||||
"""Reset the original state of the SimComponent."""
|
||||
super().setup_for_episode(episode=episode)
|
||||
self.nmne = {}
|
||||
self.traffic = {}
|
||||
if episode and self.pcap and SIM_OUTPUT.save_pcap_logs:
|
||||
self.pcap.current_episode = episode
|
||||
self.pcap.setup_logger()
|
||||
@@ -146,6 +152,7 @@ class NetworkInterface(SimComponent, ABC):
|
||||
)
|
||||
if CAPTURE_NMNE:
|
||||
state.update({"nmne": {k: v for k, v in self.nmne.items()}})
|
||||
state.update({"traffic": convert_dict_enum_keys_to_enum_values(self.traffic)})
|
||||
return state
|
||||
|
||||
@abstractmethod
|
||||
@@ -236,6 +243,47 @@ class NetworkInterface(SimComponent, ABC):
|
||||
# Increment a generic counter if keyword capturing is not enabled
|
||||
keyword_level["*"] = keyword_level.get("*", 0) + 1
|
||||
|
||||
def _capture_traffic(self, frame: Frame, inbound: bool = True):
|
||||
"""
|
||||
Capture traffic statistics at the Network Interface.
|
||||
|
||||
:param frame: The network frame containing the traffic data.
|
||||
:type frame: Frame
|
||||
:param inbound: Flag indicating if the traffic is inbound or outbound. Defaults to True.
|
||||
:type inbound: bool
|
||||
"""
|
||||
# Determine the direction of the traffic
|
||||
direction = "inbound" if inbound else "outbound"
|
||||
|
||||
# Initialize protocol and port variables
|
||||
protocol = None
|
||||
port = None
|
||||
|
||||
# Identify the protocol and port from the frame
|
||||
if frame.tcp:
|
||||
protocol = IPProtocol.TCP
|
||||
port = frame.tcp.dst_port
|
||||
elif frame.udp:
|
||||
protocol = IPProtocol.UDP
|
||||
port = frame.udp.dst_port
|
||||
elif frame.icmp:
|
||||
protocol = IPProtocol.ICMP
|
||||
|
||||
# Ensure the protocol is in the capture dict
|
||||
if protocol not in self.traffic:
|
||||
self.traffic[protocol] = {}
|
||||
|
||||
# Handle non-ICMP protocols that use ports
|
||||
if protocol != IPProtocol.ICMP:
|
||||
if port not in self.traffic[protocol]:
|
||||
self.traffic[protocol][port] = {"inbound": 0, "outbound": 0}
|
||||
self.traffic[protocol][port][direction] += frame.size
|
||||
else:
|
||||
# Handle ICMP protocol separately (ICMP does not use ports)
|
||||
if not self.traffic[protocol]:
|
||||
self.traffic[protocol] = {"inbound": 0, "outbound": 0}
|
||||
self.traffic[protocol][direction] += frame.size
|
||||
|
||||
@abstractmethod
|
||||
def send_frame(self, frame: Frame) -> bool:
|
||||
"""
|
||||
@@ -245,6 +293,7 @@ class NetworkInterface(SimComponent, ABC):
|
||||
:return: A boolean indicating whether the frame was successfully sent.
|
||||
"""
|
||||
self._capture_nmne(frame, inbound=False)
|
||||
self._capture_traffic(frame, inbound=False)
|
||||
|
||||
@abstractmethod
|
||||
def receive_frame(self, frame: Frame) -> bool:
|
||||
@@ -255,6 +304,7 @@ class NetworkInterface(SimComponent, ABC):
|
||||
:return: A boolean indicating whether the frame was successfully received.
|
||||
"""
|
||||
self._capture_nmne(frame, inbound=True)
|
||||
self._capture_traffic(frame, inbound=True)
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""
|
||||
@@ -766,6 +816,24 @@ class Node(SimComponent):
|
||||
self.session_manager.software_manager = self.software_manager
|
||||
self._install_system_software()
|
||||
|
||||
def ip_is_network_interface(self, ip_address: IPv4Address, enabled_only: bool = False) -> bool:
|
||||
"""
|
||||
Checks if a given IP address belongs to any of the nodes interfaces.
|
||||
|
||||
:param ip_address: The IP address to check.
|
||||
:param enabled_only: If True, only considers enabled network interfaces.
|
||||
:return: True if the IP address is assigned to one of the nodes interfaces; False otherwise.
|
||||
"""
|
||||
for network_interface in self.network_interface.values():
|
||||
if not hasattr(network_interface, "ip_address"):
|
||||
continue
|
||||
if network_interface.ip_address == ip_address:
|
||||
if enabled_only:
|
||||
return network_interface.enabled
|
||||
else:
|
||||
return True
|
||||
return False
|
||||
|
||||
def setup_for_episode(self, episode: int):
|
||||
"""Reset the original state of the SimComponent."""
|
||||
super().setup_for_episode(episode=episode)
|
||||
|
||||
@@ -7,6 +7,8 @@ from primaite import getLogger
|
||||
from primaite.simulator.network.hardware.base import IPWiredNetworkInterface, Link, Node
|
||||
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
|
||||
from primaite.simulator.network.transmission.data_link_layer import Frame
|
||||
from primaite.simulator.system.applications.application import ApplicationOperatingState
|
||||
from primaite.simulator.system.applications.nmap import NMAP
|
||||
from primaite.simulator.system.applications.web_browser import WebBrowser
|
||||
from primaite.simulator.system.services.arp.arp import ARP, ARPPacket
|
||||
from primaite.simulator.system.services.dns.dns_client import DNSClient
|
||||
@@ -302,6 +304,7 @@ class HostNode(Node):
|
||||
"DNSClient": DNSClient,
|
||||
"NTPClient": NTPClient,
|
||||
"WebBrowser": WebBrowser,
|
||||
"NMAP": NMAP,
|
||||
}
|
||||
"""List of system software that is automatically installed on nodes."""
|
||||
|
||||
@@ -314,6 +317,16 @@ class HostNode(Node):
|
||||
super().__init__(**kwargs)
|
||||
self.connect_nic(NIC(ip_address=ip_address, subnet_mask=subnet_mask))
|
||||
|
||||
@property
|
||||
def nmap(self) -> Optional[NMAP]:
|
||||
"""
|
||||
Return the NMAP application installed on the Node.
|
||||
|
||||
:return: NMAP application installed on the Node.
|
||||
:rtype: Optional[NMAP]
|
||||
"""
|
||||
return self.software_manager.software.get("NMAP")
|
||||
|
||||
@property
|
||||
def arp(self) -> Optional[ARP]:
|
||||
"""
|
||||
@@ -365,8 +378,15 @@ class HostNode(Node):
|
||||
elif frame.udp:
|
||||
dst_port = frame.udp.dst_port
|
||||
|
||||
can_accept_nmap = False
|
||||
if self.software_manager.software.get("NMAP"):
|
||||
if self.software_manager.software["NMAP"].operating_state == ApplicationOperatingState.RUNNING:
|
||||
can_accept_nmap = True
|
||||
|
||||
accept_nmap = can_accept_nmap and frame.payload.__class__.__name__ == "PortScanPayload"
|
||||
|
||||
accept_frame = False
|
||||
if frame.icmp or dst_port in self.software_manager.get_open_ports():
|
||||
if frame.icmp or dst_port in self.software_manager.get_open_ports() or accept_nmap:
|
||||
# accept the frame as the port is open or if it's an ICMP frame
|
||||
accept_frame = True
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ from primaite.simulator.network.protocols.icmp import ICMPPacket, ICMPType
|
||||
from primaite.simulator.network.transmission.data_link_layer import Frame
|
||||
from primaite.simulator.network.transmission.network_layer import IPProtocol
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
from primaite.simulator.system.applications.nmap import NMAP
|
||||
from primaite.simulator.system.core.session_manager import SessionManager
|
||||
from primaite.simulator.system.core.sys_log import SysLog
|
||||
from primaite.simulator.system.services.arp.arp import ARP
|
||||
@@ -1238,6 +1239,7 @@ class Router(NetworkNode):
|
||||
icmp.router = self
|
||||
self.software_manager.install(RouterARP)
|
||||
self.arp.router = self
|
||||
self.software_manager.install(NMAP)
|
||||
|
||||
def _set_default_acl(self):
|
||||
"""
|
||||
|
||||
451
src/primaite/simulator/system/applications/nmap.py
Normal file
451
src/primaite/simulator/system/applications/nmap.py
Normal file
@@ -0,0 +1,451 @@
|
||||
from ipaddress import IPv4Address, IPv4Network
|
||||
from typing import Any, Dict, Final, List, Optional, Set, Tuple, Union
|
||||
|
||||
from prettytable import PrettyTable
|
||||
from pydantic import validate_call
|
||||
|
||||
from primaite.interface.request import RequestResponse
|
||||
from primaite.simulator.core import RequestManager, RequestType, SimComponent
|
||||
from primaite.simulator.network.transmission.network_layer import IPProtocol
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
from primaite.simulator.system.applications.application import Application
|
||||
from primaite.utils.validators import IPV4Address
|
||||
|
||||
|
||||
class PortScanPayload(SimComponent):
|
||||
"""
|
||||
A class representing the payload for a port scan.
|
||||
|
||||
:ivar ip_address: The target IP address for the port scan.
|
||||
:ivar port: The target port for the port scan.
|
||||
:ivar protocol: The protocol used for the port scan.
|
||||
:ivar request:Flag to indicate whether this is a request or not.
|
||||
"""
|
||||
|
||||
ip_address: IPV4Address
|
||||
port: Port
|
||||
protocol: IPProtocol
|
||||
request: bool = True
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Describe the state of the port scan payload.
|
||||
|
||||
:return: A dictionary representation of the port scan payload state.
|
||||
:rtype: Dict
|
||||
"""
|
||||
state = super().describe_state()
|
||||
state["ip_address"] = str(self.ip_address)
|
||||
state["port"] = self.port.value
|
||||
state["protocol"] = self.protocol.value
|
||||
state["request"] = self.request
|
||||
|
||||
return state
|
||||
|
||||
|
||||
class NMAP(Application):
|
||||
"""
|
||||
A class representing the NMAP application for network scanning.
|
||||
|
||||
NMAP is a network scanning tool used to discover hosts and services on a network. It provides functionalities such
|
||||
as ping scans to discover active hosts and port scans to detect open ports on those hosts.
|
||||
"""
|
||||
|
||||
_active_port_scans: Dict[str, PortScanPayload] = {}
|
||||
_port_scan_responses: Dict[str, PortScanPayload] = {}
|
||||
|
||||
_PORT_SCAN_TYPE_MAP: Final[Dict[Tuple[bool, bool], str]] = {
|
||||
(True, True): "Box",
|
||||
(True, False): "Horizontal",
|
||||
(False, True): "Vertical",
|
||||
(False, False): "Port",
|
||||
}
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
kwargs["name"] = "NMAP"
|
||||
kwargs["port"] = Port.NONE
|
||||
kwargs["protocol"] = IPProtocol.NONE
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def _can_perform_network_action(self) -> bool:
|
||||
"""
|
||||
Checks if the NMAP application can perform outbound network actions.
|
||||
|
||||
This is done by checking the parent application can_per_action functionality. Then checking if there is an
|
||||
enabled NIC that can be used for outbound traffic.
|
||||
|
||||
:return: True if outbound network actions can be performed, otherwise False.
|
||||
"""
|
||||
if not super()._can_perform_action():
|
||||
return False
|
||||
|
||||
for nic in self.software_manager.node.network_interface.values():
|
||||
if nic.enabled:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _init_request_manager(self) -> RequestManager:
|
||||
def _ping_scan_action(request: List[Any], context: Any) -> RequestResponse:
|
||||
results = self.ping_scan(target_ip_address=request[0]["target_ip_address"], json_serializable=True)
|
||||
if not self._can_perform_network_action():
|
||||
return RequestResponse.from_bool(False)
|
||||
return RequestResponse(
|
||||
status="success",
|
||||
data={"live_hosts": results},
|
||||
)
|
||||
|
||||
def _port_scan_action(request: List[Any], context: Any) -> RequestResponse:
|
||||
results = self.port_scan(**request[0], json_serializable=True)
|
||||
if not self._can_perform_network_action():
|
||||
return RequestResponse.from_bool(False)
|
||||
return RequestResponse(
|
||||
status="success",
|
||||
data=results,
|
||||
)
|
||||
|
||||
def _network_service_recon_action(request: List[Any], context: Any) -> RequestResponse:
|
||||
results = self.network_service_recon(**request[0], json_serializable=True)
|
||||
if not self._can_perform_network_action():
|
||||
return RequestResponse.from_bool(False)
|
||||
return RequestResponse(
|
||||
status="success",
|
||||
data=results,
|
||||
)
|
||||
|
||||
rm = RequestManager()
|
||||
|
||||
rm.add_request(
|
||||
name="ping_scan",
|
||||
request_type=RequestType(func=_ping_scan_action),
|
||||
)
|
||||
|
||||
rm.add_request(
|
||||
name="port_scan",
|
||||
request_type=RequestType(func=_port_scan_action),
|
||||
)
|
||||
|
||||
rm.add_request(
|
||||
name="network_service_recon",
|
||||
request_type=RequestType(func=_network_service_recon_action),
|
||||
)
|
||||
|
||||
return rm
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Describe the state of the NMAP application.
|
||||
|
||||
:return: A dictionary representation of the NMAP application's state.
|
||||
:rtype: Dict
|
||||
"""
|
||||
return super().describe_state()
|
||||
|
||||
@staticmethod
|
||||
def _explode_ip_address_network_array(
|
||||
target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]]
|
||||
) -> Set[IPv4Address]:
|
||||
"""
|
||||
Explode a mixed array of IP addresses and networks into a set of individual IP addresses.
|
||||
|
||||
This method takes a combination of single and lists of IPv4 addresses and IPv4 networks, expands any networks
|
||||
into their constituent subnet useable IP addresses, and returns a set of unique IP addresses. Broadcast and
|
||||
network addresses are excluded from the result.
|
||||
|
||||
:param target_ip_address: A single or list of IPv4 addresses and networks.
|
||||
:type target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]]
|
||||
:return: A set of unique IPv4 addresses expanded from the input.
|
||||
:rtype: Set[IPv4Address]
|
||||
"""
|
||||
if isinstance(target_ip_address, IPv4Address) or isinstance(target_ip_address, IPv4Network):
|
||||
target_ip_address = [target_ip_address]
|
||||
ip_addresses: List[IPV4Address] = []
|
||||
for ip_address in target_ip_address:
|
||||
if isinstance(ip_address, IPv4Network):
|
||||
ip_addresses += [
|
||||
ip
|
||||
for ip in ip_address.hosts()
|
||||
if not ip == ip_address.broadcast_address and not ip == ip_address.network_address
|
||||
]
|
||||
else:
|
||||
ip_addresses.append(ip_address)
|
||||
return set(ip_addresses)
|
||||
|
||||
@validate_call()
|
||||
def ping_scan(
|
||||
self,
|
||||
target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]],
|
||||
show: bool = True,
|
||||
show_online_only: bool = True,
|
||||
json_serializable: bool = False,
|
||||
) -> Union[List[IPV4Address], List[str]]:
|
||||
"""
|
||||
Perform a ping scan on the target IP address(es).
|
||||
|
||||
:param target_ip_address: The target IP address(es) or network(s) for the ping scan.
|
||||
:type target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]]
|
||||
:param show: Flag indicating whether to display the scan results. Defaults to True.
|
||||
:type show: bool
|
||||
:param show_online_only: Flag indicating whether to show only the online hosts. Defaults to True.
|
||||
:type show_online_only: bool
|
||||
:param json_serializable: Flag indicating whether the return value should be json serializable. Defaults to
|
||||
False.
|
||||
:type json_serializable: bool
|
||||
|
||||
:return: A list of active IP addresses that responded to the ping.
|
||||
:rtype: Union[List[IPV4Address], List[str]]
|
||||
"""
|
||||
active_nodes = []
|
||||
if show:
|
||||
table = PrettyTable(["IP Address", "Can Ping"])
|
||||
table.align = "l"
|
||||
table.title = f"{self.software_manager.node.hostname} NMAP Ping Scan"
|
||||
|
||||
ip_addresses = self._explode_ip_address_network_array(target_ip_address)
|
||||
|
||||
for ip_address in ip_addresses:
|
||||
# Prevent ping scan on this node
|
||||
if self.software_manager.node.ip_is_network_interface(ip_address=ip_address):
|
||||
continue
|
||||
can_ping = self.software_manager.icmp.ping(ip_address)
|
||||
if can_ping:
|
||||
active_nodes.append(ip_address if not json_serializable else str(ip_address))
|
||||
if show and (can_ping or not show_online_only):
|
||||
table.add_row([ip_address, can_ping])
|
||||
if show:
|
||||
print(table.get_string(sortby="IP Address"))
|
||||
return active_nodes
|
||||
|
||||
def _determine_port_scan_type(self, target_ip_addresses: List[IPV4Address], target_ports: List[Port]) -> str:
|
||||
"""
|
||||
Determine the type of port scan based on the number of target IP addresses and ports.
|
||||
|
||||
:param target_ip_addresses: The list of target IP addresses.
|
||||
:type target_ip_addresses: List[IPV4Address]
|
||||
:param target_ports: The list of target ports.
|
||||
:type target_ports: List[Port]
|
||||
|
||||
:return: The type of port scan.
|
||||
:rtype: str
|
||||
"""
|
||||
vertical_scan = len(target_ports) > 1
|
||||
horizontal_scan = len(target_ip_addresses) > 1
|
||||
|
||||
return self._PORT_SCAN_TYPE_MAP[horizontal_scan, vertical_scan]
|
||||
|
||||
def _check_port_open_on_ip_address(
|
||||
self,
|
||||
ip_address: IPv4Address,
|
||||
port: Port,
|
||||
protocol: IPProtocol,
|
||||
is_re_attempt: bool = False,
|
||||
port_scan_uuid: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""
|
||||
Check if a port is open on a specific IP address.
|
||||
|
||||
:param ip_address: The target IP address.
|
||||
:type ip_address: IPv4Address
|
||||
:param port: The target port.
|
||||
:type port: Port
|
||||
:param protocol: The protocol used for the port scan.
|
||||
:type protocol: IPProtocol
|
||||
:param is_re_attempt: Flag indicating if this is a reattempt. Defaults to False.
|
||||
:type is_re_attempt: bool
|
||||
:param port_scan_uuid: The UUID of the port scan payload. Defaults to None.
|
||||
:type port_scan_uuid: Optional[str]
|
||||
|
||||
:return: True if the port is open, False otherwise.
|
||||
:rtype: bool
|
||||
"""
|
||||
# The recursive base case
|
||||
if is_re_attempt:
|
||||
# Return True if a response has been received, otherwise return False
|
||||
if port_scan_uuid in self._port_scan_responses:
|
||||
self._port_scan_responses.pop(port_scan_uuid)
|
||||
return True
|
||||
return False
|
||||
|
||||
# Send the port scan request
|
||||
payload = PortScanPayload(ip_address=ip_address, port=port, protocol=protocol)
|
||||
self._active_port_scans[payload.uuid] = payload
|
||||
self.sys_log.info(
|
||||
f"{self.name}: Sending port scan request over {payload.protocol.name} on port {payload.port.value} "
|
||||
f"({payload.port.name}) to {payload.ip_address}"
|
||||
)
|
||||
self.software_manager.send_payload_to_session_manager(
|
||||
payload=payload, dest_ip_address=ip_address, src_port=port, dest_port=port, ip_protocol=protocol
|
||||
)
|
||||
|
||||
# Recursively call this function with as a reattempt
|
||||
return self._check_port_open_on_ip_address(
|
||||
ip_address=ip_address, port=port, protocol=protocol, is_re_attempt=True, port_scan_uuid=payload.uuid
|
||||
)
|
||||
|
||||
def _process_port_scan_response(self, payload: PortScanPayload):
|
||||
"""
|
||||
Process the response to a port scan request.
|
||||
|
||||
:param payload: The port scan payload received in response.
|
||||
:type payload: PortScanPayload
|
||||
"""
|
||||
if payload.uuid in self._active_port_scans:
|
||||
self._active_port_scans.pop(payload.uuid)
|
||||
self._port_scan_responses[payload.uuid] = payload
|
||||
self.sys_log.info(
|
||||
f"{self.name}: Received port scan response from {payload.ip_address} on port {payload.port.value} "
|
||||
f"({payload.port.name}) over {payload.protocol.name}"
|
||||
)
|
||||
|
||||
def _process_port_scan_request(self, payload: PortScanPayload, session_id: str) -> None:
|
||||
"""
|
||||
Process a port scan request.
|
||||
|
||||
:param payload: The port scan payload received in the request.
|
||||
:type payload: PortScanPayload
|
||||
:param session_id: The session ID for the port scan request.
|
||||
:type session_id: str
|
||||
"""
|
||||
if self.software_manager.check_port_is_open(port=payload.port, protocol=payload.protocol):
|
||||
payload.request = False
|
||||
self.sys_log.info(
|
||||
f"{self.name}: Responding to port scan request for port {payload.port.value} "
|
||||
f"({payload.port.name}) over {payload.protocol.name}",
|
||||
True,
|
||||
)
|
||||
self.software_manager.send_payload_to_session_manager(payload=payload, session_id=session_id)
|
||||
|
||||
@validate_call()
|
||||
def port_scan(
|
||||
self,
|
||||
target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]],
|
||||
target_protocol: Optional[Union[IPProtocol, List[IPProtocol]]] = None,
|
||||
target_port: Optional[Union[Port, List[Port]]] = None,
|
||||
show: bool = True,
|
||||
json_serializable: bool = False,
|
||||
) -> Dict[IPv4Address, Dict[IPProtocol, List[Port]]]:
|
||||
"""
|
||||
Perform a port scan on the target IP address(es).
|
||||
|
||||
:param target_ip_address: The target IP address(es) or network(s) for the port scan.
|
||||
:type target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]]
|
||||
:param target_protocol: The protocol(s) to use for the port scan. Defaults to None, which includes TCP and UDP.
|
||||
:type target_protocol: Optional[Union[IPProtocol, List[IPProtocol]]]
|
||||
:param target_port: The port(s) to scan. Defaults to None, which includes all valid ports.
|
||||
:type target_port: Optional[Union[Port, List[Port]]]
|
||||
:param show: Flag indicating whether to display the scan results. Defaults to True.
|
||||
:type show: bool
|
||||
:param json_serializable: Flag indicating whether the return value should be JSON serializable. Defaults to
|
||||
False.
|
||||
:type json_serializable: bool
|
||||
|
||||
:return: A dictionary mapping IP addresses to protocols and lists of open ports.
|
||||
:rtype: Dict[IPv4Address, Dict[IPProtocol, List[Port]]]
|
||||
"""
|
||||
ip_addresses = self._explode_ip_address_network_array(target_ip_address)
|
||||
|
||||
if isinstance(target_port, Port):
|
||||
target_port = [target_port]
|
||||
elif target_port is None:
|
||||
target_port = [port for port in Port if port not in {Port.NONE, Port.UNUSED}]
|
||||
|
||||
if isinstance(target_protocol, IPProtocol):
|
||||
target_protocol = [target_protocol]
|
||||
elif target_protocol is None:
|
||||
target_protocol = [IPProtocol.TCP, IPProtocol.UDP]
|
||||
|
||||
scan_type = self._determine_port_scan_type(list(ip_addresses), target_port)
|
||||
active_ports = {}
|
||||
if show:
|
||||
table = PrettyTable(["IP Address", "Port", "Name", "Protocol"])
|
||||
table.align = "l"
|
||||
table.title = f"{self.software_manager.node.hostname} NMAP Port Scan ({scan_type})"
|
||||
self.sys_log.info(f"{self.name}: Starting port scan")
|
||||
for ip_address in ip_addresses:
|
||||
# Prevent port scan on this node
|
||||
if self.software_manager.node.ip_is_network_interface(ip_address=ip_address):
|
||||
continue
|
||||
for protocol in target_protocol:
|
||||
for port in set(target_port):
|
||||
port_open = self._check_port_open_on_ip_address(ip_address=ip_address, port=port, protocol=protocol)
|
||||
|
||||
if port_open:
|
||||
table.add_row([ip_address, port.value, port.name, protocol.name])
|
||||
_ip_address = ip_address if not json_serializable else str(ip_address)
|
||||
_protocol = protocol if not json_serializable else protocol.value
|
||||
_port = port if not json_serializable else port.value
|
||||
if _ip_address not in active_ports:
|
||||
active_ports[_ip_address] = dict()
|
||||
if _protocol not in active_ports[_ip_address]:
|
||||
active_ports[_ip_address][_protocol] = []
|
||||
active_ports[_ip_address][_protocol].append(_port)
|
||||
|
||||
if show:
|
||||
print(table.get_string(sortby="IP Address"))
|
||||
|
||||
return active_ports
|
||||
|
||||
def network_service_recon(
|
||||
self,
|
||||
target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]],
|
||||
target_protocol: Optional[Union[IPProtocol, List[IPProtocol]]] = None,
|
||||
target_port: Optional[Union[Port, List[Port]]] = None,
|
||||
show: bool = True,
|
||||
show_online_only: bool = True,
|
||||
json_serializable: bool = False,
|
||||
) -> Dict[IPv4Address, Dict[IPProtocol, List[Port]]]:
|
||||
"""
|
||||
Perform a network service reconnaissance which includes a ping scan followed by a port scan.
|
||||
|
||||
This method combines the functionalities of a ping scan and a port scan to provide a comprehensive
|
||||
overview of the services on the network. It first identifies active hosts in the target IP range by performing
|
||||
a ping scan. Once the active hosts are identified, it performs a port scan on these hosts to identify open
|
||||
ports and running services. This two-step process ensures that the port scan is performed only on live hosts,
|
||||
optimising the scanning process and providing accurate results.
|
||||
|
||||
:param target_ip_address: The target IP address(es) or network(s) for the port scan.
|
||||
:type target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]]
|
||||
:param target_protocol: The protocol(s) to use for the port scan. Defaults to None, which includes TCP and UDP.
|
||||
:type target_protocol: Optional[Union[IPProtocol, List[IPProtocol]]]
|
||||
:param target_port: The port(s) to scan. Defaults to None, which includes all valid ports.
|
||||
:type target_port: Optional[Union[Port, List[Port]]]
|
||||
:param show: Flag indicating whether to display the scan results. Defaults to True.
|
||||
:type show: bool
|
||||
:param show_online_only: Flag indicating whether to show only the online hosts. Defaults to True.
|
||||
:type show_online_only: bool
|
||||
:param json_serializable: Flag indicating whether the return value should be JSON serializable. Defaults to
|
||||
False.
|
||||
:type json_serializable: bool
|
||||
|
||||
:return: A dictionary mapping IP addresses to protocols and lists of open ports.
|
||||
:rtype: Dict[IPv4Address, Dict[IPProtocol, List[Port]]]
|
||||
"""
|
||||
ping_scan_results = self.ping_scan(
|
||||
target_ip_address=target_ip_address, show=show, show_online_only=show_online_only, json_serializable=False
|
||||
)
|
||||
return self.port_scan(
|
||||
target_ip_address=ping_scan_results,
|
||||
target_protocol=target_protocol,
|
||||
target_port=target_port,
|
||||
show=show,
|
||||
json_serializable=json_serializable,
|
||||
)
|
||||
|
||||
def receive(self, payload: Any, session_id: str, **kwargs) -> bool:
|
||||
"""
|
||||
Receive and process a payload.
|
||||
|
||||
:param payload: The payload to be processed.
|
||||
:type payload: Any
|
||||
:param session_id: The session ID associated with the payload.
|
||||
:type session_id: str
|
||||
|
||||
:return: True if the payload was successfully processed, False otherwise.
|
||||
:rtype: bool
|
||||
"""
|
||||
if isinstance(payload, PortScanPayload):
|
||||
if payload.request:
|
||||
self._process_port_scan_request(payload=payload, session_id=session_id)
|
||||
else:
|
||||
self._process_port_scan_response(payload=payload)
|
||||
|
||||
return True
|
||||
@@ -78,6 +78,31 @@ class SoftwareManager:
|
||||
open_ports.append(software.port)
|
||||
return open_ports
|
||||
|
||||
def check_port_is_open(self, port: Port, protocol: IPProtocol) -> bool:
|
||||
"""
|
||||
Check if a specific port is open and running a service using the specified protocol.
|
||||
|
||||
This method iterates through all installed software on the node and checks if any of them
|
||||
are using the specified port and protocol and are currently in a running state. It returns True if any software
|
||||
is found running on the specified port and protocol, otherwise False.
|
||||
|
||||
|
||||
:param port: The port to check.
|
||||
:type port: Port
|
||||
:param protocol: The protocol to check (e.g., TCP, UDP).
|
||||
:type protocol: IPProtocol
|
||||
:return: True if the port is open and a service is running on it using the specified protocol, False otherwise.
|
||||
:rtype: bool
|
||||
"""
|
||||
for software in self.software.values():
|
||||
if (
|
||||
software.port == port
|
||||
and software.protocol == protocol
|
||||
and software.operating_state in {ApplicationOperatingState.RUNNING, ServiceOperatingState.RUNNING}
|
||||
):
|
||||
return True
|
||||
return False
|
||||
|
||||
def install(self, software_class: Type[IOSoftwareClass]):
|
||||
"""
|
||||
Install an Application or Service.
|
||||
@@ -150,6 +175,7 @@ class SoftwareManager:
|
||||
self,
|
||||
payload: Any,
|
||||
dest_ip_address: Optional[Union[IPv4Address, IPv4Network]] = None,
|
||||
src_port: Optional[Port] = None,
|
||||
dest_port: Optional[Port] = None,
|
||||
ip_protocol: IPProtocol = IPProtocol.TCP,
|
||||
session_id: Optional[str] = None,
|
||||
@@ -170,6 +196,7 @@ class SoftwareManager:
|
||||
return self.session_manager.receive_payload_from_software_manager(
|
||||
payload=payload,
|
||||
dst_ip_address=dest_ip_address,
|
||||
src_port=src_port,
|
||||
dst_port=dest_port,
|
||||
ip_protocol=ip_protocol,
|
||||
session_id=session_id,
|
||||
@@ -190,6 +217,9 @@ class SoftwareManager:
|
||||
:param payload: The payload being received.
|
||||
:param session: The transport session the payload originates from.
|
||||
"""
|
||||
if payload.__class__.__name__ == "PortScanPayload":
|
||||
self.software.get("NMAP").receive(payload=payload, session_id=session_id)
|
||||
return
|
||||
receiver: Optional[Union[Service, Application]] = self.port_protocol_mapping.get((port, protocol), None)
|
||||
if receiver:
|
||||
receiver.receive(
|
||||
|
||||
25
src/primaite/utils/converters.py
Normal file
25
src/primaite/utils/converters.py
Normal file
@@ -0,0 +1,25 @@
|
||||
from enum import Enum
|
||||
from typing import Any, Dict
|
||||
|
||||
|
||||
def convert_dict_enum_keys_to_enum_values(d: Dict[Any, Any]) -> Dict[Any, Any]:
|
||||
"""
|
||||
Convert dictionary keys from enums to their corresponding values.
|
||||
|
||||
:param d: dict
|
||||
The dictionary with enum keys to be converted.
|
||||
:return: dict
|
||||
The dictionary with enum values as keys.
|
||||
"""
|
||||
result = {}
|
||||
for key, value in d.items():
|
||||
if isinstance(key, Enum):
|
||||
new_key = key.value
|
||||
else:
|
||||
new_key = key
|
||||
|
||||
if isinstance(value, dict):
|
||||
result[new_key] = convert_dict_enum_keys_to_enum_values(value)
|
||||
else:
|
||||
result[new_key] = value
|
||||
return result
|
||||
@@ -0,0 +1,137 @@
|
||||
io_settings:
|
||||
save_step_metadata: false
|
||||
save_pcap_logs: true
|
||||
save_sys_logs: true
|
||||
sys_log_level: WARNING
|
||||
|
||||
|
||||
|
||||
game:
|
||||
max_episode_length: 256
|
||||
ports:
|
||||
- ARP
|
||||
- DNS
|
||||
- HTTP
|
||||
- POSTGRES_SERVER
|
||||
protocols:
|
||||
- ICMP
|
||||
- TCP
|
||||
- UDP
|
||||
|
||||
agents:
|
||||
- ref: client_1_red_nmap
|
||||
team: RED
|
||||
type: ProbabilisticAgent
|
||||
observation_space: null
|
||||
action_space:
|
||||
options:
|
||||
nodes:
|
||||
- node_name: client_1
|
||||
applications:
|
||||
- application_name: NMAP
|
||||
max_folders_per_node: 1
|
||||
max_files_per_folder: 1
|
||||
max_services_per_node: 1
|
||||
max_applications_per_node: 1
|
||||
action_list:
|
||||
- type: NODE_NMAP_NETWORK_SERVICE_RECON
|
||||
action_map:
|
||||
0:
|
||||
action: NODE_NMAP_NETWORK_SERVICE_RECON
|
||||
options:
|
||||
source_node: client_1
|
||||
target_ip_address: 192.168.10.0/24
|
||||
target_port: 80
|
||||
target_protocol: tcp
|
||||
|
||||
reward_function:
|
||||
reward_components:
|
||||
- type: DUMMY
|
||||
|
||||
agent_settings:
|
||||
action_probabilities:
|
||||
0: 1.0
|
||||
|
||||
|
||||
|
||||
simulation:
|
||||
network:
|
||||
nodes:
|
||||
- hostname: switch_1
|
||||
num_ports: 8
|
||||
type: switch
|
||||
|
||||
- hostname: switch_2
|
||||
num_ports: 8
|
||||
type: switch
|
||||
|
||||
- hostname: router_1
|
||||
type: router
|
||||
ports:
|
||||
1:
|
||||
ip_address: 192.168.1.1
|
||||
subnet_mask: 255.255.255.0
|
||||
2:
|
||||
ip_address: 192.168.10.1
|
||||
subnet_mask: 255.255.255.0
|
||||
acl:
|
||||
1:
|
||||
action: PERMIT
|
||||
|
||||
- hostname: client_1
|
||||
type: computer
|
||||
ip_address: 192.168.10.21
|
||||
subnet_mask: 255.255.255.0
|
||||
default_gateway: 192.168.10.1
|
||||
|
||||
- hostname: client_2
|
||||
type: computer
|
||||
ip_address: 192.168.10.22
|
||||
subnet_mask: 255.255.255.0
|
||||
default_gateway: 192.168.10.1
|
||||
|
||||
- hostname: server_1
|
||||
type: server
|
||||
ip_address: 192.168.1.10
|
||||
subnet_mask: 255.255.255.0
|
||||
default_gateway: 192.168.1.1
|
||||
|
||||
- hostname: server_2
|
||||
type: server
|
||||
ip_address: 192.168.1.14
|
||||
subnet_mask: 255.255.255.0
|
||||
default_gateway: 192.168.1.1
|
||||
|
||||
|
||||
|
||||
|
||||
links:
|
||||
- endpoint_a_hostname: router_1
|
||||
endpoint_a_port: 1
|
||||
endpoint_b_hostname: switch_1
|
||||
endpoint_b_port: 8
|
||||
|
||||
- endpoint_a_hostname: router_1
|
||||
endpoint_a_port: 2
|
||||
endpoint_b_hostname: switch_2
|
||||
endpoint_b_port: 8
|
||||
|
||||
- endpoint_a_hostname: client_1
|
||||
endpoint_a_port: 1
|
||||
endpoint_b_hostname: switch_2
|
||||
endpoint_b_port: 1
|
||||
|
||||
- endpoint_a_hostname: client_2
|
||||
endpoint_a_port: 1
|
||||
endpoint_b_hostname: switch_2
|
||||
endpoint_b_port: 2
|
||||
|
||||
- endpoint_a_hostname: server_1
|
||||
endpoint_a_port: 1
|
||||
endpoint_b_hostname: switch_1
|
||||
endpoint_b_port: 1
|
||||
|
||||
- endpoint_a_hostname: server_2
|
||||
endpoint_a_port: 1
|
||||
endpoint_b_hostname: switch_1
|
||||
endpoint_b_port: 2
|
||||
135
tests/assets/configs/nmap_ping_scan_red_agent_config.yaml
Normal file
135
tests/assets/configs/nmap_ping_scan_red_agent_config.yaml
Normal file
@@ -0,0 +1,135 @@
|
||||
io_settings:
|
||||
save_step_metadata: false
|
||||
save_pcap_logs: true
|
||||
save_sys_logs: true
|
||||
sys_log_level: WARNING
|
||||
|
||||
|
||||
|
||||
game:
|
||||
max_episode_length: 256
|
||||
ports:
|
||||
- ARP
|
||||
- DNS
|
||||
- HTTP
|
||||
- POSTGRES_SERVER
|
||||
protocols:
|
||||
- ICMP
|
||||
- TCP
|
||||
- UDP
|
||||
|
||||
agents:
|
||||
- ref: client_1_red_nmap
|
||||
team: RED
|
||||
type: ProbabilisticAgent
|
||||
observation_space: null
|
||||
action_space:
|
||||
options:
|
||||
nodes:
|
||||
- node_name: client_1
|
||||
applications:
|
||||
- application_name: NMAP
|
||||
max_folders_per_node: 1
|
||||
max_files_per_folder: 1
|
||||
max_services_per_node: 1
|
||||
max_applications_per_node: 1
|
||||
action_list:
|
||||
- type: NODE_NMAP_PING_SCAN
|
||||
action_map:
|
||||
0:
|
||||
action: NODE_NMAP_PING_SCAN
|
||||
options:
|
||||
source_node: client_1
|
||||
target_ip_address: 192.168.1.0/24
|
||||
|
||||
reward_function:
|
||||
reward_components:
|
||||
- type: DUMMY
|
||||
|
||||
agent_settings:
|
||||
action_probabilities:
|
||||
0: 1.0
|
||||
|
||||
|
||||
|
||||
simulation:
|
||||
network:
|
||||
nodes:
|
||||
- hostname: switch_1
|
||||
num_ports: 8
|
||||
type: switch
|
||||
|
||||
- hostname: switch_2
|
||||
num_ports: 8
|
||||
type: switch
|
||||
|
||||
- hostname: router_1
|
||||
type: router
|
||||
ports:
|
||||
1:
|
||||
ip_address: 192.168.1.1
|
||||
subnet_mask: 255.255.255.0
|
||||
2:
|
||||
ip_address: 192.168.10.1
|
||||
subnet_mask: 255.255.255.0
|
||||
acl:
|
||||
1:
|
||||
action: PERMIT
|
||||
|
||||
- hostname: client_1
|
||||
type: computer
|
||||
ip_address: 192.168.10.21
|
||||
subnet_mask: 255.255.255.0
|
||||
default_gateway: 192.168.10.1
|
||||
|
||||
- hostname: client_2
|
||||
type: computer
|
||||
ip_address: 192.168.10.22
|
||||
subnet_mask: 255.255.255.0
|
||||
default_gateway: 192.168.10.1
|
||||
|
||||
- hostname: server_1
|
||||
type: server
|
||||
ip_address: 192.168.1.10
|
||||
subnet_mask: 255.255.255.0
|
||||
default_gateway: 192.168.1.1
|
||||
|
||||
- hostname: server_2
|
||||
type: server
|
||||
ip_address: 192.168.1.14
|
||||
subnet_mask: 255.255.255.0
|
||||
default_gateway: 192.168.1.1
|
||||
|
||||
|
||||
|
||||
|
||||
links:
|
||||
- endpoint_a_hostname: router_1
|
||||
endpoint_a_port: 1
|
||||
endpoint_b_hostname: switch_1
|
||||
endpoint_b_port: 8
|
||||
|
||||
- endpoint_a_hostname: router_1
|
||||
endpoint_a_port: 2
|
||||
endpoint_b_hostname: switch_2
|
||||
endpoint_b_port: 8
|
||||
|
||||
- endpoint_a_hostname: client_1
|
||||
endpoint_a_port: 1
|
||||
endpoint_b_hostname: switch_2
|
||||
endpoint_b_port: 1
|
||||
|
||||
- endpoint_a_hostname: client_2
|
||||
endpoint_a_port: 1
|
||||
endpoint_b_hostname: switch_2
|
||||
endpoint_b_port: 2
|
||||
|
||||
- endpoint_a_hostname: server_1
|
||||
endpoint_a_port: 1
|
||||
endpoint_b_hostname: switch_1
|
||||
endpoint_b_port: 1
|
||||
|
||||
- endpoint_a_hostname: server_2
|
||||
endpoint_a_port: 1
|
||||
endpoint_b_hostname: switch_1
|
||||
endpoint_b_port: 2
|
||||
135
tests/assets/configs/nmap_port_scan_red_agent_config.yaml
Normal file
135
tests/assets/configs/nmap_port_scan_red_agent_config.yaml
Normal file
@@ -0,0 +1,135 @@
|
||||
io_settings:
|
||||
save_step_metadata: false
|
||||
save_pcap_logs: true
|
||||
save_sys_logs: true
|
||||
sys_log_level: WARNING
|
||||
|
||||
|
||||
|
||||
game:
|
||||
max_episode_length: 256
|
||||
ports:
|
||||
- ARP
|
||||
- DNS
|
||||
- HTTP
|
||||
- POSTGRES_SERVER
|
||||
protocols:
|
||||
- ICMP
|
||||
- TCP
|
||||
- UDP
|
||||
|
||||
agents:
|
||||
- ref: client_1_red_nmap
|
||||
team: RED
|
||||
type: ProbabilisticAgent
|
||||
observation_space: null
|
||||
action_space:
|
||||
options:
|
||||
nodes:
|
||||
- node_name: client_1
|
||||
applications:
|
||||
- application_name: NMAP
|
||||
max_folders_per_node: 1
|
||||
max_files_per_folder: 1
|
||||
max_services_per_node: 1
|
||||
max_applications_per_node: 1
|
||||
action_list:
|
||||
- type: NODE_NMAP_PORT_SCAN
|
||||
action_map:
|
||||
0:
|
||||
action: NODE_NMAP_PORT_SCAN
|
||||
options:
|
||||
source_node: client_1
|
||||
target_ip_address: 192.168.10.0/24
|
||||
|
||||
reward_function:
|
||||
reward_components:
|
||||
- type: DUMMY
|
||||
|
||||
agent_settings:
|
||||
action_probabilities:
|
||||
0: 1.0
|
||||
|
||||
|
||||
|
||||
simulation:
|
||||
network:
|
||||
nodes:
|
||||
- hostname: switch_1
|
||||
num_ports: 8
|
||||
type: switch
|
||||
|
||||
- hostname: switch_2
|
||||
num_ports: 8
|
||||
type: switch
|
||||
|
||||
- hostname: router_1
|
||||
type: router
|
||||
ports:
|
||||
1:
|
||||
ip_address: 192.168.1.1
|
||||
subnet_mask: 255.255.255.0
|
||||
2:
|
||||
ip_address: 192.168.10.1
|
||||
subnet_mask: 255.255.255.0
|
||||
acl:
|
||||
1:
|
||||
action: PERMIT
|
||||
|
||||
- hostname: client_1
|
||||
type: computer
|
||||
ip_address: 192.168.10.21
|
||||
subnet_mask: 255.255.255.0
|
||||
default_gateway: 192.168.10.1
|
||||
|
||||
- hostname: client_2
|
||||
type: computer
|
||||
ip_address: 192.168.10.22
|
||||
subnet_mask: 255.255.255.0
|
||||
default_gateway: 192.168.10.1
|
||||
|
||||
- hostname: server_1
|
||||
type: server
|
||||
ip_address: 192.168.1.10
|
||||
subnet_mask: 255.255.255.0
|
||||
default_gateway: 192.168.1.1
|
||||
|
||||
- hostname: server_2
|
||||
type: server
|
||||
ip_address: 192.168.1.14
|
||||
subnet_mask: 255.255.255.0
|
||||
default_gateway: 192.168.1.1
|
||||
|
||||
|
||||
|
||||
|
||||
links:
|
||||
- endpoint_a_hostname: router_1
|
||||
endpoint_a_port: 1
|
||||
endpoint_b_hostname: switch_1
|
||||
endpoint_b_port: 8
|
||||
|
||||
- endpoint_a_hostname: router_1
|
||||
endpoint_a_port: 2
|
||||
endpoint_b_hostname: switch_2
|
||||
endpoint_b_port: 8
|
||||
|
||||
- endpoint_a_hostname: client_1
|
||||
endpoint_a_port: 1
|
||||
endpoint_b_hostname: switch_2
|
||||
endpoint_b_port: 1
|
||||
|
||||
- endpoint_a_hostname: client_2
|
||||
endpoint_a_port: 1
|
||||
endpoint_b_hostname: switch_2
|
||||
endpoint_b_port: 2
|
||||
|
||||
- endpoint_a_hostname: server_1
|
||||
endpoint_a_port: 1
|
||||
endpoint_b_hostname: switch_1
|
||||
endpoint_b_port: 1
|
||||
|
||||
- endpoint_a_hostname: server_2
|
||||
endpoint_a_port: 1
|
||||
endpoint_b_hostname: switch_1
|
||||
endpoint_b_port: 2
|
||||
185
tests/integration_tests/system/test_nmap.py
Normal file
185
tests/integration_tests/system/test_nmap.py
Normal file
@@ -0,0 +1,185 @@
|
||||
from enum import Enum
|
||||
from ipaddress import IPv4Address, IPv4Network
|
||||
|
||||
import yaml
|
||||
|
||||
from primaite.game.game import PrimaiteGame
|
||||
from primaite.simulator.network.transmission.network_layer import IPProtocol
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
from primaite.simulator.system.applications.nmap import NMAP
|
||||
from tests import TEST_ASSETS_ROOT
|
||||
|
||||
|
||||
def test_ping_scan_all_on(example_network):
|
||||
network = example_network
|
||||
|
||||
client_1 = network.get_node_by_hostname("client_1")
|
||||
client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa
|
||||
|
||||
expected_result = [IPv4Address("192.168.1.10"), IPv4Address("192.168.1.14")]
|
||||
actual_result = client_1_nmap.ping_scan(target_ip_address=["192.168.1.10", "192.168.1.14"])
|
||||
|
||||
assert sorted(actual_result) == sorted(expected_result)
|
||||
|
||||
|
||||
def test_ping_scan_all_on_full_network(example_network):
|
||||
network = example_network
|
||||
|
||||
client_1 = network.get_node_by_hostname("client_1")
|
||||
client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa
|
||||
|
||||
expected_result = [IPv4Address("192.168.1.1"), IPv4Address("192.168.1.10"), IPv4Address("192.168.1.14")]
|
||||
actual_result = client_1_nmap.ping_scan(target_ip_address=IPv4Network("192.168.1.0/24"))
|
||||
|
||||
assert sorted(actual_result) == sorted(expected_result)
|
||||
|
||||
|
||||
def test_ping_scan_some_on(example_network):
|
||||
network = example_network
|
||||
|
||||
client_1 = network.get_node_by_hostname("client_1")
|
||||
client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa
|
||||
|
||||
network.get_node_by_hostname("server_2").power_off()
|
||||
|
||||
expected_result = [IPv4Address("192.168.1.1"), IPv4Address("192.168.1.10")]
|
||||
actual_result = client_1_nmap.ping_scan(target_ip_address=IPv4Network("192.168.1.0/24"))
|
||||
|
||||
assert sorted(actual_result) == sorted(expected_result)
|
||||
|
||||
|
||||
def test_ping_scan_all_off(example_network):
|
||||
network = example_network
|
||||
|
||||
client_1 = network.get_node_by_hostname("client_1")
|
||||
client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa
|
||||
|
||||
network.get_node_by_hostname("server_1").power_off()
|
||||
network.get_node_by_hostname("server_2").power_off()
|
||||
|
||||
expected_result = []
|
||||
actual_result = client_1_nmap.ping_scan(target_ip_address=["192.168.1.10", "192.168.1.14"])
|
||||
|
||||
assert sorted(actual_result) == sorted(expected_result)
|
||||
|
||||
|
||||
def test_port_scan_one_node_one_port(example_network):
|
||||
network = example_network
|
||||
|
||||
client_1 = network.get_node_by_hostname("client_1")
|
||||
client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa
|
||||
|
||||
client_2 = network.get_node_by_hostname("client_2")
|
||||
|
||||
actual_result = client_1_nmap.port_scan(
|
||||
target_ip_address=client_2.network_interface[1].ip_address, target_port=Port.DNS, target_protocol=IPProtocol.TCP
|
||||
)
|
||||
|
||||
expected_result = {IPv4Address("192.168.10.22"): {IPProtocol.TCP: [Port.DNS]}}
|
||||
|
||||
assert actual_result == expected_result
|
||||
|
||||
|
||||
def sort_dict(d):
|
||||
"""Recursively sorts a dictionary."""
|
||||
if isinstance(d, dict):
|
||||
return {k: sort_dict(v) for k, v in sorted(d.items(), key=lambda item: str(item[0]))}
|
||||
elif isinstance(d, list):
|
||||
return sorted(d, key=lambda item: str(item) if isinstance(item, Enum) else item)
|
||||
elif isinstance(d, Enum):
|
||||
return str(d)
|
||||
else:
|
||||
return d
|
||||
|
||||
|
||||
def test_port_scan_full_subnet_all_ports_and_protocols(example_network):
|
||||
network = example_network
|
||||
|
||||
client_1 = network.get_node_by_hostname("client_1")
|
||||
client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa
|
||||
|
||||
actual_result = client_1_nmap.port_scan(
|
||||
target_ip_address=IPv4Network("192.168.10.0/24"),
|
||||
)
|
||||
|
||||
expected_result = {
|
||||
IPv4Address("192.168.10.1"): {IPProtocol.UDP: [Port.ARP]},
|
||||
IPv4Address("192.168.10.22"): {
|
||||
IPProtocol.TCP: [Port.HTTP, Port.FTP, Port.DNS],
|
||||
IPProtocol.UDP: [Port.ARP, Port.NTP],
|
||||
},
|
||||
}
|
||||
|
||||
assert sort_dict(actual_result) == sort_dict(expected_result)
|
||||
|
||||
|
||||
def test_network_service_recon_all_ports_and_protocols(example_network):
|
||||
network = example_network
|
||||
|
||||
client_1 = network.get_node_by_hostname("client_1")
|
||||
client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa
|
||||
|
||||
actual_result = client_1_nmap.network_service_recon(
|
||||
target_ip_address=IPv4Network("192.168.10.0/24"), target_port=Port.HTTP, target_protocol=IPProtocol.TCP
|
||||
)
|
||||
|
||||
expected_result = {IPv4Address("192.168.10.22"): {IPProtocol.TCP: [Port.HTTP]}}
|
||||
|
||||
assert sort_dict(actual_result) == sort_dict(expected_result)
|
||||
|
||||
|
||||
def test_ping_scan_red_agent():
|
||||
with open(TEST_ASSETS_ROOT / "configs/nmap_ping_scan_red_agent_config.yaml", "r") as file:
|
||||
cfg = yaml.safe_load(file)
|
||||
|
||||
game = PrimaiteGame.from_config(cfg)
|
||||
|
||||
game.step()
|
||||
|
||||
expected_result = ["192.168.1.1", "192.168.1.10", "192.168.1.14"]
|
||||
|
||||
action_history = game.agents["client_1_red_nmap"].history
|
||||
assert len(action_history) == 1
|
||||
actual_result = action_history[0].response.data["live_hosts"]
|
||||
|
||||
assert sorted(actual_result) == sorted(expected_result)
|
||||
|
||||
|
||||
def test_port_scan_red_agent():
|
||||
with open(TEST_ASSETS_ROOT / "configs/nmap_port_scan_red_agent_config.yaml", "r") as file:
|
||||
cfg = yaml.safe_load(file)
|
||||
|
||||
game = PrimaiteGame.from_config(cfg)
|
||||
|
||||
game.step()
|
||||
|
||||
expected_result = {
|
||||
"192.168.10.1": {"udp": [219]},
|
||||
"192.168.10.22": {
|
||||
"tcp": [80, 21, 53],
|
||||
"udp": [219, 123],
|
||||
},
|
||||
}
|
||||
|
||||
action_history = game.agents["client_1_red_nmap"].history
|
||||
assert len(action_history) == 1
|
||||
actual_result = action_history[0].response.data
|
||||
|
||||
assert sorted(actual_result) == sorted(expected_result)
|
||||
|
||||
|
||||
def test_network_service_recon_red_agent():
|
||||
with open(TEST_ASSETS_ROOT / "configs/nmap_network_service_recon_red_agent_config.yaml", "r") as file:
|
||||
cfg = yaml.safe_load(file)
|
||||
|
||||
game = PrimaiteGame.from_config(cfg)
|
||||
|
||||
game.step()
|
||||
|
||||
expected_result = {"192.168.10.22": {"tcp": [80]}}
|
||||
|
||||
action_history = game.agents["client_1_red_nmap"].history
|
||||
assert len(action_history) == 1
|
||||
actual_result = action_history[0].response.data
|
||||
|
||||
assert sorted(actual_result) == sorted(expected_result)
|
||||
0
tests/unit_tests/_primaite/_utils/__init__.py
Normal file
0
tests/unit_tests/_primaite/_utils/__init__.py
Normal file
@@ -0,0 +1,83 @@
|
||||
from primaite.simulator.network.transmission.network_layer import IPProtocol
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
from primaite.utils.converters import convert_dict_enum_keys_to_enum_values
|
||||
|
||||
|
||||
def test_simple_conversion():
|
||||
"""
|
||||
Test conversion of a simple dictionary with enum keys to enum values.
|
||||
|
||||
The original dictionary contains one level of nested dictionary with enums as keys.
|
||||
The expected output should have string values of enums as keys.
|
||||
"""
|
||||
original_dict = {IPProtocol.UDP: {Port.ARP: {"inbound": 0, "outbound": 1016.0}}}
|
||||
expected_dict = {"udp": {219: {"inbound": 0, "outbound": 1016.0}}}
|
||||
assert convert_dict_enum_keys_to_enum_values(original_dict) == expected_dict
|
||||
|
||||
|
||||
def test_no_enums():
|
||||
"""
|
||||
Test conversion of a dictionary with no enum keys.
|
||||
|
||||
The original dictionary contains only string keys.
|
||||
The expected output should be identical to the original dictionary.
|
||||
"""
|
||||
original_dict = {"protocol": {"port": {"inbound": 0, "outbound": 1016.0}}}
|
||||
expected_dict = {"protocol": {"port": {"inbound": 0, "outbound": 1016.0}}}
|
||||
assert convert_dict_enum_keys_to_enum_values(original_dict) == expected_dict
|
||||
|
||||
|
||||
def test_mixed_keys():
|
||||
"""
|
||||
Test conversion of a dictionary with a mix of enum and string keys.
|
||||
|
||||
The original dictionary contains both enums and strings as keys.
|
||||
The expected output should have string values of enums and original string keys.
|
||||
"""
|
||||
original_dict = {
|
||||
IPProtocol.TCP: {"port": {"inbound": 0, "outbound": 1016.0}},
|
||||
"protocol": {Port.HTTP: {"inbound": 10, "outbound": 2020.0}},
|
||||
}
|
||||
expected_dict = {
|
||||
"tcp": {"port": {"inbound": 0, "outbound": 1016.0}},
|
||||
"protocol": {80: {"inbound": 10, "outbound": 2020.0}},
|
||||
}
|
||||
assert convert_dict_enum_keys_to_enum_values(original_dict) == expected_dict
|
||||
|
||||
|
||||
def test_empty_dict():
|
||||
"""
|
||||
Test conversion of an empty dictionary.
|
||||
|
||||
The original dictionary is empty.
|
||||
The expected output should also be an empty dictionary.
|
||||
"""
|
||||
original_dict = {}
|
||||
expected_dict = {}
|
||||
assert convert_dict_enum_keys_to_enum_values(original_dict) == expected_dict
|
||||
|
||||
|
||||
def test_nested_dicts():
|
||||
"""
|
||||
Test conversion of a nested dictionary with multiple levels of nested dictionaries and enums as keys.
|
||||
|
||||
The original dictionary contains nested dictionaries with enums as keys at different levels.
|
||||
The expected output should have string values of enums as keys at all levels.
|
||||
"""
|
||||
original_dict = {
|
||||
IPProtocol.UDP: {Port.ARP: {"inbound": 0, "outbound": 1016.0, "details": {IPProtocol.TCP: {"latency": "low"}}}}
|
||||
}
|
||||
expected_dict = {"udp": {219: {"inbound": 0, "outbound": 1016.0, "details": {"tcp": {"latency": "low"}}}}}
|
||||
assert convert_dict_enum_keys_to_enum_values(original_dict) == expected_dict
|
||||
|
||||
|
||||
def test_non_dict_values():
|
||||
"""
|
||||
Test conversion of a dictionary where some values are not dictionaries.
|
||||
|
||||
The original dictionary contains lists and tuples as values.
|
||||
The expected output should preserve these non-dictionary values while converting enum keys to string values.
|
||||
"""
|
||||
original_dict = {IPProtocol.UDP: [Port.ARP, Port.HTTP], "protocols": (IPProtocol.TCP, IPProtocol.UDP)}
|
||||
expected_dict = {"udp": [Port.ARP, Port.HTTP], "protocols": (IPProtocol.TCP, IPProtocol.UDP)}
|
||||
assert convert_dict_enum_keys_to_enum_values(original_dict) == expected_dict
|
||||
Reference in New Issue
Block a user