#2248 - ICMP now working as a service using the session manager for transmission. Now started to comb through the tests to fix anything up.

This commit is contained in:
Chris McCarthy
2024-02-02 16:20:15 +00:00
parent 87d9d6da04
commit dc5aeede33
12 changed files with 241 additions and 491 deletions

View File

@@ -599,262 +599,6 @@ class Link(SimComponent):
def __str__(self) -> str:
return f"{self.endpoint_a}<-->{self.endpoint_b}"
class ARPCache:
"""
The ARPCache (Address Resolution Protocol) class.
Responsible for maintaining a mapping between IP addresses and MAC addresses (ARP cache) for the network. It
provides methods for looking up, adding, and removing entries, and for processing ARPPackets.
"""
def __init__(self, sys_log: "SysLog"):
"""
Initialize an ARP (Address Resolution Protocol) cache.
:param sys_log: The nodes sys log.
"""
self.sys_log: "SysLog" = sys_log
self.arp: Dict[IPv4Address, ARPEntry] = {}
self.nics: Dict[str, "NIC"] = {}
self.node = None
def show(self, markdown: bool = False):
"""Prints a table of ARC Cache."""
table = PrettyTable(["IP Address", "MAC Address", "Via"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.sys_log.hostname} ARP Cache"
for ip, arp in self.arp.items():
table.add_row(
[
str(ip),
arp.mac_address,
self.nics[arp.nic_uuid].mac_address,
]
)
print(table)
def clear(self):
"""Clears the arp cache."""
self.arp.clear()
def add_arp_cache_entry(self, ip_address: IPv4Address, mac_address: str, nic: NIC, override: bool = False):
"""
Add an ARP entry to the cache.
If an entry for the given IP address already exists, the entry is only updated if the `override` parameter is
set to True.
:param ip_address: The IP address to be added to the cache.
:param mac_address: The MAC address associated with the IP address.
:param nic: The NIC through which the NIC with the IP address is reachable.
:param override: If True, an existing entry for the IP address will be overridden. Default is False.
"""
for _nic in self.nics.values():
if _nic.ip_address == ip_address:
return
if override or not self.arp.get(ip_address):
self.sys_log.info(f"Adding ARP cache entry for {mac_address}/{ip_address} via NIC {nic}")
arp_entry = ARPEntry(mac_address=mac_address, nic_uuid=nic.uuid)
self.arp[ip_address] = arp_entry
def _remove_arp_cache_entry(self, ip_address: IPv4Address):
"""
Remove an ARP entry from the cache.
:param ip_address: The IP address to be removed from the cache.
"""
if ip_address in self.arp:
del self.arp[ip_address]
def get_default_gateway_mac_address(self) -> Optional[str]:
if self.arp.node.default_gateway:
return self.get_arp_cache_mac_address(self.arp.node.default_gateway)
def get_default_gateway_nic(self) -> Optional[NIC]:
if self.arp.node.default_gateway:
return self.get_arp_cache_nic(self.arp.node.default_gateway)
def _get_arp_cache_mac_address(
self, ip_address: IPv4Address, is_reattempt: bool = False, is_default_gateway_attempt: bool = False
) -> Optional[str]:
arp_entry = self.arp.get(ip_address)
if arp_entry:
return arp_entry.mac_address
else:
if not is_reattempt:
self.send_arp_request(ip_address)
return self._get_arp_cache_mac_address(
ip_address=ip_address, is_reattempt=True, is_default_gateway_attempt=is_default_gateway_attempt
)
else:
if self.node.default_gateway:
if not is_default_gateway_attempt:
self.send_arp_request(self.node.default_gateway)
return self._get_arp_cache_mac_address(
ip_address=self.node.default_gateway, is_reattempt=True, is_default_gateway_attempt=True
)
return None
def get_arp_cache_mac_address(self, ip_address: IPv4Address) -> Optional[str]:
"""
Get the MAC address associated with an IP address.
:param ip_address: The IP address to look up in the cache.
:return: The MAC address associated with the IP address, or None if not found.
"""
return self._get_arp_cache_mac_address(ip_address)
def _get_arp_cache_nic(
self, ip_address: IPv4Address, is_reattempt: bool = False, is_default_gateway_attempt: bool = False
) -> Optional[NIC]:
arp_entry = self.arp.get(ip_address)
if arp_entry:
return self.nics[arp_entry.nic_uuid]
else:
if not is_reattempt:
self.send_arp_request(ip_address)
return self._get_arp_cache_nic(
ip_address=ip_address, is_reattempt=True, is_default_gateway_attempt=is_default_gateway_attempt
)
else:
if self.node.default_gateway:
if not is_default_gateway_attempt:
self.send_arp_request(self.node.default_gateway)
return self._get_arp_cache_nic(
ip_address=self.node.default_gateway, is_reattempt=True, is_default_gateway_attempt=True
)
return None
def get_arp_cache_nic(self, ip_address: IPv4Address) -> Optional[NIC]:
"""
Get the NIC associated with an IP address.
:param ip_address: The IP address to look up in the cache.
:return: The NIC associated with the IP address, or None if not found.
"""
return self._get_arp_cache_nic(ip_address)
def clear_arp_cache(self):
"""Clear the entire ARP cache, removing all stored entries."""
self.arp.clear()
def send_arp_request(
self, target_ip_address: Union[IPv4Address, str], ignore_networks: Optional[List[IPv4Address]] = None
):
"""
Perform a standard ARP request for a given target IP address.
Broadcasts the request through all enabled NICs to determine the MAC address corresponding to the target IP
address. This method can be configured to ignore specific networks when sending out ARP requests,
which is useful in environments where certain addresses should not be queried.
:param target_ip_address: The target IP address to send an ARP request for.
:param ignore_networks: An optional list of IPv4 addresses representing networks to be excluded from the ARP
request broadcast. Each address in this list indicates a network which will not be queried during the ARP
request process. This is particularly useful in complex network environments where traffic should be
minimized or controlled to specific subnets. It is mainly used by the router to prevent ARP requests being
sent back to their source.
"""
pass
# for nic in self.nics.values():
# use_nic = True
# if ignore_networks:
# for ipv4 in ignore_networks:
# if ipv4 in nic.ip_network:
# use_nic = False
# if nic.enabled and use_nic:
# self.sys_log.info(f"Sending ARP request from NIC {nic} for ip {target_ip_address}")
# udp_header = UDPHeader(src_port=Port.ARP, dst_port=Port.ARP)
#
# # Network Layer
# ip_packet = IPPacket(
# src_ip_address=nic.ip_address, dst_ip_address=target_ip_address, protocol=IPProtocol.UDP
# )
# # Data Link Layer
# ethernet_header = EthernetHeader(src_mac_addr=nic.mac_address, dst_mac_addr="ff:ff:ff:ff:ff:ff")
# arp_packet = ARPPacket(
# sender_ip_address=nic.ip_address,
# sender_mac_addr=nic.mac_address,
# target_ip_address=target_ip_address,
# )
# frame = Frame(ethernet=ethernet_header, ip=ip_packet, udp=udp_header, payload=arp_packet)
# nic.send_frame(frame)
def send_arp_reply(self, arp_reply: ARPPacket, from_nic: NIC):
"""
Send an ARP reply back through the NIC it came from.
:param arp_reply: The ARP reply to send.
:param from_nic: The NIC to send the ARP reply from.
"""
self.sys_log.info(
f"Sending ARP reply from {arp_reply.sender_mac_addr}/{arp_reply.sender_ip_address} "
f"to {arp_reply.target_ip_address}/{arp_reply.target_mac_addr} "
)
tcp_header = TCPHeader(src_port=Port.ARP, dst_port=Port.ARP)
ip_packet = IPPacket(
src_ip_address=arp_reply.sender_ip_address,
dst_ip_address=arp_reply.target_ip_address,
)
ethernet_header = EthernetHeader(src_mac_addr=arp_reply.sender_mac_addr, dst_mac_addr=arp_reply.target_mac_addr)
frame = Frame(ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, arp=arp_reply)
from_nic.send_frame(frame)
def process_arp_packet(self, from_nic: NIC, arp_packet: ARPPacket):
"""
Process a received ARP packet, handling both ARP requests and responses.
If an ARP request is received for the local IP, a response is sent back.
If an ARP response is received, the ARP cache is updated with the new entry.
:param from_nic: The NIC that received the ARP packet.
:param arp_packet: The ARP packet to be processed.
"""
# ARP Reply
if not arp_packet.request:
self.sys_log.info(
f"Received ARP response for {arp_packet.sender_ip_address} "
f"from {arp_packet.sender_mac_addr} via NIC {from_nic}"
)
self.add_arp_cache_entry(
ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic
)
return
# ARP Request
self.sys_log.info(
f"Received ARP request for {arp_packet.target_ip_address} from "
f"{arp_packet.sender_mac_addr}/{arp_packet.sender_ip_address} "
)
# Unmatched ARP Request
if arp_packet.target_ip_address != from_nic.ip_address:
self.sys_log.info(
f"Ignoring ARP request for {arp_packet.target_ip_address}. Current IP address is {from_nic.ip_address}"
)
return
# Matched ARP request
self.add_arp_cache_entry(
ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic
)
arp_packet = arp_packet.generate_reply(from_nic.mac_address)
self.send_arp_reply(arp_packet, from_nic)
def __contains__(self, item: Any) -> bool:
return item in self.arp
class Node(SimComponent):
"""
A basic Node class that represents a node on the network.

View File

@@ -1,10 +1,11 @@
from primaite.simulator.network.hardware.base import NIC, Node
from primaite.simulator.network.hardware.nodes.host import Host
from primaite.simulator.system.applications.web_browser import WebBrowser
from primaite.simulator.system.services.dns.dns_client import DNSClient
from primaite.simulator.system.services.ftp.ftp_client import FTPClient
class Computer(Node):
class Computer(Host):
"""
A basic Computer class.
@@ -20,36 +21,16 @@ class Computer(Node):
Instances of computer come 'pre-packaged' with the following:
* Core Functionality:
* ARP
* ICMP
* Packet Capture
* Sys Log
* Services:
* ARP Service
* ICMP Service
* DNS Client
* FTP Client
* LDAP Client
* NTP Client
* Applications:
* Email Client
* Web Browser
* Processes:
* Placeholder
"""
pass
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.connect_nic(NIC(ip_address=kwargs["ip_address"], subnet_mask=kwargs["subnet_mask"]))
self._install_system_software()
def _install_system_software(self):
"""Install System Software - software that is usually provided with the OS."""
# DNS Client
self.software_manager.install(DNSClient)
# FTP
self.software_manager.install(FTPClient)
# Web Browser
self.software_manager.install(WebBrowser)
super()._install_system_software()

View File

@@ -23,20 +23,16 @@ class Host(Node):
Instances of computer come 'pre-packaged' with the following:
* Core Functionality:
* ARP
* ICMP
* Packet Capture
* Sys Log
* Services:
* ARP Service
* ICMP Service
* DNS Client
* FTP Client
* LDAP Client
* NTP Client
* Applications:
* Email Client
* Web Browser
* Processes:
* Placeholder
"""
def __init__(self, **kwargs):

View File

@@ -8,7 +8,7 @@ from typing import Dict, List, Optional, Tuple, Union
from prettytable import MARKDOWN, PrettyTable
from primaite.simulator.core import RequestManager, RequestType, SimComponent
from primaite.simulator.network.hardware.base import ARPCache, NIC, Node
from primaite.simulator.network.hardware.base import NIC, Node
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.transmission.data_link_layer import EthernetHeader, Frame
from primaite.simulator.network.transmission.network_layer import IPPacket, IPProtocol
@@ -528,108 +528,6 @@ class RouteTable(SimComponent):
table.add_row([index, f"{route.address}/{network.prefixlen}", route.next_hop_ip_address, route.metric])
print(table)
class RouterARPCache(ARPCache):
"""
Inherits from ARPCache and adds router-specific ARP packet processing.
:ivar SysLog sys_log: A system log for logging messages.
:ivar Router router: The router to which this ARP cache belongs.
"""
def __init__(self, sys_log: SysLog, router: Router):
super().__init__(sys_log)
self.router: Router = router
def process_arp_packet(
self, from_nic: NIC, frame: Frame, route_table: RouteTable, is_reattempt: bool = False
) -> None:
"""
Processes a received ARP (Address Resolution Protocol) packet in a router-specific way.
This method is responsible for handling both ARP requests and responses. It processes ARP packets received on a
Network Interface Card (NIC) and performs actions based on whether the packet is a request or a reply. This
includes updating the ARP cache, forwarding ARP replies, sending ARP requests for unknown destinations, and
handling packet TTL (Time To Live).
The method first checks if the ARP packet is a request or a reply. For ARP replies, it updates the ARP cache
and forwards the reply if necessary. For ARP requests, it checks if the target IP matches one of the router's
NICs and sends an ARP reply if so. If the destination is not directly connected, it consults the routing table
to find the best route and reattempts ARP request processing if needed.
:param from_nic: The NIC that received the ARP packet.
:param frame: The frame containing the ARP packet.
:param route_table: The routing table of the router.
:param is_reattempt: Flag to indicate if this is a reattempt of processing the ARP packet, defaults to False.
"""
arp_packet = frame.arp
# ARP Reply
if not arp_packet.request:
if arp_packet.target_ip_address == from_nic.ip_address:
# reply to the Router specifically
self.sys_log.info(
f"Received ARP response for {arp_packet.sender_ip_address} "
f"from {arp_packet.sender_mac_addr} via NIC {from_nic}"
)
self.add_arp_cache_entry(
ip_address=arp_packet.sender_ip_address,
mac_address=arp_packet.sender_mac_addr,
nic=from_nic,
)
return
# # Reply for a connected requested
# nic = self.get_arp_cache_nic(arp_packet.target_ip_address)
# if nic:
# self.sys_log.info(
# f"Forwarding arp reply for {arp_packet.target_ip_address}, from {arp_packet.sender_ip_address}"
# )
# arp_packet.sender_mac_addr = nic.mac_address
# frame.decrement_ttl()
# if frame.ip and frame.ip.ttl < 1:
# self.sys_log.info("Frame discarded as TTL limit reached")
# return
# nic.send_frame(frame)
# return
# ARP Request
self.sys_log.info(
f"Received ARP request for {arp_packet.target_ip_address} from "
f"{arp_packet.sender_mac_addr}/{arp_packet.sender_ip_address} "
)
# Matched ARP request
self.add_arp_cache_entry(
ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic
)
# If the target IP matches one of the router's NICs
for nic in self.nics.values():
if nic.enabled and nic.ip_address == arp_packet.target_ip_address:
arp_reply = arp_packet.generate_reply(from_nic.mac_address)
self.send_arp_reply(arp_reply, from_nic)
return
# # Check Route Table
# route = route_table.find_best_route(arp_packet.target_ip_address)
# if route and route != self.router.route_table.default_route:
# nic = self.get_arp_cache_nic(route.next_hop_ip_address)
#
# if not nic:
# if not is_reattempt:
# self.send_arp_request(route.next_hop_ip_address, ignore_networks=[frame.ip.src_ip_address])
# return self.process_arp_packet(from_nic, frame, route_table, is_reattempt=True)
# else:
# self.sys_log.info("Ignoring ARP request as destination unavailable/No ARP entry found")
# return
# else:
# arp_reply = arp_packet.generate_reply(from_nic.mac_address)
# self.send_arp_reply(arp_reply, from_nic)
# return
class RouterNIC(NIC):
"""
A Router-specific Network Interface Card (NIC) that extends the standard NIC functionality.
@@ -684,8 +582,8 @@ class Router(Node):
ethernet_ports: Dict[int, RouterNIC] = {}
acl: AccessControlList
route_table: RouteTable
arp: RouterARPCache
icmp: RouterICMP
# arp: RouterARPCache
# icmp: RouterICMP
def __init__(self, hostname: str, num_ports: int = 5, **kwargs):
if not kwargs.get("sys_log"):
@@ -694,12 +592,13 @@ class Router(Node):
kwargs["acl"] = AccessControlList(sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY)
if not kwargs.get("route_table"):
kwargs["route_table"] = RouteTable(sys_log=kwargs["sys_log"])
if not kwargs.get("arp"):
kwargs["arp"] = RouterARPCache(sys_log=kwargs.get("sys_log"), router=self)
# if not kwargs.get("arp"):
# kwargs["arp"] = RouterARPCache(sys_log=kwargs.get("sys_log"), router=self)
# if not kwargs.get("icmp"):
# kwargs["icmp"] = RouterICMP(sys_log=kwargs.get("sys_log"), arp_cache=kwargs.get("arp"), router=self)
super().__init__(hostname=hostname, num_ports=num_ports, **kwargs)
# TODO: Install RoputerICMP
# TODO: Install RouterICMP
# TODO: Install RouterARP
for i in range(1, self.num_ports + 1):
nic = RouterNIC(ip_address="127.0.0.1", subnet_mask="255.0.0.0", gateway="0.0.0.0")
self.connect_nic(nic)

View File

@@ -1,7 +1,7 @@
from primaite.simulator.network.hardware.nodes.computer import Computer
from primaite.simulator.network.hardware.nodes.host import Host
class Server(Computer):
class Server(Host):
"""
A basic Server class.
@@ -17,18 +17,15 @@ class Server(Computer):
Instances of Server come 'pre-packaged' with the following:
* Core Functionality:
* ARP
* ICMP
* Packet Capture
* Sys Log
* Services:
* ARP Service
* ICMP Service
* DNS Client
* FTP Client
* LDAP Client
* NTP Client
* Applications:
* Email Client
* Web Browser
* Processes:
* Placeholder
"""
pass

View File

@@ -7,6 +7,7 @@ from prettytable import MARKDOWN, PrettyTable
from primaite.simulator.core import SimComponent
from primaite.simulator.network.protocols.arp import ARPPacket
from primaite.simulator.network.protocols.icmp import ICMPPacket
from primaite.simulator.network.transmission.data_link_layer import EthernetHeader, Frame
from primaite.simulator.network.transmission.network_layer import IPPacket, IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port, TCPHeader, UDPHeader
@@ -200,6 +201,7 @@ class SessionManager:
dst_port: Optional[Port] = None,
session_id: Optional[str] = None,
ip_protocol: IPProtocol = IPProtocol.TCP,
icmp_packet: Optional[ICMPPacket] = None
) -> Union[Any, None]:
"""
Receive a payload from the SoftwareManager and send it to the appropriate NIC for transmission.
@@ -250,16 +252,17 @@ class SessionManager:
)
# Construct the frame for transmission
frame = Frame(
ethernet=EthernetHeader(src_mac_addr=outbound_nic.mac_address, dst_mac_addr=dst_mac_address),
ip=IPPacket(src_ip_address=outbound_nic.ip_address, dst_ip_address=dst_ip_address, protocol=ip_protocol),
tcp=tcp_header,
udp=udp_header,
icmp=icmp_packet,
payload=payload,
)
# Manage session for unicast transmission
# TODO: Only create sessions for TCP
if not (is_broadcast and session_id):
session_key = self._get_session_key(frame, inbound_frame=False)
session = self.sessions_by_key.get(session_key)
@@ -281,6 +284,7 @@ class SessionManager:
:param frame: The frame being received.
"""
# TODO: Only create sessions for TCP
session_key = self._get_session_key(frame, inbound_frame=True)
session: Session = self.sessions_by_key.get(session_key)
if not session:

View File

@@ -30,11 +30,11 @@ class HostARP(ARP):
ip_address=ip_address, is_reattempt=True, is_default_gateway_attempt=is_default_gateway_attempt
)
else:
if self.node.default_gateway:
if self.software_manager.node.default_gateway:
if not is_default_gateway_attempt:
self.send_arp_request(self.node.default_gateway)
self.send_arp_request(self.software_manager.node.default_gateway)
return self._get_arp_cache_mac_address(
ip_address=self.node.default_gateway, is_reattempt=True, is_default_gateway_attempt=True
ip_address=self.software_manager.node.default_gateway, is_reattempt=True, is_default_gateway_attempt=True
)
return None
@@ -61,11 +61,11 @@ class HostARP(ARP):
ip_address=ip_address, is_reattempt=True, is_default_gateway_attempt=is_default_gateway_attempt
)
else:
if self.node.default_gateway:
if self.software_manager.node.default_gateway:
if not is_default_gateway_attempt:
self.send_arp_request(self.node.default_gateway)
self.send_arp_request(self.software_manager.node.default_gateway)
return self._get_arp_cache_nic(
ip_address=self.node.default_gateway, is_reattempt=True, is_default_gateway_attempt=True
ip_address=self.software_manager.node.default_gateway, is_reattempt=True, is_default_gateway_attempt=True
)
return None

View File

@@ -0,0 +1,98 @@
# class RouterARPCache(ARPCache):
# """
# Inherits from ARPCache and adds router-specific ARP packet processing.
#
# :ivar SysLog sys_log: A system log for logging messages.
# :ivar Router router: The router to which this ARP cache belongs.
# """
#
# def __init__(self, sys_log: SysLog, router: Router):
# super().__init__(sys_log)
# self.router: Router = router
#
# def process_arp_packet(
# self, from_nic: NIC, frame: Frame, route_table: RouteTable, is_reattempt: bool = False
# ) -> None:
# """
# Processes a received ARP (Address Resolution Protocol) packet in a router-specific way.
#
# This method is responsible for handling both ARP requests and responses. It processes ARP packets received on a
# Network Interface Card (NIC) and performs actions based on whether the packet is a request or a reply. This
# includes updating the ARP cache, forwarding ARP replies, sending ARP requests for unknown destinations, and
# handling packet TTL (Time To Live).
#
# The method first checks if the ARP packet is a request or a reply. For ARP replies, it updates the ARP cache
# and forwards the reply if necessary. For ARP requests, it checks if the target IP matches one of the router's
# NICs and sends an ARP reply if so. If the destination is not directly connected, it consults the routing table
# to find the best route and reattempts ARP request processing if needed.
#
# :param from_nic: The NIC that received the ARP packet.
# :param frame: The frame containing the ARP packet.
# :param route_table: The routing table of the router.
# :param is_reattempt: Flag to indicate if this is a reattempt of processing the ARP packet, defaults to False.
# """
# arp_packet = frame.arp
#
# # ARP Reply
# if not arp_packet.request:
# if arp_packet.target_ip_address == from_nic.ip_address:
# # reply to the Router specifically
# self.sys_log.info(
# f"Received ARP response for {arp_packet.sender_ip_address} "
# f"from {arp_packet.sender_mac_addr} via NIC {from_nic}"
# )
# self.add_arp_cache_entry(
# ip_address=arp_packet.sender_ip_address,
# mac_address=arp_packet.sender_mac_addr,
# nic=from_nic,
# )
# return
#
# # # Reply for a connected requested
# # nic = self.get_arp_cache_nic(arp_packet.target_ip_address)
# # if nic:
# # self.sys_log.info(
# # f"Forwarding arp reply for {arp_packet.target_ip_address}, from {arp_packet.sender_ip_address}"
# # )
# # arp_packet.sender_mac_addr = nic.mac_address
# # frame.decrement_ttl()
# # if frame.ip and frame.ip.ttl < 1:
# # self.sys_log.info("Frame discarded as TTL limit reached")
# # return
# # nic.send_frame(frame)
# # return
#
# # ARP Request
# self.sys_log.info(
# f"Received ARP request for {arp_packet.target_ip_address} from "
# f"{arp_packet.sender_mac_addr}/{arp_packet.sender_ip_address} "
# )
# # Matched ARP request
# self.add_arp_cache_entry(
# ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic
# )
#
# # If the target IP matches one of the router's NICs
# for nic in self.nics.values():
# if nic.enabled and nic.ip_address == arp_packet.target_ip_address:
# arp_reply = arp_packet.generate_reply(from_nic.mac_address)
# self.send_arp_reply(arp_reply, from_nic)
# return
#
# # # Check Route Table
# # route = route_table.find_best_route(arp_packet.target_ip_address)
# # if route and route != self.router.route_table.default_route:
# # nic = self.get_arp_cache_nic(route.next_hop_ip_address)
# #
# # if not nic:
# # if not is_reattempt:
# # self.send_arp_request(route.next_hop_ip_address, ignore_networks=[frame.ip.src_ip_address])
# # return self.process_arp_packet(from_nic, frame, route_table, is_reattempt=True)
# # else:
# # self.sys_log.info("Ignoring ARP request as destination unavailable/No ARP entry found")
# # return
# # else:
# # arp_reply = arp_packet.generate_reply(from_nic.mac_address)
# # self.send_arp_reply(arp_reply, from_nic)
# # return
#

