#2628 - added _can_perform_network_action to nmap.py. made some changes following PR suggestions.

This commit is contained in:
Chris McCarthy
2024-06-04 22:29:00 +01:00
parent 5eea5bf4f9
commit 3c17ef0a69
4 changed files with 52 additions and 38 deletions

View File

@@ -10,9 +10,9 @@ NMAP
Overview Overview
-------- --------
The NMAP is used to simulate network scanning activities. NMAP is a powerful tool that helps in discovering hosts and The NMAP application is used to simulate network scanning activities. NMAP is a powerful tool that helps in discovering
services on a network. It provides functionalities such as ping scans to discover active hosts and port scans to detect hosts and services on a network. It provides functionalities such as ping scans to discover active hosts and port scans
open ports on those hosts. to detect open ports on those hosts.
The NMAP application is essential for network administrators and security professionals to map out a network's The NMAP application is essential for network administrators and security professionals to map out a network's
structure, identify active devices, and find potential vulnerabilities by discovering open ports and running services. structure, identify active devices, and find potential vulnerabilities by discovering open ports and running services.

View File

@@ -109,6 +109,7 @@ class NetworkInterface(SimComponent, ABC):
"A dict containing details of the number of malicious network events captured." "A dict containing details of the number of malicious network events captured."
traffic: Dict = Field(default_factory=lambda: {}) traffic: Dict = Field(default_factory=lambda: {})
"A dict containing details of the inbound and outbound traffic by port and protocol."
def setup_for_episode(self, episode: int): def setup_for_episode(self, episode: int):
"""Reset the original state of the SimComponent.""" """Reset the original state of the SimComponent."""

View File

