diff --git a/docs/source/simulation_components/system/applications/nmap.rst b/docs/source/simulation_components/system/applications/nmap.rst index 9a6a056d..ee7d24fc 100644 --- a/docs/source/simulation_components/system/applications/nmap.rst +++ b/docs/source/simulation_components/system/applications/nmap.rst @@ -1,6 +1,6 @@ .. only:: comment - © Crown-owned copyright 2023 - 2024, Defence Science and Technology Laboratory UK + © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK .. _NMAP: @@ -323,12 +323,11 @@ Perform a full box scan on all ports, over both TCP and UDP, on a whole subnet: } } - .. code-block:: text :caption: Box Port Scan Output +--------------------------------------------------+ - | pc_1 NMAP Port Scan (Box) | + | pc_1 NMAP Port Scan (Box) | +--------------+------+-----------------+----------+ | IP Address | Port | Name | Protocol | +--------------+------+-----------------+----------+ diff --git a/src/primaite/game/agent/actions.py b/src/primaite/game/agent/actions.py index 7707df2b..a3e61296 100644 --- a/src/primaite/game/agent/actions.py +++ b/src/primaite/game/agent/actions.py @@ -10,7 +10,7 @@ AbstractAction. The ActionManager is responsible for: """ import itertools from abc import ABC, abstractmethod -from typing import Dict, List, Optional, Tuple, TYPE_CHECKING +from typing import Dict, List, Optional, Tuple, TYPE_CHECKING, Union from gymnasium import spaces @@ -870,6 +870,74 @@ class NetworkPortDisableAction(AbstractAction): return ["network", "node", target_nodename, "network_interface", port_id, "disable"] +class NodeNMAPPingScanAction(AbstractAction): + """Action which performs an NMAP ping scan.""" + + def __init__(self, manager: "ActionManager", **kwargs) -> None: + super().__init__(manager=manager) + + def form_request(self, source_node: str, target_ip_address: Union[str, List[str]]) -> List[str]: # noqa + return [ + "network", + "node", + source_node, + "application", + "NMAP", + "ping_scan", + {"target_ip_address": target_ip_address}, + ] + + +class NodeNMAPPortScanAction(AbstractAction): + """Action which performs an NMAP port scan.""" + + def __init__(self, manager: "ActionManager", **kwargs) -> None: + super().__init__(manager=manager) + + def form_request( + self, + source_node: str, + target_ip_address: Union[str, List[str]], + target_protocol: Optional[Union[str, List[str]]] = None, + target_port: Optional[Union[str, List[str]]] = None, + ) -> List[str]: # noqa + """Return the action formatted as a request which can be ingested by the PrimAITE simulation.""" + return [ + "network", + "node", + source_node, + "application", + "NMAP", + "port_scan", + {"target_ip_address": target_ip_address, "target_port": target_port, "target_protocol": target_protocol}, + ] + + +class NodeNetworkServiceReconAction(AbstractAction): + """Action which performs an NMAP network service recon (ping scan followed by port scan).""" + + def __init__(self, manager: "ActionManager", **kwargs) -> None: + super().__init__(manager=manager) + + def form_request( + self, + source_node: str, + target_ip_address: Union[str, List[str]], + target_protocol: Optional[Union[str, List[str]]] = None, + target_port: Optional[Union[str, List[str]]] = None, + ) -> List[str]: # noqa + """Return the action formatted as a request which can be ingested by the PrimAITE simulation.""" + return [ + "network", + "node", + source_node, + "application", + "NMAP", + "network_service_recon", + {"target_ip_address": target_ip_address, "target_port": target_port, "target_protocol": target_protocol}, + ] + + class ActionManager: """Class which manages the action space for an agent.""" @@ -915,6 +983,9 @@ class ActionManager: "HOST_NIC_DISABLE": HostNICDisableAction, "NETWORK_PORT_ENABLE": NetworkPortEnableAction, "NETWORK_PORT_DISABLE": NetworkPortDisableAction, + "NODE_NMAP_PING_SCAN": NodeNMAPPingScanAction, + "NODE_NMAP_PORT_SCAN": NodeNMAPPortScanAction, + "NODE_NMAP_NETWORK_SERVICE_RECON": NodeNetworkServiceReconAction, } """Dictionary which maps action type strings to the corresponding action class.""" diff --git a/src/primaite/interface/request.py b/src/primaite/interface/request.py index bc076599..cacb7cd0 100644 --- a/src/primaite/interface/request.py +++ b/src/primaite/interface/request.py @@ -2,7 +2,7 @@ from typing import Dict, ForwardRef, List, Literal, Union from pydantic import BaseModel, ConfigDict, StrictBool, validate_call -RequestFormat = List[Union[str, int, float]] +RequestFormat = List[Union[str, int, float, Dict]] RequestResponse = ForwardRef("RequestResponse") """This makes it possible to type-hint RequestResponse.from_bool return type.""" diff --git a/src/primaite/simulator/core.py b/src/primaite/simulator/core.py index 835f24fe..0e496592 100644 --- a/src/primaite/simulator/core.py +++ b/src/primaite/simulator/core.py @@ -221,7 +221,7 @@ class SimComponent(BaseModel): return state @validate_call - def apply_request(self, request: RequestFormat, context: Dict = {}) -> RequestResponse: + def apply_request(self, request: RequestFormat, context: Optional[Dict] = None) -> RequestResponse: """ Apply a request to a simulation component. Request data is passed in as a 'namespaced' list of strings. @@ -239,6 +239,8 @@ class SimComponent(BaseModel): :param: context: Dict containing context for requests :type context: Dict """ + if not context: + context = None if self._request_manager is None: return return self._request_manager(request, context) diff --git a/src/primaite/simulator/network/hardware/base.py b/src/primaite/simulator/network/hardware/base.py index a515ce58..6ab1b9fc 100644 --- a/src/primaite/simulator/network/hardware/base.py +++ b/src/primaite/simulator/network/hardware/base.py @@ -28,6 +28,7 @@ from primaite.simulator.network.nmne import ( NMNE_CAPTURE_KEYWORDS, ) from primaite.simulator.network.transmission.data_link_layer import Frame +from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.system.applications.application import Application from primaite.simulator.system.core.packet_capture import PacketCapture from primaite.simulator.system.core.session_manager import SessionManager @@ -107,10 +108,13 @@ class NetworkInterface(SimComponent, ABC): nmne: Dict = Field(default_factory=lambda: {}) "A dict containing details of the number of malicious network events captured." + traffic: Dict = Field(default_factory=lambda: {}) + def setup_for_episode(self, episode: int): """Reset the original state of the SimComponent.""" super().setup_for_episode(episode=episode) self.nmne = {} + self.traffic = {} if episode and self.pcap and SIM_OUTPUT.save_pcap_logs: self.pcap.current_episode = episode self.pcap.setup_logger() @@ -146,6 +150,7 @@ class NetworkInterface(SimComponent, ABC): ) if CAPTURE_NMNE: state.update({"nmne": {k: v for k, v in self.nmne.items()}}) + state.update({"traffic": self.traffic}) return state @abstractmethod @@ -236,6 +241,47 @@ class NetworkInterface(SimComponent, ABC): # Increment a generic counter if keyword capturing is not enabled keyword_level["*"] = keyword_level.get("*", 0) + 1 + def _capture_traffic(self, frame: Frame, inbound: bool = True): + """ + Capture traffic statistics at the Network Interface. + + :param frame: The network frame containing the traffic data. + :type frame: Frame + :param inbound: Flag indicating if the traffic is inbound or outbound. Defaults to True. + :type inbound: bool + """ + # Determine the direction of the traffic + direction = "inbound" if inbound else "outbound" + + # Initialize protocol and port variables + protocol = None + port = None + + # Identify the protocol and port from the frame + if frame.tcp: + protocol = IPProtocol.TCP + port = frame.tcp.dst_port + elif frame.udp: + protocol = IPProtocol.UDP + port = frame.udp.dst_port + elif frame.icmp: + protocol = IPProtocol.ICMP + + # Ensure the protocol is in the capture dict + if protocol not in self.traffic: + self.traffic[protocol] = {} + + # Handle non-ICMP protocols that use ports + if protocol != IPProtocol.ICMP: + if port not in self.traffic[protocol]: + self.traffic[protocol][port] = {"inbound": 0, "outbound": 0} + self.traffic[protocol][port][direction] += frame.size + else: + # Handle ICMP protocol separately (ICMP does not use ports) + if not self.traffic[protocol]: + self.traffic[protocol] = {"inbound": 0, "outbound": 0} + self.traffic[protocol][direction] += frame.size + @abstractmethod def send_frame(self, frame: Frame) -> bool: """ @@ -245,6 +291,7 @@ class NetworkInterface(SimComponent, ABC): :return: A boolean indicating whether the frame was successfully sent. """ self._capture_nmne(frame, inbound=False) + self._capture_traffic(frame, inbound=False) @abstractmethod def receive_frame(self, frame: Frame) -> bool: @@ -255,6 +302,7 @@ class NetworkInterface(SimComponent, ABC): :return: A boolean indicating whether the frame was successfully received. """ self._capture_nmne(frame, inbound=True) + self._capture_traffic(frame, inbound=True) def __str__(self) -> str: """ @@ -766,6 +814,24 @@ class Node(SimComponent): self.session_manager.software_manager = self.software_manager self._install_system_software() + def ip_is_network_interface(self, ip_address: IPv4Address, enabled_only: bool = False) -> bool: + """ + Checks if a given IP address belongs to any of the nodes interfaces. + + :param ip_address: The IP address to check. + :param enabled_only: If True, only considers enabled network interfaces. + :return: True if the IP address is assigned to one of the nodes interfaces; False otherwise. + """ + for network_interface in self.network_interface.values(): + if not hasattr(network_interface, "ip_address"): + continue + if network_interface.ip_address == ip_address: + if enabled_only: + return network_interface.enabled + else: + return True + return False + def setup_for_episode(self, episode: int): """Reset the original state of the SimComponent.""" super().setup_for_episode(episode=episode) diff --git a/src/primaite/simulator/network/hardware/nodes/host/host_node.py b/src/primaite/simulator/network/hardware/nodes/host/host_node.py index 25c65dfd..9f1ace0f 100644 --- a/src/primaite/simulator/network/hardware/nodes/host/host_node.py +++ b/src/primaite/simulator/network/hardware/nodes/host/host_node.py @@ -315,7 +315,17 @@ class HostNode(Node): def __init__(self, ip_address: IPV4Address, subnet_mask: IPV4Address, **kwargs): super().__init__(**kwargs) - self.connect_nic(NIC(ip_address=ip_address, subnet_mask=subnet_mask)) + self.connect_nic(NIC(ip_address=ip_address, subnet_mask=subnet_mask)) # + + @property + def nmap(self) -> Optional[NMAP]: + """ + Return the NMAP application installed on the Node. + + :return: NMAP application installed on the Node. + :rtype: Optional[NMAP] + """ + return self.software_manager.software.get("NMAP") @property def arp(self) -> Optional[ARP]: diff --git a/src/primaite/simulator/system/applications/nmap.py b/src/primaite/simulator/system/applications/nmap.py index f73bf6a6..4dbd9c1f 100644 --- a/src/primaite/simulator/system/applications/nmap.py +++ b/src/primaite/simulator/system/applications/nmap.py @@ -4,7 +4,8 @@ from typing import Any, Dict, Final, List, Optional, Tuple, Union from prettytable import PrettyTable from pydantic import validate_call -from primaite.simulator.core import SimComponent +from primaite.interface.request import RequestResponse +from primaite.simulator.core import RequestManager, RequestType, SimComponent from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.network.transmission.transport_layer import Port from primaite.simulator.system.applications.application import Application @@ -65,6 +66,56 @@ class NMAP(Application): kwargs["protocol"] = IPProtocol.NONE super().__init__(**kwargs) + def _init_request_manager(self) -> RequestManager: + def _ping_scan_action(request: List[Any], context: Any) -> RequestResponse: + results = self.ping_scan(target_ip_address=request[0]["target_ip_address"], json_serializable=True) + success = True + if not success: + return RequestResponse.from_bool(False) + return RequestResponse( + status="success", + data={"live_hosts": results}, + ) + + def _port_scan_action(request: List[Any], context: Any) -> RequestResponse: + results = self.port_scan(**request[0], json_serializable=True) + success = True + if not success: + return RequestResponse.from_bool(False) + return RequestResponse( + status="success", + data=results, + ) + + def _network_service_recon_action(request: List[Any], context: Any) -> RequestResponse: + results = self.network_service_recon(**request[0], json_serializable=True) + success = True + if not success: + return RequestResponse.from_bool(False) + return RequestResponse( + status="success", + data=results, + ) + + rm = RequestManager() + + rm.add_request( + name="ping_scan", + request_type=RequestType(func=_ping_scan_action), + ) + + rm.add_request( + name="port_scan", + request_type=RequestType(func=_port_scan_action), + ) + + rm.add_request( + name="network_service_recon", + request_type=RequestType(func=_network_service_recon_action), + ) + + return rm + def describe_state(self) -> Dict: """ Describe the state of the NMAP application. @@ -80,7 +131,8 @@ class NMAP(Application): target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]], show: bool = True, show_online_only: bool = True, - ) -> List[IPV4Address]: + json_serializable: bool = False, + ) -> Union[List[IPV4Address], List[str]]: """ Perform a ping scan on the target IP address(es). @@ -90,9 +142,12 @@ class NMAP(Application): :type show: bool :param show_online_only: Flag indicating whether to show only the online hosts. Defaults to True. :type show_online_only: bool + :param json_serializable: Flag indicating whether the return value should be json serializable. Defaults to + False. + :type json_serializable: bool :return: A list of active IP addresses that responded to the ping. - :rtype: List[IPV4Address] + :rtype: Union[List[IPV4Address], List[str]] """ active_nodes = [] if show: @@ -112,9 +167,12 @@ class NMAP(Application): else: ip_addresses.append(ip_address) for ip_address in set(ip_addresses): + # Prevent ping scan on this node + if self.software_manager.node.ip_is_network_interface(ip_address=ip_address): + continue can_ping = self.software_manager.icmp.ping(ip_address) if can_ping: - active_nodes.append(ip_address) + active_nodes.append(ip_address if not json_serializable else str(ip_address)) if show and (can_ping or not show_online_only): table.add_row([ip_address, can_ping]) if show: @@ -227,6 +285,7 @@ class NMAP(Application): target_protocol: Optional[Union[IPProtocol, List[IPProtocol]]] = None, target_port: Optional[Union[Port, List[Port]]] = None, show: bool = True, + json_serializable: bool = False, ) -> Dict[IPv4Address, Dict[IPProtocol, List[Port]]]: """ Perform a port scan on the target IP address(es). @@ -239,6 +298,9 @@ class NMAP(Application): :type target_port: Optional[Union[Port, List[Port]]] :param show: Flag indicating whether to display the scan results. Defaults to True. :type show: bool + :param json_serializable: Flag indicating whether the return value should be JSON serializable. Defaults to + False. + :type json_serializable: bool :return: A dictionary mapping IP addresses to protocols and lists of open ports. :rtype: Dict[IPv4Address, Dict[IPProtocol, List[Port]]] @@ -274,24 +336,75 @@ class NMAP(Application): table.title = f"{self.software_manager.node.hostname} NMAP Port Scan ({scan_type})" self.sys_log.info(f"{self.name}: Starting port scan") for ip_address in set(ip_addresses): + # Prevent port scan on this node + if self.software_manager.node.ip_is_network_interface(ip_address=ip_address): + continue for protocol in target_protocol: for port in set(target_port): port_open = self._check_port_open_on_ip_address(ip_address=ip_address, port=port, protocol=protocol) if port_open: table.add_row([ip_address, port.value, port.name, protocol.name]) - - if ip_address not in active_ports: - active_ports[ip_address] = dict() - if protocol not in active_ports[ip_address]: - active_ports[ip_address][protocol] = [] - active_ports[ip_address][protocol].append(port) + _ip_address = ip_address if not json_serializable else str(ip_address) + _protocol = protocol if not json_serializable else protocol.value + _port = port if not json_serializable else port.value + if _ip_address not in active_ports: + active_ports[_ip_address] = dict() + if _protocol not in active_ports[_ip_address]: + active_ports[_ip_address][_protocol] = [] + active_ports[_ip_address][_protocol].append(_port) if show: print(table.get_string(sortby="IP Address")) return active_ports + def network_service_recon( + self, + target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]], + target_protocol: Optional[Union[IPProtocol, List[IPProtocol]]] = None, + target_port: Optional[Union[Port, List[Port]]] = None, + show: bool = True, + show_online_only: bool = True, + json_serializable: bool = False, + ) -> Dict[IPv4Address, Dict[IPProtocol, List[Port]]]: + """ + Perform a network service reconnaissance which includes a ping scan followed by a port scan. + + This method combines the functionalities of a ping scan and a port scan to provide a comprehensive + overview of the services on the network. It first identifies active hosts in the target IP range by performing + a ping scan. Once the active hosts are identified, it performs a port scan on these hosts to identify open + ports and running services. This two-step process ensures that the port scan is performed only on live hosts, + optimising the scanning process and providing accurate results. + + :param target_ip_address: The target IP address(es) or network(s) for the port scan. + :type target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]] + :param target_protocol: The protocol(s) to use for the port scan. Defaults to None, which includes TCP and UDP. + :type target_protocol: Optional[Union[IPProtocol, List[IPProtocol]]] + :param target_port: The port(s) to scan. Defaults to None, which includes all valid ports. + :type target_port: Optional[Union[Port, List[Port]]] + :param show: Flag indicating whether to display the scan results. Defaults to True. + :type show: bool + :param show_online_only: Flag indicating whether to show only the online hosts. Defaults to True. + :type show_online_only: bool + :param json_serializable: Flag indicating whether the return value should be JSON serializable. Defaults to + False. + :type json_serializable: bool + + :return: A dictionary mapping IP addresses to protocols and lists of open ports. + :rtype: Dict[IPv4Address, Dict[IPProtocol, List[Port]]] + """ + ping_scan_results = self.ping_scan( + target_ip_address=target_ip_address, show=show, show_online_only=show_online_only, json_serializable=False + ) + return self.port_scan( + target_ip_address=ping_scan_results, + target_protocol=target_protocol, + target_port=target_port, + show=show, + json_serializable=json_serializable, + ) + def receive(self, payload: Any, session_id: str, **kwargs) -> bool: """ Receive and process a payload. diff --git a/tests/assets/configs/nmap_network_service_recon_red_agent_config.yaml b/tests/assets/configs/nmap_network_service_recon_red_agent_config.yaml new file mode 100644 index 00000000..c5508f13 --- /dev/null +++ b/tests/assets/configs/nmap_network_service_recon_red_agent_config.yaml @@ -0,0 +1,137 @@ +io_settings: + save_step_metadata: false + save_pcap_logs: true + save_sys_logs: true + sys_log_level: WARNING + + + +game: + max_episode_length: 256 + ports: + - ARP + - DNS + - HTTP + - POSTGRES_SERVER + protocols: + - ICMP + - TCP + - UDP + +agents: + - ref: client_1_red_nmap + team: RED + type: ProbabilisticAgent + observation_space: null + action_space: + options: + nodes: + - node_name: client_1 + applications: + - application_name: NMAP + max_folders_per_node: 1 + max_files_per_folder: 1 + max_services_per_node: 1 + max_applications_per_node: 1 + action_list: + - type: NODE_NMAP_NETWORK_SERVICE_RECON + action_map: + 0: + action: NODE_NMAP_NETWORK_SERVICE_RECON + options: + source_node: client_1 + target_ip_address: 192.168.10.0/24 + target_port: 80 + target_protocol: tcp + + reward_function: + reward_components: + - type: DUMMY + + agent_settings: + action_probabilities: + 0: 1.0 + + + +simulation: + network: + nodes: + - hostname: switch_1 + num_ports: 8 + type: switch + + - hostname: switch_2 + num_ports: 8 + type: switch + + - hostname: router_1 + type: router + ports: + 1: + ip_address: 192.168.1.1 + subnet_mask: 255.255.255.0 + 2: + ip_address: 192.168.10.1 + subnet_mask: 255.255.255.0 + acl: + 1: + action: PERMIT + + - hostname: client_1 + type: computer + ip_address: 192.168.10.21 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.10.1 + + - hostname: client_2 + type: computer + ip_address: 192.168.10.22 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.10.1 + + - hostname: server_1 + type: server + ip_address: 192.168.1.10 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.1.1 + + - hostname: server_2 + type: server + ip_address: 192.168.1.14 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.1.1 + + + + + links: + - endpoint_a_hostname: router_1 + endpoint_a_port: 1 + endpoint_b_hostname: switch_1 + endpoint_b_port: 8 + + - endpoint_a_hostname: router_1 + endpoint_a_port: 2 + endpoint_b_hostname: switch_2 + endpoint_b_port: 8 + + - endpoint_a_hostname: client_1 + endpoint_a_port: 1 + endpoint_b_hostname: switch_2 + endpoint_b_port: 1 + + - endpoint_a_hostname: client_2 + endpoint_a_port: 1 + endpoint_b_hostname: switch_2 + endpoint_b_port: 2 + + - endpoint_a_hostname: server_1 + endpoint_a_port: 1 + endpoint_b_hostname: switch_1 + endpoint_b_port: 1 + + - endpoint_a_hostname: server_2 + endpoint_a_port: 1 + endpoint_b_hostname: switch_1 + endpoint_b_port: 2 diff --git a/tests/assets/configs/nmap_ping_scan_red_agent_config.yaml b/tests/assets/configs/nmap_ping_scan_red_agent_config.yaml new file mode 100644 index 00000000..33ba3d19 --- /dev/null +++ b/tests/assets/configs/nmap_ping_scan_red_agent_config.yaml @@ -0,0 +1,135 @@ +io_settings: + save_step_metadata: false + save_pcap_logs: true + save_sys_logs: true + sys_log_level: WARNING + + + +game: + max_episode_length: 256 + ports: + - ARP + - DNS + - HTTP + - POSTGRES_SERVER + protocols: + - ICMP + - TCP + - UDP + +agents: + - ref: client_1_red_nmap + team: RED + type: ProbabilisticAgent + observation_space: null + action_space: + options: + nodes: + - node_name: client_1 + applications: + - application_name: NMAP + max_folders_per_node: 1 + max_files_per_folder: 1 + max_services_per_node: 1 + max_applications_per_node: 1 + action_list: + - type: NODE_NMAP_PING_SCAN + action_map: + 0: + action: NODE_NMAP_PING_SCAN + options: + source_node: client_1 + target_ip_address: 192.168.1.0/24 + + reward_function: + reward_components: + - type: DUMMY + + agent_settings: + action_probabilities: + 0: 1.0 + + + +simulation: + network: + nodes: + - hostname: switch_1 + num_ports: 8 + type: switch + + - hostname: switch_2 + num_ports: 8 + type: switch + + - hostname: router_1 + type: router + ports: + 1: + ip_address: 192.168.1.1 + subnet_mask: 255.255.255.0 + 2: + ip_address: 192.168.10.1 + subnet_mask: 255.255.255.0 + acl: + 1: + action: PERMIT + + - hostname: client_1 + type: computer + ip_address: 192.168.10.21 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.10.1 + + - hostname: client_2 + type: computer + ip_address: 192.168.10.22 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.10.1 + + - hostname: server_1 + type: server + ip_address: 192.168.1.10 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.1.1 + + - hostname: server_2 + type: server + ip_address: 192.168.1.14 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.1.1 + + + + + links: + - endpoint_a_hostname: router_1 + endpoint_a_port: 1 + endpoint_b_hostname: switch_1 + endpoint_b_port: 8 + + - endpoint_a_hostname: router_1 + endpoint_a_port: 2 + endpoint_b_hostname: switch_2 + endpoint_b_port: 8 + + - endpoint_a_hostname: client_1 + endpoint_a_port: 1 + endpoint_b_hostname: switch_2 + endpoint_b_port: 1 + + - endpoint_a_hostname: client_2 + endpoint_a_port: 1 + endpoint_b_hostname: switch_2 + endpoint_b_port: 2 + + - endpoint_a_hostname: server_1 + endpoint_a_port: 1 + endpoint_b_hostname: switch_1 + endpoint_b_port: 1 + + - endpoint_a_hostname: server_2 + endpoint_a_port: 1 + endpoint_b_hostname: switch_1 + endpoint_b_port: 2 diff --git a/tests/assets/configs/nmap_port_scan_red_agent_config.yaml b/tests/assets/configs/nmap_port_scan_red_agent_config.yaml new file mode 100644 index 00000000..08944ee5 --- /dev/null +++ b/tests/assets/configs/nmap_port_scan_red_agent_config.yaml @@ -0,0 +1,135 @@ +io_settings: + save_step_metadata: false + save_pcap_logs: true + save_sys_logs: true + sys_log_level: WARNING + + + +game: + max_episode_length: 256 + ports: + - ARP + - DNS + - HTTP + - POSTGRES_SERVER + protocols: + - ICMP + - TCP + - UDP + +agents: + - ref: client_1_red_nmap + team: RED + type: ProbabilisticAgent + observation_space: null + action_space: + options: + nodes: + - node_name: client_1 + applications: + - application_name: NMAP + max_folders_per_node: 1 + max_files_per_folder: 1 + max_services_per_node: 1 + max_applications_per_node: 1 + action_list: + - type: NODE_NMAP_PORT_SCAN + action_map: + 0: + action: NODE_NMAP_PORT_SCAN + options: + source_node: client_1 + target_ip_address: 192.168.10.0/24 + + reward_function: + reward_components: + - type: DUMMY + + agent_settings: + action_probabilities: + 0: 1.0 + + + +simulation: + network: + nodes: + - hostname: switch_1 + num_ports: 8 + type: switch + + - hostname: switch_2 + num_ports: 8 + type: switch + + - hostname: router_1 + type: router + ports: + 1: + ip_address: 192.168.1.1 + subnet_mask: 255.255.255.0 + 2: + ip_address: 192.168.10.1 + subnet_mask: 255.255.255.0 + acl: + 1: + action: PERMIT + + - hostname: client_1 + type: computer + ip_address: 192.168.10.21 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.10.1 + + - hostname: client_2 + type: computer + ip_address: 192.168.10.22 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.10.1 + + - hostname: server_1 + type: server + ip_address: 192.168.1.10 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.1.1 + + - hostname: server_2 + type: server + ip_address: 192.168.1.14 + subnet_mask: 255.255.255.0 + default_gateway: 192.168.1.1 + + + + + links: + - endpoint_a_hostname: router_1 + endpoint_a_port: 1 + endpoint_b_hostname: switch_1 + endpoint_b_port: 8 + + - endpoint_a_hostname: router_1 + endpoint_a_port: 2 + endpoint_b_hostname: switch_2 + endpoint_b_port: 8 + + - endpoint_a_hostname: client_1 + endpoint_a_port: 1 + endpoint_b_hostname: switch_2 + endpoint_b_port: 1 + + - endpoint_a_hostname: client_2 + endpoint_a_port: 1 + endpoint_b_hostname: switch_2 + endpoint_b_port: 2 + + - endpoint_a_hostname: server_1 + endpoint_a_port: 1 + endpoint_b_hostname: switch_1 + endpoint_b_port: 1 + + - endpoint_a_hostname: server_2 + endpoint_a_port: 1 + endpoint_b_hostname: switch_1 + endpoint_b_port: 2 diff --git a/tests/integration_tests/system/test_nmap.py b/tests/integration_tests/system/test_nmap.py index 39cfce98..7cad6a98 100644 --- a/tests/integration_tests/system/test_nmap.py +++ b/tests/integration_tests/system/test_nmap.py @@ -1,16 +1,19 @@ from enum import Enum from ipaddress import IPv4Address, IPv4Network +import yaml + +from primaite.game.game import PrimaiteGame from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.network.transmission.transport_layer import Port from primaite.simulator.system.applications.nmap import NMAP +from tests import TEST_ASSETS_ROOT def test_ping_scan_all_on(example_network): network = example_network client_1 = network.get_node_by_hostname("client_1") - client_1.software_manager.install(NMAP) client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa expected_result = [IPv4Address("192.168.1.10"), IPv4Address("192.168.1.14")] @@ -23,7 +26,6 @@ def test_ping_scan_all_on_full_network(example_network): network = example_network client_1 = network.get_node_by_hostname("client_1") - client_1.software_manager.install(NMAP) client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa expected_result = [IPv4Address("192.168.1.1"), IPv4Address("192.168.1.10"), IPv4Address("192.168.1.14")] @@ -36,13 +38,12 @@ def test_ping_scan_some_on(example_network): network = example_network client_1 = network.get_node_by_hostname("client_1") - client_1.software_manager.install(NMAP) client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa network.get_node_by_hostname("server_2").power_off() - expected_result = [IPv4Address("192.168.1.10")] - actual_result = client_1_nmap.ping_scan(target_ip_address=["192.168.1.10", "192.168.1.14"]) + expected_result = [IPv4Address("192.168.1.1"), IPv4Address("192.168.1.10")] + actual_result = client_1_nmap.ping_scan(target_ip_address=IPv4Network("192.168.1.0/24")) assert sorted(actual_result) == sorted(expected_result) @@ -51,7 +52,6 @@ def test_ping_scan_all_off(example_network): network = example_network client_1 = network.get_node_by_hostname("client_1") - client_1.software_manager.install(NMAP) client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa network.get_node_by_hostname("server_1").power_off() @@ -67,7 +67,6 @@ def test_port_scan_one_node_one_port(example_network): network = example_network client_1 = network.get_node_by_hostname("client_1") - client_1.software_manager.install(NMAP) client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa client_2 = network.get_node_by_hostname("client_2") @@ -97,7 +96,6 @@ def test_port_scan_full_subnet_all_ports_and_protocols(example_network): network = example_network client_1 = network.get_node_by_hostname("client_1") - client_1.software_manager.install(NMAP) client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa actual_result = client_1_nmap.port_scan( @@ -105,7 +103,6 @@ def test_port_scan_full_subnet_all_ports_and_protocols(example_network): ) expected_result = { - IPv4Address("192.168.10.21"): {IPProtocol.UDP: [Port.ARP]}, IPv4Address("192.168.10.1"): {IPProtocol.UDP: [Port.ARP]}, IPv4Address("192.168.10.22"): { IPProtocol.TCP: [Port.HTTP, Port.FTP, Port.DNS], @@ -114,3 +111,75 @@ def test_port_scan_full_subnet_all_ports_and_protocols(example_network): } assert sort_dict(actual_result) == sort_dict(expected_result) + + +def test_network_service_recon_all_ports_and_protocols(example_network): + network = example_network + + client_1 = network.get_node_by_hostname("client_1") + client_1_nmap: NMAP = client_1.software_manager.software["NMAP"] # noqa + + actual_result = client_1_nmap.network_service_recon( + target_ip_address=IPv4Network("192.168.10.0/24"), target_port=Port.HTTP, target_protocol=IPProtocol.TCP + ) + + expected_result = {IPv4Address("192.168.10.22"): {IPProtocol.TCP: [Port.HTTP]}} + + assert sort_dict(actual_result) == sort_dict(expected_result) + + +def test_ping_scan_red_agent(): + with open(TEST_ASSETS_ROOT / "configs/nmap_ping_scan_red_agent_config.yaml", "r") as file: + cfg = yaml.safe_load(file) + + game = PrimaiteGame.from_config(cfg) + + game.step() + + expected_result = ["192.168.1.1", "192.168.1.10", "192.168.1.14"] + + action_history = game.agents["client_1_red_nmap"].action_history + assert len(action_history) == 1 + actual_result = action_history[0].response.data["live_hosts"] + + assert sorted(actual_result) == sorted(expected_result) + + +def test_port_scan_red_agent(): + with open(TEST_ASSETS_ROOT / "configs/nmap_port_scan_red_agent_config.yaml", "r") as file: + cfg = yaml.safe_load(file) + + game = PrimaiteGame.from_config(cfg) + + game.step() + + expected_result = { + "192.168.10.1": {"udp": [219]}, + "192.168.10.22": { + "tcp": [80, 21, 53], + "udp": [219, 123], + }, + } + + action_history = game.agents["client_1_red_nmap"].action_history + assert len(action_history) == 1 + actual_result = action_history[0].response.data + + assert sorted(actual_result) == sorted(expected_result) + + +def test_network_service_recon_red_agent(): + with open(TEST_ASSETS_ROOT / "configs/nmap_network_service_recon_red_agent_config.yaml", "r") as file: + cfg = yaml.safe_load(file) + + game = PrimaiteGame.from_config(cfg) + + game.step() + + expected_result = {"192.168.10.22": {"tcp": [80]}} + + action_history = game.agents["client_1_red_nmap"].action_history + assert len(action_history) == 1 + actual_result = action_history[0].response.data + + assert sorted(actual_result) == sorted(expected_result)