View File

@@ -5,8 +5,8 @@ from typing import Dict, Any, Union, Optional, Tuple
from primaite import getLogger
from primaite.simulator.network.hardware.base import NIC
from primaite.simulator.network.protocols.icmp import ICMPPacket, ICMPType
from primaite.simulator.network.transmission.data_link_layer import Frame, EthernetHeader
from primaite.simulator.network.transmission.network_layer import IPProtocol, IPPacket
from primaite.simulator.network.transmission.data_link_layer import Frame
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.services.service import Service
@@ -14,6 +14,12 @@ _LOGGER = getLogger(__name__)
class ICMP(Service):
"""
The Internet Control Message Protocol (ICMP) services.
Enables the sending and receiving of ICMP messages such as echo requests and replies. This is typically used for
network diagnostics, notably the ping command.
"""
request_replies: Dict = {}
def __init__(self, **kwargs):
@@ -26,53 +32,22 @@ class ICMP(Service):
pass
def clear(self):
"""Clears the ICMP request replies tracker."""
"""
Clears the ICMP request and reply tracker.
This is typically used to reset the state of the service, removing all tracked ICMP echo requests and their
corresponding replies.
"""
self.request_replies.clear()
def _send_icmp_echo_request(
self, target_ip_address: IPv4Address, sequence: int = 0, identifier: Optional[int] = None, pings: int = 4
) -> Tuple[int, Union[int, None]]:
"""
Send an ICMP echo request (ping) to a target IP address and manage the sequence and identifier.
:param target_ip_address: The target IP address to send the ping.
:param sequence: The sequence number of the echo request. Defaults to 0.
:param identifier: An optional identifier for the ICMP packet. If None, a default will be used.
:return: A tuple containing the next sequence number and the identifier, or (0, None) if the target IP address
was not found in the ARP cache.
"""
nic = self.software_manager.arp.get_arp_cache_nic(target_ip_address)
if not nic:
return pings, None
# ARP entry exists
sequence += 1
target_mac_address = self.software_manager.arp.get_arp_cache_mac_address(target_ip_address)
src_nic = self.software_manager.arp.get_arp_cache_nic(target_ip_address)
# Network Layer
ip_packet = IPPacket(
src_ip_address=nic.ip_address,
dst_ip_address=target_ip_address,
protocol=IPProtocol.ICMP,
)
# Data Link Layer
ethernet_header = EthernetHeader(src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address)
icmp_packet = ICMPPacket(identifier=identifier, sequence=sequence)
payload = secrets.token_urlsafe(int(32 / 1.3)) # Standard ICMP 32 bytes size
frame = Frame(ethernet=ethernet_header, ip=ip_packet, icmp=icmp_packet, payload=payload)
nic.send_frame(frame)
return sequence, icmp_packet.identifier
def ping(self, target_ip_address: Union[IPv4Address, str], pings: int = 4) -> bool:
"""
Ping an IP address, performing a standard ICMP echo request/response.
Pings a target IP address by sending an ICMP echo request and waiting for a reply.
:param target_ip_address: The target IP address to ping.
:param pings: The number of pings to attempt, default is 4.
:return: True if the ping is successful, otherwise False.
:param target_ip_address: The IP address to be pinged.
:param pings: The number of echo requests to send. Defaults to 4.
:return: True if the ping was successful (i.e., if a reply was received for every request sent), otherwise
False.
"""
if not self._can_perform_action():
return False
@@ -101,37 +76,79 @@ class ICMP(Service):
return passed
def _process_icmp_echo_request(self, frame: Frame, from_nic: NIC, is_reattempt: bool = False):
if frame.icmp.icmp_type == ICMPType.ECHO_REQUEST:
if not is_reattempt:
self.sys_log.info(f"Received echo request from {frame.ip.src_ip_address}")
target_mac_address = self.software_manager.arp.get_arp_cache_mac_address(frame.ip.src_ip_address)
def _send_icmp_echo_request(
self, target_ip_address: IPv4Address, sequence: int = 0, identifier: Optional[int] = None, pings: int = 4
) -> Tuple[int, Union[int, None]]:
"""
Sends an ICMP echo request to a specified target IP address.
src_nic = self.software_manager.arp.get_arp_cache_nic(frame.ip.src_ip_address)
if not src_nic:
self.software_manager.arp.send_arp_request(frame.ip.src_ip_address)
self.process_icmp(frame=frame, from_nic=from_nic, is_reattempt=True)
return
:param target_ip_address: The target IP address for the echo request.
:param sequence: The sequence number of the echo request.
:param identifier: The identifier for the ICMP packet. If None, a default identifier is used.
:param pings: The number of pings to send. Defaults to 4.
:return: A tuple containing the next sequence number and the identifier.
"""
nic = self.software_manager.session_manager.resolve_outbound_nic(target_ip_address)
# Network Layer
ip_packet = IPPacket(
src_ip_address=src_nic.ip_address, dst_ip_address=frame.ip.src_ip_address, protocol=IPProtocol.ICMP
if not nic:
self.sys_log.error(
"Cannot send ICMP echo request as there is no outbound NIC to use. Try configuring the default gateway."
)
# Data Link Layer
ethernet_header = EthernetHeader(src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address)
icmp_reply_packet = ICMPPacket(
icmp_type=ICMPType.ECHO_REPLY,
icmp_code=0,
identifier=frame.icmp.identifier,
sequence=frame.icmp.sequence + 1,
return pings, None
sequence += 1
icmp_packet = ICMPPacket(identifier=identifier, sequence=sequence)
payload = secrets.token_urlsafe(int(32 / 1.3)) # Standard ICMP 32 bytes size
self.software_manager.session_manager.receive_payload_from_software_manager(
payload=payload,
dst_ip_address=target_ip_address,
dst_port=self.port,
ip_protocol=self.protocol,
icmp_packet=icmp_packet
)
return sequence, icmp_packet.identifier
def _process_icmp_echo_request(self, frame: Frame):
"""
Processes an ICMP echo request received by the service.
:param frame: The network frame containing the ICMP echo request.
"""
self.sys_log.info(f"Received echo request from {frame.ip.src_ip_address}")
nic = self.software_manager.session_manager.resolve_outbound_nic(frame.ip.src_ip_address)
if not nic:
self.sys_log.error(
"Cannot send ICMP echo reply as there is no outbound NIC to use. Try configuring the default gateway."
)
payload = secrets.token_urlsafe(int(32 / 1.3)) # Standard ICMP 32 bytes size
frame = Frame(ethernet=ethernet_header, ip=ip_packet, icmp=icmp_reply_packet, payload=payload)
self.sys_log.info(f"Sending echo reply to {frame.ip.dst_ip_address}")
return
src_nic.send_frame(frame)
icmp_packet = ICMPPacket(
icmp_type=ICMPType.ECHO_REPLY,
icmp_code=0,
identifier=frame.icmp.identifier,
sequence=frame.icmp.sequence + 1,
)
payload = secrets.token_urlsafe(int(32 / 1.3)) # Standard ICMP 32 bytes size
self.sys_log.info(f"Sending echo reply to {frame.ip.dst_ip_address}")
def _process_icmp_echo_reply(self, frame: Frame, from_nic: NIC, is_reattempt: bool = False):
self.software_manager.session_manager.receive_payload_from_software_manager(
payload=payload,
dst_ip_address=frame.ip.src_ip_address,
dst_port=self.port,
ip_protocol=self.protocol,
icmp_packet=icmp_packet
)
def _process_icmp_echo_reply(self, frame: Frame):
"""
Processes an ICMP echo reply received by the service, logging the reply details.
:param frame: The network frame containing the ICMP echo reply.
"""
time = frame.transmission_duration()
time_str = f"{time}ms" if time > 0 else "<1ms"
self.sys_log.info(
@@ -146,14 +163,21 @@ class ICMP(Service):
self.request_replies[frame.icmp.identifier] += 1
def receive(self, payload: Any, session_id: str, **kwargs) -> bool:
"""
Processes received data, handling ICMP echo requests and replies.
:param payload: The payload received.
:param session_id: The session ID associated with the received data.
:param kwargs: Additional keyword arguments.
:return: True if the payload was processed successfully, otherwise False.
"""
frame: Frame = kwargs["frame"]
from_nic = kwargs["from_nic"]
if not frame.icmp:
return False
if frame.icmp.icmp_type == ICMPType.ECHO_REQUEST:
self._process_icmp_echo_request(frame, from_nic)
self._process_icmp_echo_request(frame)
elif frame.icmp.icmp_type == ICMPType.ECHO_REPLY:
self._process_icmp_echo_reply(frame, from_nic)
self._process_icmp_echo_reply(frame)
return True

View File

@@ -1,3 +1,4 @@
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.base import Link, NodeOperatingState
from primaite.simulator.network.hardware.nodes.computer import Computer
from primaite.simulator.network.hardware.nodes.server import Server
@@ -6,25 +7,30 @@ from primaite.simulator.network.hardware.nodes.switch import Switch
def test_switched_network():
"""Tests a node can ping another node via the switch."""
network = Network()
client_1 = Computer(
hostname="client_1",
ip_address="192.168.1.10",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.0",
operating_state=NodeOperatingState.ON,
default_gateway="192.168.1.1",
start_up_duration=0,
)
client_1.power_on()
server_1 = Server(
hostname=" server_1",
hostname="server_1",
ip_address="192.168.1.11",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.11",
operating_state=NodeOperatingState.ON,
default_gateway="192.168.1.1",
start_up_duration=0,
)
server_1.power_on()
switch_1 = Switch(hostname="switch_1", num_ports=6, operating_state=NodeOperatingState.ON)
switch_1 = Switch(hostname="switch_1", start_up_duration=0)
switch_1.power_on()
Link(endpoint_a=client_1.ethernet_port[1], endpoint_b=switch_1.switch_ports[1])
Link(endpoint_a=server_1.ethernet_port[1], endpoint_b=switch_1.switch_ports[2])
network.connect(endpoint_a=client_1.ethernet_port[1], endpoint_b=switch_1.switch_ports[1])
network.connect(endpoint_a=server_1.ethernet_port[1], endpoint_b=switch_1.switch_ports[2])
assert client_1.ping("192.168.1.11")

View File

@@ -1,7 +1,8 @@
import pytest
from primaite.simulator.network.protocols.icmp import ICMPPacket
from primaite.simulator.network.transmission.data_link_layer import EthernetHeader, Frame
from primaite.simulator.network.transmission.network_layer import ICMPPacket, IPPacket, IPProtocol, Precedence
from primaite.simulator.network.transmission.network_layer import IPPacket, IPProtocol, Precedence
from primaite.simulator.network.transmission.primaite_layer import AgentSource, DataStatus
from primaite.simulator.network.transmission.transport_layer import Port, TCPFlags, TCPHeader, UDPHeader

View File

@@ -1,6 +1,6 @@
import pytest
from primaite.simulator.network.transmission.network_layer import ICMPPacket, ICMPType
from primaite.simulator.network.protocols.icmp import ICMPPacket, ICMPType
def test_icmp_minimal_header_creation():