@@ -315,7 +315,7 @@ class HostNode(Node):
def __init__(self, ip_address: IPV4Address, subnet_mask: IPV4Address, **kwargs): def __init__(self, ip_address: IPV4Address, subnet_mask: IPV4Address, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
self.connect_nic(NIC(ip_address=ip_address, subnet_mask=subnet_mask)) # self.connect_nic(NIC(ip_address=ip_address, subnet_mask=subnet_mask))
@property @property
def nmap(self) -> Optional[NMAP]: def nmap(self) -> Optional[NMAP]:

View File

@@ -1,5 +1,5 @@
from ipaddress import IPv4Address, IPv4Network from ipaddress import IPv4Address, IPv4Network
from typing import Any, Dict, Final, List, Optional, Tuple, Union from typing import Any, Dict, Final, List, Optional, Set, Tuple, Union
from prettytable import PrettyTable from prettytable import PrettyTable
from pydantic import validate_call from pydantic import validate_call
@@ -19,6 +19,7 @@ class PortScanPayload(SimComponent):
:ivar ip_address: The target IP address for the port scan. :ivar ip_address: The target IP address for the port scan.
:ivar port: The target port for the port scan. :ivar port: The target port for the port scan.
:ivar protocol: The protocol used for the port scan. :ivar protocol: The protocol used for the port scan.
:ivar request:Flag to indicate whether this is a request or not.
""" """
ip_address: IPV4Address ip_address: IPV4Address
@@ -66,11 +67,27 @@ class NMAP(Application):
kwargs["protocol"] = IPProtocol.NONE kwargs["protocol"] = IPProtocol.NONE
super().__init__(**kwargs) super().__init__(**kwargs)
def _can_perform_network_action(self) -> bool:
"""
Checks if the NMAP application can perform outbound network actions.
This is done by checking the parent application can_per_action functionality. Then checking if there is an
enabled NIC that can be used for outbound traffic.
:return: True if outbound network actions can be performed, otherwise False.
"""
if not super()._can_perform_action():
return False
for nic in self.software_manager.node.network_interface.values():
if nic.enabled:
return True
return False
def _init_request_manager(self) -> RequestManager: def _init_request_manager(self) -> RequestManager:
def _ping_scan_action(request: List[Any], context: Any) -> RequestResponse: def _ping_scan_action(request: List[Any], context: Any) -> RequestResponse:
results = self.ping_scan(target_ip_address=request[0]["target_ip_address"], json_serializable=True) results = self.ping_scan(target_ip_address=request[0]["target_ip_address"], json_serializable=True)
success = True if not self._can_perform_network_action():
if not success:
return RequestResponse.from_bool(False) return RequestResponse.from_bool(False)
return RequestResponse( return RequestResponse(
status="success", status="success",
@@ -79,8 +96,7 @@ class NMAP(Application):
def _port_scan_action(request: List[Any], context: Any) -> RequestResponse: def _port_scan_action(request: List[Any], context: Any) -> RequestResponse:
results = self.port_scan(**request[0], json_serializable=True) results = self.port_scan(**request[0], json_serializable=True)
success = True if not self._can_perform_network_action():
if not success:
return RequestResponse.from_bool(False) return RequestResponse.from_bool(False)
return RequestResponse( return RequestResponse(
status="success", status="success",
@@ -89,8 +105,7 @@ class NMAP(Application):
def _network_service_recon_action(request: List[Any], context: Any) -> RequestResponse: def _network_service_recon_action(request: List[Any], context: Any) -> RequestResponse:
results = self.network_service_recon(**request[0], json_serializable=True) results = self.network_service_recon(**request[0], json_serializable=True)
success = True if not self._can_perform_network_action():
if not success:
return RequestResponse.from_bool(False) return RequestResponse.from_bool(False)
return RequestResponse( return RequestResponse(
status="success", status="success",
@@ -125,6 +140,24 @@ class NMAP(Application):
""" """
return super().describe_state() return super().describe_state()
@staticmethod
def _explode_ip_address_network_array(
target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]]
) -> Set[IPv4Address]:
if isinstance(target_ip_address, IPv4Address) or isinstance(target_ip_address, IPv4Network):
target_ip_address = [target_ip_address]
ip_addresses: List[IPV4Address] = []
for ip_address in target_ip_address:
if isinstance(ip_address, IPv4Network):
ip_addresses += [
ip
for ip in ip_address.hosts()
if not ip == ip_address.broadcast_address and not ip == ip_address.network_address
]
else:
ip_addresses.append(ip_address)
return set(ip_addresses)
@validate_call() @validate_call()
def ping_scan( def ping_scan(
self, self,
@@ -154,19 +187,10 @@ class NMAP(Application):
table = PrettyTable(["IP Address", "Can Ping"]) table = PrettyTable(["IP Address", "Can Ping"])
table.align = "l" table.align = "l"
table.title = f"{self.software_manager.node.hostname} NMAP Ping Scan" table.title = f"{self.software_manager.node.hostname} NMAP Ping Scan"
if isinstance(target_ip_address, IPv4Address) or isinstance(target_ip_address, IPv4Network):
target_ip_address = [target_ip_address] ip_addresses = self._explode_ip_address_network_array(target_ip_address)
ip_addresses = []
for ip_address in target_ip_address: for ip_address in ip_addresses:
if isinstance(ip_address, IPv4Network):
ip_addresses += [
ip
for ip in ip_address.hosts()
if not ip == ip_address.broadcast_address and not ip == ip_address.network_address
]
else:
ip_addresses.append(ip_address)
for ip_address in set(ip_addresses):
# Prevent ping scan on this node # Prevent ping scan on this node
if self.software_manager.node.ip_is_network_interface(ip_address=ip_address): if self.software_manager.node.ip_is_network_interface(ip_address=ip_address):
continue continue
@@ -305,18 +329,7 @@ class NMAP(Application):
:return: A dictionary mapping IP addresses to protocols and lists of open ports. :return: A dictionary mapping IP addresses to protocols and lists of open ports.
:rtype: Dict[IPv4Address, Dict[IPProtocol, List[Port]]] :rtype: Dict[IPv4Address, Dict[IPProtocol, List[Port]]]
""" """
if isinstance(target_ip_address, IPv4Address) or isinstance(target_ip_address, IPv4Network): ip_addresses = self._explode_ip_address_network_array(target_ip_address)
target_ip_address = [target_ip_address]
ip_addresses = []
for ip_address in target_ip_address:
if isinstance(ip_address, IPv4Network):
ip_addresses += [
ip
for ip in ip_address.hosts()
if not ip == ip_address.broadcast_address and not ip == ip_address.network_address
]
else:
ip_addresses.append(ip_address)
if isinstance(target_port, Port): if isinstance(target_port, Port):
target_port = [target_port] target_port = [target_port]
@@ -328,14 +341,14 @@ class NMAP(Application):
elif target_protocol is None: elif target_protocol is None:
target_protocol = [IPProtocol.TCP, IPProtocol.UDP] target_protocol = [IPProtocol.TCP, IPProtocol.UDP]
scan_type = self._determine_port_scan_type(target_ip_address, target_port) scan_type = self._determine_port_scan_type(list(ip_addresses), target_port)
active_ports = {} active_ports = {}
if show: if show:
table = PrettyTable(["IP Address", "Port", "Name", "Protocol"]) table = PrettyTable(["IP Address", "Port", "Name", "Protocol"])
table.align = "l" table.align = "l"
table.title = f"{self.software_manager.node.hostname} NMAP Port Scan ({scan_type})" table.title = f"{self.software_manager.node.hostname} NMAP Port Scan ({scan_type})"
self.sys_log.info(f"{self.name}: Starting port scan") self.sys_log.info(f"{self.name}: Starting port scan")
for ip_address in set(ip_addresses): for ip_address in ip_addresses:
# Prevent port scan on this node # Prevent port scan on this node
if self.software_manager.node.ip_is_network_interface(ip_address=ip_address): if self.software_manager.node.ip_is_network_interface(ip_address=ip_address):
continue continue