From 87d9d6da044fab4a1a15182ad4632a6ca384cab2 Mon Sep 17 00:00:00 2001 From: Chris McCarthy Date: Fri, 2 Feb 2024 15:35:02 +0000 Subject: [PATCH] #2248 - Initial work has been done on moving ICMP into services. still tidying up to be done. Need to fix tests too. --- .../simulator/network/hardware/base.py | 174 ++---------------- .../simulator/network/hardware/nodes/host.py | 67 +++++++ .../network/hardware/nodes/router.py | 99 +--------- .../simulator/network/protocols/icmp.py | 114 ++++++++++++ .../network/transmission/data_link_layer.py | 3 +- .../network/transmission/network_layer.py | 104 ----------- .../network/transmission/transport_layer.py | 2 + .../simulator/system/core/session_manager.py | 9 +- .../simulator/system/core/software_manager.py | 15 +- src/primaite/simulator/system/core/sys_log.py | 25 ++- .../simulator/system/services/arp/arp.py | 7 +- .../system/services/icmp/__init__.py | 0 .../simulator/system/services/icmp/icmp.py | 159 ++++++++++++++++ .../system/services/icmp/router_icmp.py | 90 +++++++++ 14 files changed, 495 insertions(+), 373 deletions(-) create mode 100644 src/primaite/simulator/network/hardware/nodes/host.py create mode 100644 src/primaite/simulator/network/protocols/icmp.py create mode 100644 src/primaite/simulator/system/services/icmp/__init__.py create mode 100644 src/primaite/simulator/system/services/icmp/icmp.py create mode 100644 src/primaite/simulator/system/services/icmp/router_icmp.py diff --git a/src/primaite/simulator/network/hardware/base.py b/src/primaite/simulator/network/hardware/base.py index 0113c2b4..7fbaa5f4 100644 --- a/src/primaite/simulator/network/hardware/base.py +++ b/src/primaite/simulator/network/hardware/base.py @@ -4,7 +4,7 @@ import re import secrets from ipaddress import IPv4Address, IPv4Network from pathlib import Path -from typing import Any, Dict, List, Literal, Optional, Tuple, Union +from typing import Any, Dict, List, Literal, Optional, Union from prettytable import MARKDOWN, PrettyTable @@ -17,7 +17,7 @@ from primaite.simulator.file_system.file_system import FileSystem from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState from primaite.simulator.network.protocols.arp import ARPEntry, ARPPacket from primaite.simulator.network.transmission.data_link_layer import EthernetHeader, Frame -from primaite.simulator.network.transmission.network_layer import ICMPPacket, ICMPType, IPPacket, IPProtocol +from primaite.simulator.network.transmission.network_layer import IPPacket from primaite.simulator.network.transmission.transport_layer import Port, TCPHeader from primaite.simulator.system.applications.application import Application from primaite.simulator.system.core.packet_capture import PacketCapture @@ -854,113 +854,6 @@ class ARPCache: return item in self.arp -class ICMP: - """ - The ICMP (Internet Control Message Protocol) class. - - Provides functionalities for managing and handling ICMP packets, including echo requests and replies. - """ - - def __init__(self, sys_log: SysLog): - """ - Initialize the ICMP (Internet Control Message Protocol) service. - - :param sys_log: The system log to store system messages and information. - :param arp_cache: The ARP cache for resolving IP to MAC address mappings. - """ - self.sys_log: SysLog = sys_log - self.software_manager: SoftwareManager = None ## noqa - self.request_replies = {} - - def clear(self): - """Clears the ICMP request replies tracker.""" - self.request_replies.clear() - - def process_icmp(self, frame: Frame, from_nic: NIC, is_reattempt: bool = False): - """ - Process an ICMP packet, including handling echo requests and replies. - - :param frame: The Frame containing the ICMP packet to process. - """ - if frame.icmp.icmp_type == ICMPType.ECHO_REQUEST: - if not is_reattempt: - self.sys_log.info(f"Received echo request from {frame.ip.src_ip_address}") - target_mac_address = self.software_manager.arp.get_arp_cache_mac_address(frame.ip.src_ip_address) - - src_nic = self.software_manager.arp.get_arp_cache_nic(frame.ip.src_ip_address) - if not src_nic: - self.software_manager.arp.send_arp_request(frame.ip.src_ip_address) - self.process_icmp(frame=frame, from_nic=from_nic, is_reattempt=True) - return - - # Network Layer - ip_packet = IPPacket( - src_ip_address=src_nic.ip_address, dst_ip_address=frame.ip.src_ip_address, protocol=IPProtocol.ICMP - ) - # Data Link Layer - ethernet_header = EthernetHeader(src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address) - icmp_reply_packet = ICMPPacket( - icmp_type=ICMPType.ECHO_REPLY, - icmp_code=0, - identifier=frame.icmp.identifier, - sequence=frame.icmp.sequence + 1, - ) - payload = secrets.token_urlsafe(int(32 / 1.3)) # Standard ICMP 32 bytes size - frame = Frame(ethernet=ethernet_header, ip=ip_packet, icmp=icmp_reply_packet, payload=payload) - self.sys_log.info(f"Sending echo reply to {frame.ip.dst_ip_address}") - - src_nic.send_frame(frame) - elif frame.icmp.icmp_type == ICMPType.ECHO_REPLY: - time = frame.transmission_duration() - time_str = f"{time}ms" if time > 0 else "<1ms" - self.sys_log.info( - f"Reply from {frame.ip.src_ip_address}: " - f"bytes={len(frame.payload)}, " - f"time={time_str}, " - f"TTL={frame.ip.ttl}" - ) - if not self.request_replies.get(frame.icmp.identifier): - self.request_replies[frame.icmp.identifier] = 0 - self.request_replies[frame.icmp.identifier] += 1 - - def ping( - self, target_ip_address: IPv4Address, sequence: int = 0, identifier: Optional[int] = None, pings: int = 4 - ) -> Tuple[int, Union[int, None]]: - """ - Send an ICMP echo request (ping) to a target IP address and manage the sequence and identifier. - - :param target_ip_address: The target IP address to send the ping. - :param sequence: The sequence number of the echo request. Defaults to 0. - :param identifier: An optional identifier for the ICMP packet. If None, a default will be used. - :return: A tuple containing the next sequence number and the identifier, or (0, None) if the target IP address - was not found in the ARP cache. - """ - nic = self.software_manager.arp.get_arp_cache_nic(target_ip_address) - - if not nic: - return pings, None - - # ARP entry exists - sequence += 1 - target_mac_address = self.software_manager.arp.get_arp_cache_mac_address(target_ip_address) - - src_nic = self.software_manager.arp.get_arp_cache_nic(target_ip_address) - tcp_header = TCPHeader(src_port=Port.ARP, dst_port=Port.ARP) - - # Network Layer - ip_packet = IPPacket( - src_ip_address=nic.ip_address, - dst_ip_address=target_ip_address, - protocol=IPProtocol.ICMP, - ) - # Data Link Layer - ethernet_header = EthernetHeader(src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address) - icmp_packet = ICMPPacket(identifier=identifier, sequence=sequence) - payload = secrets.token_urlsafe(int(32 / 1.3)) # Standard ICMP 32 bytes size - frame = Frame(ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, icmp=icmp_packet, payload=payload) - nic.send_frame(frame) - return sequence, icmp_packet.identifier - class Node(SimComponent): """ @@ -999,7 +892,6 @@ class Node(SimComponent): root: Path "Root directory for simulation output." sys_log: SysLog - icmp: ICMP session_manager: SessionManager software_manager: SoftwareManager @@ -1042,8 +934,6 @@ class Node(SimComponent): kwargs["default_gateway"] = IPv4Address(kwargs["default_gateway"]) if not kwargs.get("sys_log"): kwargs["sys_log"] = SysLog(kwargs["hostname"]) - if not kwargs.get("icmp"): - kwargs["icmp"] = ICMP(sys_log=kwargs.get("sys_log")) if not kwargs.get("session_manager"): kwargs["session_manager"] = SessionManager(sys_log=kwargs.get("sys_log"), arp_cache=kwargs.get("arp")) if not kwargs.get("root"): @@ -1059,7 +949,6 @@ class Node(SimComponent): dns_server=kwargs.get("dns_server"), ) super().__init__(**kwargs) - self.icmp.software_manager = self.software_manager self.session_manager.node = self self.session_manager.software_manager = self.software_manager self._install_system_software() @@ -1096,12 +985,6 @@ class Node(SimComponent): """Reset the original state of the SimComponent.""" super().reset_component_for_episode(episode) - # Reset ARP Cache - self.arp.clear() - - # Reset ICMP - self.icmp.clear() - # Reset Session Manager self.session_manager.clear() @@ -1436,35 +1319,9 @@ class Node(SimComponent): :param pings: The number of pings to attempt, default is 4. :return: True if the ping is successful, otherwise False. """ - if self.operating_state == NodeOperatingState.ON: - if not isinstance(target_ip_address, IPv4Address): - target_ip_address = IPv4Address(target_ip_address) - if target_ip_address.is_loopback: - self.sys_log.info("Pinging loopback address") - return any(nic.enabled for nic in self.nics.values()) - if self.operating_state == NodeOperatingState.ON: - output = f"Pinging {target_ip_address}:" - self.sys_log.info(output) - print(output) - sequence, identifier = 0, None - while sequence < pings: - sequence, identifier = self.icmp.ping(target_ip_address, sequence, identifier, pings) - request_replies = self.icmp.request_replies.get(identifier) - passed = request_replies == pings - if request_replies: - self.icmp.request_replies.pop(identifier) - else: - request_replies = 0 - output = ( - f"Ping statistics for {target_ip_address}: " - f"Packets: Sent = {pings}, " - f"Received = {request_replies}, " - f"Lost = {pings - request_replies} ({(pings - request_replies) / pings * 100}% loss)" - ) - self.sys_log.info(output) - print(output) - return passed - return False + if not isinstance(target_ip_address, IPv4Address): + target_ip_address = IPv4Address(target_ip_address) + return self.software_manager.icmp.ping(target_ip_address) def send_frame(self, frame: Frame): """ @@ -1492,22 +1349,23 @@ class Node(SimComponent): self.software_manager.arp.add_arp_cache_entry( ip_address=frame.ip.src_ip_address, mac_address=frame.ethernet.src_mac_addr, nic=from_nic ) - if frame.ip.protocol == IPProtocol.ICMP: - self.icmp.process_icmp(frame=frame, from_nic=from_nic) - return + # Check if the destination port is open on the Node dst_port = None if frame.tcp: dst_port = frame.tcp.dst_port elif frame.udp: dst_port = frame.udp.dst_port - if dst_port in self.software_manager.get_open_ports(): - # accept thr frame as the port is open + + accept_frame = False + if frame.icmp or dst_port in self.software_manager.get_open_ports(): + # accept the frame as the port is open or if it's an ICMP frame + accept_frame = True + + # TODO: add internal node firewall check here? + + if accept_frame: self.session_manager.receive_frame(frame, from_nic) - # if frame.tcp.src_port == Port.ARP: - # self.arp.process_arp_packet(from_nic=from_nic, arp_packet=frame.arp) - # else: - # self.session_manager.receive_frame(frame) else: # denied as port closed self.sys_log.info(f"Ignoring frame for port {frame.tcp.dst_port.value} from {frame.ip.src_ip_address}") @@ -1527,7 +1385,6 @@ class Node(SimComponent): self.services[service.uuid] = service service.parent = self service.install() # Perform any additional setup, such as creating files for this service on the node. - self.sys_log.info(f"Installed service {service.name}") self._service_request_manager.add_request(service.uuid, RequestType(func=service._request_manager)) def uninstall_service(self, service: Service) -> None: @@ -1559,7 +1416,6 @@ class Node(SimComponent): return self.applications[application.uuid] = application application.parent = self - self.sys_log.info(f"Installed application {application.name}") self._application_request_manager.add_request(application.uuid, RequestType(func=application._request_manager)) def uninstall_application(self, application: Application) -> None: diff --git a/src/primaite/simulator/network/hardware/nodes/host.py b/src/primaite/simulator/network/hardware/nodes/host.py new file mode 100644 index 00000000..f4fc1586 --- /dev/null +++ b/src/primaite/simulator/network/hardware/nodes/host.py @@ -0,0 +1,67 @@ +from primaite.simulator.network.hardware.base import NIC, Node +from primaite.simulator.system.applications.web_browser import WebBrowser +from primaite.simulator.system.services.arp.host_arp import HostARP +from primaite.simulator.system.services.dns.dns_client import DNSClient +from primaite.simulator.system.services.ftp.ftp_client import FTPClient +from primaite.simulator.system.services.icmp.icmp import ICMP +from primaite.simulator.system.services.ntp.ntp_client import NTPClient + + +class Host(Node): + """ + A basic Host class. + + Example: + >>> pc_a = Host( + hostname="pc_a", + ip_address="192.168.1.10", + subnet_mask="255.255.255.0", + default_gateway="192.168.1.1" + ) + >>> pc_a.power_on() + + Instances of computer come 'pre-packaged' with the following: + + * Core Functionality: + * ARP + * ICMP + * Packet Capture + * Sys Log + * Services: + * DNS Client + * FTP Client + * LDAP Client + * NTP Client + * Applications: + * Email Client + * Web Browser + * Processes: + * Placeholder + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.connect_nic(NIC(ip_address=kwargs["ip_address"], subnet_mask=kwargs["subnet_mask"])) + self._install_system_software() + + def _install_system_software(self): + """Install System Software - software that is usually provided with the OS.""" + # ARP Service + self.software_manager.install(HostARP) + + # ICMP Service + self.software_manager.install(ICMP) + + # DNS Client + self.software_manager.install(DNSClient) + + # FTP Client + self.software_manager.install(FTPClient) + + # NTP Client + self.software_manager.install(NTPClient) + + # Web Browser + self.software_manager.install(WebBrowser) + + super()._install_system_software() diff --git a/src/primaite/simulator/network/hardware/nodes/router.py b/src/primaite/simulator/network/hardware/nodes/router.py index ed9a30d4..53277d69 100644 --- a/src/primaite/simulator/network/hardware/nodes/router.py +++ b/src/primaite/simulator/network/hardware/nodes/router.py @@ -8,10 +8,10 @@ from typing import Dict, List, Optional, Tuple, Union from prettytable import MARKDOWN, PrettyTable from primaite.simulator.core import RequestManager, RequestType, SimComponent -from primaite.simulator.network.hardware.base import ARPCache, ICMP, NIC, Node +from primaite.simulator.network.hardware.base import ARPCache, NIC, Node from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState from primaite.simulator.network.transmission.data_link_layer import EthernetHeader, Frame -from primaite.simulator.network.transmission.network_layer import ICMPPacket, ICMPType, IPPacket, IPProtocol +from primaite.simulator.network.transmission.network_layer import IPPacket, IPProtocol from primaite.simulator.network.transmission.transport_layer import Port, TCPHeader from primaite.simulator.system.core.sys_log import SysLog @@ -628,96 +628,6 @@ class RouterARPCache(ARPCache): # return -class RouterICMP(ICMP): - """ - A class to represent a router's Internet Control Message Protocol (ICMP) handler. - - :param sys_log: System log for logging network events and errors. - :type sys_log: SysLog - :param arp_cache: The ARP cache for resolving MAC addresses. - :type arp_cache: ARPCache - :param router: The router to which this ICMP handler belongs. - :type router: Router - """ - - router: Router - - def __init__(self, sys_log: SysLog, arp_cache: ARPCache, router: Router): - super().__init__(sys_log, arp_cache) - self.router = router - - def process_icmp(self, frame: Frame, from_nic: NIC, is_reattempt: bool = False): - """ - Process incoming ICMP frames based on ICMP type. - - :param frame: The incoming frame to process. - :param from_nic: The network interface where the frame is coming from. - :param is_reattempt: Flag to indicate if the process is a reattempt. - """ - if frame.icmp.icmp_type == ICMPType.ECHO_REQUEST: - # determine if request is for router interface or whether it needs to be routed - - for nic in self.router.nics.values(): - if nic.ip_address == frame.ip.dst_ip_address: - if nic.enabled: - # reply to the request - if not is_reattempt: - self.sys_log.info(f"Received echo request from {frame.ip.src_ip_address}") - target_mac_address = self.arp.get_arp_cache_mac_address(frame.ip.src_ip_address) - src_nic = self.arp.get_arp_cache_nic(frame.ip.src_ip_address) - tcp_header = TCPHeader(src_port=Port.ARP, dst_port=Port.ARP) - - # Network Layer - ip_packet = IPPacket( - src_ip_address=nic.ip_address, - dst_ip_address=frame.ip.src_ip_address, - protocol=IPProtocol.ICMP, - ) - # Data Link Layer - ethernet_header = EthernetHeader( - src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address - ) - icmp_reply_packet = ICMPPacket( - icmp_type=ICMPType.ECHO_REPLY, - icmp_code=0, - identifier=frame.icmp.identifier, - sequence=frame.icmp.sequence + 1, - ) - payload = secrets.token_urlsafe(int(32 / 1.3)) # Standard ICMP 32 bytes size - frame = Frame( - ethernet=ethernet_header, - ip=ip_packet, - tcp=tcp_header, - icmp=icmp_reply_packet, - payload=payload, - ) - self.sys_log.info(f"Sending echo reply to {frame.ip.dst_ip_address}") - - src_nic.send_frame(frame) - return - - # Route the frame - self.router.process_frame(frame, from_nic) - - elif frame.icmp.icmp_type == ICMPType.ECHO_REPLY: - for nic in self.router.nics.values(): - if nic.ip_address == frame.ip.dst_ip_address: - if nic.enabled: - time = frame.transmission_duration() - time_str = f"{time}ms" if time > 0 else "<1ms" - self.sys_log.info( - f"Reply from {frame.ip.src_ip_address}: " - f"bytes={len(frame.payload)}, " - f"time={time_str}, " - f"TTL={frame.ip.ttl}" - ) - if not self.request_replies.get(frame.icmp.identifier): - self.request_replies[frame.icmp.identifier] = 0 - self.request_replies[frame.icmp.identifier] += 1 - - return - # Route the frame - self.router.process_frame(frame, from_nic) class RouterNIC(NIC): @@ -786,9 +696,10 @@ class Router(Node): kwargs["route_table"] = RouteTable(sys_log=kwargs["sys_log"]) if not kwargs.get("arp"): kwargs["arp"] = RouterARPCache(sys_log=kwargs.get("sys_log"), router=self) - if not kwargs.get("icmp"): - kwargs["icmp"] = RouterICMP(sys_log=kwargs.get("sys_log"), arp_cache=kwargs.get("arp"), router=self) + # if not kwargs.get("icmp"): + # kwargs["icmp"] = RouterICMP(sys_log=kwargs.get("sys_log"), arp_cache=kwargs.get("arp"), router=self) super().__init__(hostname=hostname, num_ports=num_ports, **kwargs) + # TODO: Install RoputerICMP for i in range(1, self.num_ports + 1): nic = RouterNIC(ip_address="127.0.0.1", subnet_mask="255.0.0.0", gateway="0.0.0.0") self.connect_nic(nic) diff --git a/src/primaite/simulator/network/protocols/icmp.py b/src/primaite/simulator/network/protocols/icmp.py new file mode 100644 index 00000000..9f761393 --- /dev/null +++ b/src/primaite/simulator/network/protocols/icmp.py @@ -0,0 +1,114 @@ +import secrets +from enum import Enum +from typing import Union + +from pydantic import BaseModel, field_validator, validate_call +from pydantic_core.core_schema import FieldValidationInfo + +from primaite import getLogger + +_LOGGER = getLogger(__name__) + + +class ICMPType(Enum): + """Enumeration of common ICMP (Internet Control Message Protocol) types.""" + + ECHO_REPLY = 0 + "Echo Reply message." + DESTINATION_UNREACHABLE = 3 + "Destination Unreachable." + REDIRECT = 5 + "Redirect." + ECHO_REQUEST = 8 + "Echo Request (ping)." + ROUTER_ADVERTISEMENT = 10 + "Router Advertisement." + ROUTER_SOLICITATION = 11 + "Router discovery/selection/solicitation." + TIME_EXCEEDED = 11 + "Time Exceeded." + TIMESTAMP_REQUEST = 13 + "Timestamp Request." + TIMESTAMP_REPLY = 14 + "Timestamp Reply." + + +@validate_call +def get_icmp_type_code_description(icmp_type: ICMPType, icmp_code: int) -> Union[str, None]: + """ + Maps ICMPType and code pairings to their respective description. + + :param icmp_type: An ICMPType. + :param icmp_code: An icmp code. + :return: The icmp type and code pairing description if it exists, otherwise returns None. + """ + icmp_code_descriptions = { + ICMPType.ECHO_REPLY: {0: "Echo reply"}, + ICMPType.DESTINATION_UNREACHABLE: { + 0: "Destination network unreachable", + 1: "Destination host unreachable", + 2: "Destination protocol unreachable", + 3: "Destination port unreachable", + 4: "Fragmentation required", + 5: "Source route failed", + 6: "Destination network unknown", + 7: "Destination host unknown", + 8: "Source host isolated", + 9: "Network administratively prohibited", + 10: "Host administratively prohibited", + 11: "Network unreachable for ToS", + 12: "Host unreachable for ToS", + 13: "Communication administratively prohibited", + 14: "Host Precedence Violation", + 15: "Precedence cutoff in effect", + }, + ICMPType.REDIRECT: { + 0: "Redirect Datagram for the Network", + 1: "Redirect Datagram for the Host", + }, + ICMPType.ECHO_REQUEST: {0: "Echo request"}, + ICMPType.ROUTER_ADVERTISEMENT: {0: "Router Advertisement"}, + ICMPType.ROUTER_SOLICITATION: {0: "Router discovery/selection/solicitation"}, + ICMPType.TIME_EXCEEDED: {0: "TTL expired in transit", 1: "Fragment reassembly time exceeded"}, + ICMPType.TIMESTAMP_REQUEST: {0: "Timestamp Request"}, + ICMPType.TIMESTAMP_REPLY: {0: "Timestamp reply"}, + } + return icmp_code_descriptions[icmp_type].get(icmp_code) + + +class ICMPPacket(BaseModel): + """Models an ICMP Packet.""" + + icmp_type: ICMPType = ICMPType.ECHO_REQUEST + "ICMP Type." + icmp_code: int = 0 + "ICMP Code." + identifier: int + "ICMP identifier (16 bits randomly generated)." + sequence: int = 0 + "ICMP message sequence number." + + def __init__(self, **kwargs): + if not kwargs.get("identifier"): + kwargs["identifier"] = secrets.randbits(16) + super().__init__(**kwargs) + + @field_validator("icmp_code") # noqa + @classmethod + def _icmp_type_must_have_icmp_code(cls, v: int, info: FieldValidationInfo) -> int: + """Validates the icmp_type and icmp_code.""" + icmp_type = info.data["icmp_type"] + if get_icmp_type_code_description(icmp_type, v): + return v + msg = f"No Matching ICMP code for type:{icmp_type.name}, code:{v}" + _LOGGER.error(msg) + raise ValueError(msg) + + def code_description(self) -> str: + """The icmp_code description.""" + description = get_icmp_type_code_description(self.icmp_type, self.icmp_code) + if description: + return description + msg = f"No Matching ICMP code for type:{self.icmp_type.name}, code:{self.icmp_code}" + _LOGGER.error(msg) + raise ValueError(msg) \ No newline at end of file diff --git a/src/primaite/simulator/network/transmission/data_link_layer.py b/src/primaite/simulator/network/transmission/data_link_layer.py index 6a4e24d8..5c25df01 100644 --- a/src/primaite/simulator/network/transmission/data_link_layer.py +++ b/src/primaite/simulator/network/transmission/data_link_layer.py @@ -5,8 +5,9 @@ from pydantic import BaseModel from primaite import getLogger from primaite.simulator.network.protocols.arp import ARPPacket +from primaite.simulator.network.protocols.icmp import ICMPPacket from primaite.simulator.network.protocols.packet import DataPacket -from primaite.simulator.network.transmission.network_layer import ICMPPacket, IPPacket, IPProtocol +from primaite.simulator.network.transmission.network_layer import IPPacket, IPProtocol from primaite.simulator.network.transmission.primaite_layer import PrimaiteHeader from primaite.simulator.network.transmission.transport_layer import TCPHeader, UDPHeader from primaite.simulator.network.utils import convert_bytes_to_megabits diff --git a/src/primaite/simulator/network/transmission/network_layer.py b/src/primaite/simulator/network/transmission/network_layer.py index fd36fbf8..b581becd 100644 --- a/src/primaite/simulator/network/transmission/network_layer.py +++ b/src/primaite/simulator/network/transmission/network_layer.py @@ -54,110 +54,6 @@ class Precedence(Enum): "Highest priority level, used for the most critical network control messages, such as routing protocol hellos." -class ICMPType(Enum): - """Enumeration of common ICMP (Internet Control Message Protocol) types.""" - - ECHO_REPLY = 0 - "Echo Reply message." - DESTINATION_UNREACHABLE = 3 - "Destination Unreachable." - REDIRECT = 5 - "Redirect." - ECHO_REQUEST = 8 - "Echo Request (ping)." - ROUTER_ADVERTISEMENT = 10 - "Router Advertisement." - ROUTER_SOLICITATION = 11 - "Router discovery/selection/solicitation." - TIME_EXCEEDED = 11 - "Time Exceeded." - TIMESTAMP_REQUEST = 13 - "Timestamp Request." - TIMESTAMP_REPLY = 14 - "Timestamp Reply." - - -@validate_call -def get_icmp_type_code_description(icmp_type: ICMPType, icmp_code: int) -> Union[str, None]: - """ - Maps ICMPType and code pairings to their respective description. - - :param icmp_type: An ICMPType. - :param icmp_code: An icmp code. - :return: The icmp type and code pairing description if it exists, otherwise returns None. - """ - icmp_code_descriptions = { - ICMPType.ECHO_REPLY: {0: "Echo reply"}, - ICMPType.DESTINATION_UNREACHABLE: { - 0: "Destination network unreachable", - 1: "Destination host unreachable", - 2: "Destination protocol unreachable", - 3: "Destination port unreachable", - 4: "Fragmentation required", - 5: "Source route failed", - 6: "Destination network unknown", - 7: "Destination host unknown", - 8: "Source host isolated", - 9: "Network administratively prohibited", - 10: "Host administratively prohibited", - 11: "Network unreachable for ToS", - 12: "Host unreachable for ToS", - 13: "Communication administratively prohibited", - 14: "Host Precedence Violation", - 15: "Precedence cutoff in effect", - }, - ICMPType.REDIRECT: { - 0: "Redirect Datagram for the Network", - 1: "Redirect Datagram for the Host", - }, - ICMPType.ECHO_REQUEST: {0: "Echo request"}, - ICMPType.ROUTER_ADVERTISEMENT: {0: "Router Advertisement"}, - ICMPType.ROUTER_SOLICITATION: {0: "Router discovery/selection/solicitation"}, - ICMPType.TIME_EXCEEDED: {0: "TTL expired in transit", 1: "Fragment reassembly time exceeded"}, - ICMPType.TIMESTAMP_REQUEST: {0: "Timestamp Request"}, - ICMPType.TIMESTAMP_REPLY: {0: "Timestamp reply"}, - } - return icmp_code_descriptions[icmp_type].get(icmp_code) - - -class ICMPPacket(BaseModel): - """Models an ICMP Packet.""" - - icmp_type: ICMPType = ICMPType.ECHO_REQUEST - "ICMP Type." - icmp_code: int = 0 - "ICMP Code." - identifier: int - "ICMP identifier (16 bits randomly generated)." - sequence: int = 0 - "ICMP message sequence number." - - def __init__(self, **kwargs): - if not kwargs.get("identifier"): - kwargs["identifier"] = secrets.randbits(16) - super().__init__(**kwargs) - - @field_validator("icmp_code") # noqa - @classmethod - def _icmp_type_must_have_icmp_code(cls, v: int, info: FieldValidationInfo) -> int: - """Validates the icmp_type and icmp_code.""" - icmp_type = info.data["icmp_type"] - if get_icmp_type_code_description(icmp_type, v): - return v - msg = f"No Matching ICMP code for type:{icmp_type.name}, code:{v}" - _LOGGER.error(msg) - raise ValueError(msg) - - def code_description(self) -> str: - """The icmp_code description.""" - description = get_icmp_type_code_description(self.icmp_type, self.icmp_code) - if description: - return description - msg = f"No Matching ICMP code for type:{self.icmp_type.name}, code:{self.icmp_code}" - _LOGGER.error(msg) - raise ValueError(msg) - - class IPPacket(BaseModel): """ Represents the IP layer of a network frame. diff --git a/src/primaite/simulator/network/transmission/transport_layer.py b/src/primaite/simulator/network/transmission/transport_layer.py index d4318baf..7c7509ab 100644 --- a/src/primaite/simulator/network/transmission/transport_layer.py +++ b/src/primaite/simulator/network/transmission/transport_layer.py @@ -7,6 +7,8 @@ from pydantic import BaseModel class Port(Enum): """Enumeration of common known TCP/UDP ports used by protocols for operation of network applications.""" + NONE = 0 + "Place holder for a non-port." WOL = 9 "Wake-on-Lan (WOL) - Used to turn or awaken a computer from sleep mode by a network message." FTP_DATA = 20 diff --git a/src/primaite/simulator/system/core/session_manager.py b/src/primaite/simulator/system/core/session_manager.py index 15001806..c134f56a 100644 --- a/src/primaite/simulator/system/core/session_manager.py +++ b/src/primaite/simulator/system/core/session_manager.py @@ -293,8 +293,15 @@ class SessionManager: dst_port = frame.tcp.dst_port elif frame.udp: dst_port = frame.udp.dst_port + elif frame.icmp: + dst_port = Port.NONE self.software_manager.receive_payload_from_session_manager( - payload=frame.payload, port=dst_port, protocol=frame.ip.protocol, session_id=session.uuid, from_nic=from_nic + payload=frame.payload, + port=dst_port, + protocol=frame.ip.protocol, + session_id=session.uuid, + from_nic=from_nic, + frame=frame ) def show(self, markdown: bool = False): diff --git a/src/primaite/simulator/system/core/software_manager.py b/src/primaite/simulator/system/core/software_manager.py index f23e2f55..ac765018 100644 --- a/src/primaite/simulator/system/core/software_manager.py +++ b/src/primaite/simulator/system/core/software_manager.py @@ -4,6 +4,7 @@ from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING, Union from prettytable import MARKDOWN, PrettyTable from primaite.simulator.file_system.file_system import FileSystem +from primaite.simulator.network.transmission.data_link_layer import Frame from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.network.transmission.transport_layer import Port from primaite.simulator.system.applications.application import Application, ApplicationOperatingState @@ -16,6 +17,7 @@ if TYPE_CHECKING: from primaite.simulator.system.core.sys_log import SysLog from primaite.simulator.network.hardware.base import Node, NIC from primaite.simulator.system.services.arp.arp import ARP + from primaite.simulator.system.services.icmp.icmp import ICMP from typing import Type, TypeVar @@ -51,6 +53,10 @@ class SoftwareManager: def arp(self) -> 'ARP': return self.software.get("ARP") # noqa + @property + def icmp(self) -> 'ICMP': + return self.software.get("ICMP") # noqa + def get_open_ports(self) -> List[Port]: """ Get a list of open ports. @@ -160,7 +166,7 @@ class SoftwareManager: ) def receive_payload_from_session_manager( - self, payload: Any, port: Port, protocol: IPProtocol, session_id: str, from_nic: "NIC" + self, payload: Any, port: Port, protocol: IPProtocol, session_id: str, from_nic: "NIC", frame: Frame ): """ Receive a payload from the SessionManager and forward it to the corresponding service or application. @@ -170,7 +176,7 @@ class SoftwareManager: """ receiver: Optional[Union[Service, Application]] = self.port_protocol_mapping.get((port, protocol), None) if receiver: - receiver.receive(payload=payload, session_id=session_id, from_nic=from_nic) + receiver.receive(payload=payload, session_id=session_id, from_nic=from_nic, frame=frame) else: self.sys_log.error(f"No service or application found for port {port} and protocol {protocol}") pass @@ -181,7 +187,7 @@ class SoftwareManager: :param markdown: If True, outputs the table in markdown format. Default is False. """ - table = PrettyTable(["Name", "Type", "Operating State", "Health State", "Port"]) + table = PrettyTable(["Name", "Type", "Operating State", "Health State", "Port", "Protocol"]) if markdown: table.set_style(MARKDOWN) table.align = "l" @@ -194,7 +200,8 @@ class SoftwareManager: software_type, software.operating_state.name, software.health_state_actual.name, - software.port.value, + software.port.value if software.port != Port.NONE else None, + software.protocol.value ] ) print(table) diff --git a/src/primaite/simulator/system/core/sys_log.py b/src/primaite/simulator/system/core/sys_log.py index 00e6920b..414bacef 100644 --- a/src/primaite/simulator/system/core/sys_log.py +++ b/src/primaite/simulator/system/core/sys_log.py @@ -88,47 +88,62 @@ class SysLog: root.mkdir(exist_ok=True, parents=True) return root / f"{self.hostname}_sys.log" - def debug(self, msg: str): + def debug(self, msg: str, to_terminal: bool = False): """ Logs a message with the DEBUG level. :param msg: The message to be logged. + :param to_terminal: If True, prints to the terminal too. """ if SIM_OUTPUT.save_sys_logs: self.logger.debug(msg) + if to_terminal: + print(msg) - def info(self, msg: str): + def info(self, msg: str, to_terminal: bool = False): """ Logs a message with the INFO level. :param msg: The message to be logged. + :param to_terminal: If True, prints to the terminal too. """ if SIM_OUTPUT.save_sys_logs: self.logger.info(msg) + if to_terminal: + print(msg) - def warning(self, msg: str): + def warning(self, msg: str, to_terminal: bool = False): """ Logs a message with the WARNING level. :param msg: The message to be logged. + :param to_terminal: If True, prints to the terminal too. """ if SIM_OUTPUT.save_sys_logs: self.logger.warning(msg) + if to_terminal: + print(msg) - def error(self, msg: str): + def error(self, msg: str, to_terminal: bool = False): """ Logs a message with the ERROR level. :param msg: The message to be logged. + :param to_terminal: If True, prints to the terminal too. """ if SIM_OUTPUT.save_sys_logs: self.logger.error(msg) + if to_terminal: + print(msg) - def critical(self, msg: str): + def critical(self, msg: str, to_terminal: bool = False): """ Logs a message with the CRITICAL level. :param msg: The message to be logged. + :param to_terminal: If True, prints to the terminal too. """ if SIM_OUTPUT.save_sys_logs: self.logger.critical(msg) + if to_terminal: + print(msg) diff --git a/src/primaite/simulator/system/services/arp/arp.py b/src/primaite/simulator/system/services/arp/arp.py index 136718c2..28a2485c 100644 --- a/src/primaite/simulator/system/services/arp/arp.py +++ b/src/primaite/simulator/system/services/arp/arp.py @@ -2,17 +2,15 @@ from __future__ import annotations from abc import abstractmethod from ipaddress import IPv4Address -from typing import Any, Dict, Optional, Tuple, Union +from typing import Any, Dict, Optional, Union from prettytable import MARKDOWN, PrettyTable -from pydantic import BaseModel from primaite.simulator.network.hardware.base import NIC from primaite.simulator.network.protocols.arp import ARPEntry, ARPPacket -from primaite.simulator.network.protocols.packet import DataPacket from primaite.simulator.network.transmission.data_link_layer import EthernetHeader, Frame from primaite.simulator.network.transmission.network_layer import IPPacket, IPProtocol -from primaite.simulator.network.transmission.transport_layer import Port, TCPHeader, UDPHeader +from primaite.simulator.network.transmission.transport_layer import Port, UDPHeader from primaite.simulator.system.services.service import Service @@ -191,7 +189,6 @@ class ARP(Service): from_nic = kwargs.get("from_nic") if payload.request: - print(from_nic) self._process_arp_request(arp_packet=payload, from_nic=from_nic) else: self._process_arp_reply(arp_packet=payload, from_nic=from_nic) diff --git a/src/primaite/simulator/system/services/icmp/__init__.py b/src/primaite/simulator/system/services/icmp/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/primaite/simulator/system/services/icmp/icmp.py b/src/primaite/simulator/system/services/icmp/icmp.py new file mode 100644 index 00000000..16dd4f8c --- /dev/null +++ b/src/primaite/simulator/system/services/icmp/icmp.py @@ -0,0 +1,159 @@ +import secrets +from ipaddress import IPv4Address +from typing import Dict, Any, Union, Optional, Tuple + +from primaite import getLogger +from primaite.simulator.network.hardware.base import NIC +from primaite.simulator.network.protocols.icmp import ICMPPacket, ICMPType +from primaite.simulator.network.transmission.data_link_layer import Frame, EthernetHeader +from primaite.simulator.network.transmission.network_layer import IPProtocol, IPPacket +from primaite.simulator.network.transmission.transport_layer import Port +from primaite.simulator.system.services.service import Service + +_LOGGER = getLogger(__name__) + + +class ICMP(Service): + request_replies: Dict = {} + + def __init__(self, **kwargs): + kwargs["name"] = "ICMP" + kwargs["port"] = Port.NONE + kwargs["protocol"] = IPProtocol.ICMP + super().__init__(**kwargs) + + def describe_state(self) -> Dict: + pass + + def clear(self): + """Clears the ICMP request replies tracker.""" + self.request_replies.clear() + + def _send_icmp_echo_request( + self, target_ip_address: IPv4Address, sequence: int = 0, identifier: Optional[int] = None, pings: int = 4 + ) -> Tuple[int, Union[int, None]]: + """ + Send an ICMP echo request (ping) to a target IP address and manage the sequence and identifier. + + :param target_ip_address: The target IP address to send the ping. + :param sequence: The sequence number of the echo request. Defaults to 0. + :param identifier: An optional identifier for the ICMP packet. If None, a default will be used. + :return: A tuple containing the next sequence number and the identifier, or (0, None) if the target IP address + was not found in the ARP cache. + """ + nic = self.software_manager.arp.get_arp_cache_nic(target_ip_address) + + if not nic: + return pings, None + + # ARP entry exists + sequence += 1 + target_mac_address = self.software_manager.arp.get_arp_cache_mac_address(target_ip_address) + + src_nic = self.software_manager.arp.get_arp_cache_nic(target_ip_address) + + # Network Layer + ip_packet = IPPacket( + src_ip_address=nic.ip_address, + dst_ip_address=target_ip_address, + protocol=IPProtocol.ICMP, + ) + # Data Link Layer + ethernet_header = EthernetHeader(src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address) + icmp_packet = ICMPPacket(identifier=identifier, sequence=sequence) + payload = secrets.token_urlsafe(int(32 / 1.3)) # Standard ICMP 32 bytes size + frame = Frame(ethernet=ethernet_header, ip=ip_packet, icmp=icmp_packet, payload=payload) + nic.send_frame(frame) + return sequence, icmp_packet.identifier + + def ping(self, target_ip_address: Union[IPv4Address, str], pings: int = 4) -> bool: + """ + Ping an IP address, performing a standard ICMP echo request/response. + + :param target_ip_address: The target IP address to ping. + :param pings: The number of pings to attempt, default is 4. + :return: True if the ping is successful, otherwise False. + """ + if not self._can_perform_action(): + return False + if target_ip_address.is_loopback: + self.sys_log.info("Pinging loopback address") + return any(nic.enabled for nic in self.nics.values()) + self.sys_log.info(f"Pinging {target_ip_address}:", to_terminal=True) + sequence, identifier = 0, None + while sequence < pings: + sequence, identifier = self._send_icmp_echo_request( + target_ip_address, sequence, identifier, pings + ) + request_replies = self.software_manager.icmp.request_replies.get(identifier) + passed = request_replies == pings + if request_replies: + self.software_manager.icmp.request_replies.pop(identifier) + else: + request_replies = 0 + output = ( + f"Ping statistics for {target_ip_address}: " + f"Packets: Sent = {pings}, " + f"Received = {request_replies}, " + f"Lost = {pings - request_replies} ({(pings - request_replies) / pings * 100}% loss)" + ) + self.sys_log.info(output, to_terminal=True) + + return passed + + def _process_icmp_echo_request(self, frame: Frame, from_nic: NIC, is_reattempt: bool = False): + if frame.icmp.icmp_type == ICMPType.ECHO_REQUEST: + if not is_reattempt: + self.sys_log.info(f"Received echo request from {frame.ip.src_ip_address}") + target_mac_address = self.software_manager.arp.get_arp_cache_mac_address(frame.ip.src_ip_address) + + src_nic = self.software_manager.arp.get_arp_cache_nic(frame.ip.src_ip_address) + if not src_nic: + self.software_manager.arp.send_arp_request(frame.ip.src_ip_address) + self.process_icmp(frame=frame, from_nic=from_nic, is_reattempt=True) + return + + # Network Layer + ip_packet = IPPacket( + src_ip_address=src_nic.ip_address, dst_ip_address=frame.ip.src_ip_address, protocol=IPProtocol.ICMP + ) + # Data Link Layer + ethernet_header = EthernetHeader(src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address) + icmp_reply_packet = ICMPPacket( + icmp_type=ICMPType.ECHO_REPLY, + icmp_code=0, + identifier=frame.icmp.identifier, + sequence=frame.icmp.sequence + 1, + ) + payload = secrets.token_urlsafe(int(32 / 1.3)) # Standard ICMP 32 bytes size + frame = Frame(ethernet=ethernet_header, ip=ip_packet, icmp=icmp_reply_packet, payload=payload) + self.sys_log.info(f"Sending echo reply to {frame.ip.dst_ip_address}") + + src_nic.send_frame(frame) + + def _process_icmp_echo_reply(self, frame: Frame, from_nic: NIC, is_reattempt: bool = False): + time = frame.transmission_duration() + time_str = f"{time}ms" if time > 0 else "<1ms" + self.sys_log.info( + f"Reply from {frame.ip.src_ip_address}: " + f"bytes={len(frame.payload)}, " + f"time={time_str}, " + f"TTL={frame.ip.ttl}", + to_terminal=True + ) + if not self.request_replies.get(frame.icmp.identifier): + self.request_replies[frame.icmp.identifier] = 0 + self.request_replies[frame.icmp.identifier] += 1 + + def receive(self, payload: Any, session_id: str, **kwargs) -> bool: + frame: Frame = kwargs["frame"] + from_nic = kwargs["from_nic"] + + if not frame.icmp: + return False + + if frame.icmp.icmp_type == ICMPType.ECHO_REQUEST: + self._process_icmp_echo_request(frame, from_nic) + elif frame.icmp.icmp_type == ICMPType.ECHO_REPLY: + self._process_icmp_echo_reply(frame, from_nic) + return True diff --git a/src/primaite/simulator/system/services/icmp/router_icmp.py b/src/primaite/simulator/system/services/icmp/router_icmp.py new file mode 100644 index 00000000..1def00c4 --- /dev/null +++ b/src/primaite/simulator/system/services/icmp/router_icmp.py @@ -0,0 +1,90 @@ +# class RouterICMP(ICMP): +# """ +# A class to represent a router's Internet Control Message Protocol (ICMP) handler. +# +# :param sys_log: System log for logging network events and errors. +# :type sys_log: SysLog +# :param arp_cache: The ARP cache for resolving MAC addresses. +# :type arp_cache: ARPCache +# :param router: The router to which this ICMP handler belongs. +# :type router: Router +# """ +# +# router: Router +# +# def __init__(self, sys_log: SysLog, arp_cache: ARPCache, router: Router): +# super().__init__(sys_log, arp_cache) +# self.router = router +# +# def process_icmp(self, frame: Frame, from_nic: NIC, is_reattempt: bool = False): +# """ +# Process incoming ICMP frames based on ICMP type. +# +# :param frame: The incoming frame to process. +# :param from_nic: The network interface where the frame is coming from. +# :param is_reattempt: Flag to indicate if the process is a reattempt. +# """ +# if frame.icmp.icmp_type == ICMPType.ECHO_REQUEST: +# # determine if request is for router interface or whether it needs to be routed +# +# for nic in self.router.nics.values(): +# if nic.ip_address == frame.ip.dst_ip_address: +# if nic.enabled: +# # reply to the request +# if not is_reattempt: +# self.sys_log.info(f"Received echo request from {frame.ip.src_ip_address}") +# target_mac_address = self.arp.get_arp_cache_mac_address(frame.ip.src_ip_address) +# src_nic = self.arp.get_arp_cache_nic(frame.ip.src_ip_address) +# tcp_header = TCPHeader(src_port=Port.ARP, dst_port=Port.ARP) +# +# # Network Layer +# ip_packet = IPPacket( +# src_ip_address=nic.ip_address, +# dst_ip_address=frame.ip.src_ip_address, +# protocol=IPProtocol.ICMP, +# ) +# # Data Link Layer +# ethernet_header = EthernetHeader( +# src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address +# ) +# icmp_reply_packet = ICMPPacket( +# icmp_type=ICMPType.ECHO_REPLY, +# icmp_code=0, +# identifier=frame.icmp.identifier, +# sequence=frame.icmp.sequence + 1, +# ) +# payload = secrets.token_urlsafe(int(32 / 1.3)) # Standard ICMP 32 bytes size +# frame = Frame( +# ethernet=ethernet_header, +# ip=ip_packet, +# tcp=tcp_header, +# icmp=icmp_reply_packet, +# payload=payload, +# ) +# self.sys_log.info(f"Sending echo reply to {frame.ip.dst_ip_address}") +# +# src_nic.send_frame(frame) +# return +# +# # Route the frame +# self.router.process_frame(frame, from_nic) +# +# elif frame.icmp.icmp_type == ICMPType.ECHO_REPLY: +# for nic in self.router.nics.values(): +# if nic.ip_address == frame.ip.dst_ip_address: +# if nic.enabled: +# time = frame.transmission_duration() +# time_str = f"{time}ms" if time > 0 else "<1ms" +# self.sys_log.info( +# f"Reply from {frame.ip.src_ip_address}: " +# f"bytes={len(frame.payload)}, " +# f"time={time_str}, " +# f"TTL={frame.ip.ttl}" +# ) +# if not self.request_replies.get(frame.icmp.identifier): +# self.request_replies[frame.icmp.identifier] = 0 +# self.request_replies[frame.icmp.identifier] += 1 +# +# return +# # Route the frame +# self.router.process_frame(frame, from_nic)