#2248 - Initial crack at getting ARP into a Service. Lots of refactoring has been done. It's a mess at the minute, but I can successfully send an ARP request so committing as a successful point in time

This commit is contained in:
Chris McCarthy
2024-02-01 22:19:55 +00:00
parent 0c3304b1fd
commit 9577f212f8
10 changed files with 503 additions and 138 deletions

View File

@@ -12,8 +12,8 @@ class _SimOutput:
self._path: Path = (
_PRIMAITE_ROOT.parent.parent / "simulation_output" / datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
)
self.save_pcap_logs: bool = False
self.save_sys_logs: bool = False
self.save_pcap_logs: bool = True
self.save_sys_logs: bool = True
@property
def path(self) -> Path:

View File

@@ -18,7 +18,7 @@ from primaite.simulator.network.hardware.node_operating_state import NodeOperati
from primaite.simulator.network.protocols.arp import ARPEntry, ARPPacket
from primaite.simulator.network.transmission.data_link_layer import EthernetHeader, Frame
from primaite.simulator.network.transmission.network_layer import ICMPPacket, ICMPType, IPPacket, IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port, TCPHeader
from primaite.simulator.network.transmission.transport_layer import Port, TCPHeader, UDPHeader
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.core.packet_capture import PacketCapture
from primaite.simulator.system.core.session_manager import SessionManager
@@ -617,6 +617,7 @@ class ARPCache:
self.sys_log: "SysLog" = sys_log
self.arp: Dict[IPv4Address, ARPEntry] = {}
self.nics: Dict[str, "NIC"] = {}
self.node = None
def show(self, markdown: bool = False):
"""Prints a table of ARC Cache."""
@@ -669,6 +670,36 @@ class ARPCache:
if ip_address in self.arp:
del self.arp[ip_address]
def get_default_gateway_mac_address(self) -> Optional[str]:
if self.arp.node.default_gateway:
return self.get_arp_cache_mac_address(self.arp.node.default_gateway)
def get_default_gateway_nic(self) -> Optional[NIC]:
if self.arp.node.default_gateway:
return self.get_arp_cache_nic(self.arp.node.default_gateway)
def _get_arp_cache_mac_address(
self, ip_address: IPv4Address, is_reattempt: bool = False, is_default_gateway_attempt: bool = False
) -> Optional[str]:
arp_entry = self.arp.get(ip_address)
if arp_entry:
return arp_entry.mac_address
else:
if not is_reattempt:
self.send_arp_request(ip_address)
return self._get_arp_cache_mac_address(
ip_address=ip_address, is_reattempt=True, is_default_gateway_attempt=is_default_gateway_attempt
)
else:
if self.node.default_gateway:
if not is_default_gateway_attempt:
self.send_arp_request(self.node.default_gateway)
return self._get_arp_cache_mac_address(
ip_address=self.node.default_gateway, is_reattempt=True, is_default_gateway_attempt=True
)
return None
def get_arp_cache_mac_address(self, ip_address: IPv4Address) -> Optional[str]:
"""
Get the MAC address associated with an IP address.
@@ -676,9 +707,29 @@ class ARPCache:
:param ip_address: The IP address to look up in the cache.
:return: The MAC address associated with the IP address, or None if not found.
"""
return self._get_arp_cache_mac_address(ip_address)
def _get_arp_cache_nic(
self, ip_address: IPv4Address, is_reattempt: bool = False, is_default_gateway_attempt: bool = False
) -> Optional[NIC]:
arp_entry = self.arp.get(ip_address)
if arp_entry:
return arp_entry.mac_address
return self.nics[arp_entry.nic_uuid]
else:
if not is_reattempt:
self.send_arp_request(ip_address)
return self._get_arp_cache_nic(
ip_address=ip_address, is_reattempt=True, is_default_gateway_attempt=is_default_gateway_attempt
)
else:
if self.node.default_gateway:
if not is_default_gateway_attempt:
self.send_arp_request(self.node.default_gateway)
return self._get_arp_cache_nic(
ip_address=self.node.default_gateway, is_reattempt=True, is_default_gateway_attempt=True
)
return None
def get_arp_cache_nic(self, ip_address: IPv4Address) -> Optional[NIC]:
"""
@@ -687,10 +738,7 @@ class ARPCache:
:param ip_address: The IP address to look up in the cache.
:return: The NIC associated with the IP address, or None if not found.
"""
arp_entry = self.arp.get(ip_address)
if arp_entry:
return self.nics[arp_entry.nic_uuid]
return self._get_arp_cache_nic(ip_address)
def clear_arp_cache(self):
"""Clear the entire ARP cache, removing all stored entries."""
@@ -721,12 +769,11 @@ class ARPCache:
use_nic = False
if nic.enabled and use_nic:
self.sys_log.info(f"Sending ARP request from NIC {nic} for ip {target_ip_address}")
tcp_header = TCPHeader(src_port=Port.ARP, dst_port=Port.ARP)
udp_header = UDPHeader(src_port=Port.ARP, dst_port=Port.ARP)
# Network Layer
ip_packet = IPPacket(
src_ip_address=nic.ip_address,
dst_ip_address=target_ip_address,
src_ip_address=nic.ip_address, dst_ip_address=target_ip_address, protocol=IPProtocol.UDP
)
# Data Link Layer
ethernet_header = EthernetHeader(src_mac_addr=nic.mac_address, dst_mac_addr="ff:ff:ff:ff:ff:ff")
@@ -735,7 +782,7 @@ class ARPCache:
sender_mac_addr=nic.mac_address,
target_ip_address=target_ip_address,
)
frame = Frame(ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, arp=arp_packet)
frame = Frame(ethernet=ethernet_header, ip=ip_packet, udp=udp_header, payload=arp_packet)
nic.send_frame(frame)
def send_arp_reply(self, arp_reply: ARPPacket, from_nic: NIC):
@@ -888,25 +935,14 @@ class ICMP:
was not found in the ARP cache.
"""
nic = self.arp.get_arp_cache_nic(target_ip_address)
# TODO: Eventually this ARP request needs to be done elsewhere. It's not the responsibility of the
# ping function to handle ARP lookups
# Already tried once and cannot get ARP entry, stop trying
if sequence == -1:
if not nic:
return 4, None
else:
sequence = 0
# No existing ARP entry
if not nic:
self.sys_log.info(f"No entry in ARP cache for {target_ip_address}")
self.arp.send_arp_request(target_ip_address)
return -1, None
return pings, None
# ARP entry exists
sequence += 1
target_mac_address = self.arp.get_arp_cache_mac_address(target_ip_address)
src_nic = self.arp.get_arp_cache_nic(target_ip_address)
tcp_header = TCPHeader(src_port=Port.ARP, dst_port=Port.ARP)
@@ -1026,6 +1062,7 @@ class Node(SimComponent):
)
super().__init__(**kwargs)
self.arp.nics = self.nics
self.arp.node = self
self.session_manager.software_manager = self.software_manager
self._install_system_software()
self.set_original_state()
@@ -1407,7 +1444,9 @@ class Node(SimComponent):
self.sys_log.info("Pinging loopback address")
return any(nic.enabled for nic in self.nics.values())
if self.operating_state == NodeOperatingState.ON:
self.sys_log.info(f"Pinging {target_ip_address}:")
output = f"Pinging {target_ip_address}:"
self.sys_log.info(output)
print(output)
sequence, identifier = 0, None
while sequence < pings:
sequence, identifier = self.icmp.ping(target_ip_address, sequence, identifier, pings)
@@ -1417,12 +1456,14 @@ class Node(SimComponent):
self.icmp.request_replies.pop(identifier)
else:
request_replies = 0
self.sys_log.info(
output = (
f"Ping statistics for {target_ip_address}: "
f"Packets: Sent = {pings}, "
f"Received = {request_replies}, "
f"Lost = {pings - request_replies} ({(pings - request_replies) / pings * 100}% loss)"
)
self.sys_log.info(output)
print(output)
return passed
return False
@@ -1456,12 +1497,18 @@ class Node(SimComponent):
self.icmp.process_icmp(frame=frame, from_nic=from_nic)
return
# Check if the destination port is open on the Node
if frame.tcp.dst_port in self.software_manager.get_open_ports():
dst_port = None
if frame.tcp:
dst_port = frame.tcp.dst_port
elif frame.udp:
dst_port = frame.udp.dst_port
if dst_port in self.software_manager.get_open_ports():
# accept thr frame as the port is open
if frame.tcp.src_port == Port.ARP:
self.arp.process_arp_packet(from_nic=from_nic, arp_packet=frame.arp)
else:
self.session_manager.receive_frame(frame)
self.session_manager.receive_frame(frame, from_nic)
# if frame.tcp.src_port == Port.ARP:
# self.arp.process_arp_packet(from_nic=from_nic, arp_packet=frame.arp)
# else:
# self.session_manager.receive_frame(frame)
else:
# denied as port closed
self.sys_log.info(f"Ignoring frame for port {frame.tcp.dst_port.value} from {frame.ip.src_ip_address}")

View File

@@ -566,33 +566,32 @@ class RouterARPCache(ARPCache):
# ARP Reply
if not arp_packet.request:
for nic in self.router.nics.values():
if arp_packet.target_ip_address == nic.ip_address:
# reply to the Router specifically
self.sys_log.info(
f"Received ARP response for {arp_packet.sender_ip_address} "
f"from {arp_packet.sender_mac_addr} via NIC {from_nic}"
)
self.add_arp_cache_entry(
ip_address=arp_packet.sender_ip_address,
mac_address=arp_packet.sender_mac_addr,
nic=from_nic,
)
return
# Reply for a connected requested
nic = self.get_arp_cache_nic(arp_packet.target_ip_address)
if nic:
if arp_packet.target_ip_address == from_nic.ip_address:
# reply to the Router specifically
self.sys_log.info(
f"Forwarding arp reply for {arp_packet.target_ip_address}, from {arp_packet.sender_ip_address}"
f"Received ARP response for {arp_packet.sender_ip_address} "
f"from {arp_packet.sender_mac_addr} via NIC {from_nic}"
)
arp_packet.sender_mac_addr = nic.mac_address
frame.decrement_ttl()
if frame.ip and frame.ip.ttl < 1:
self.sys_log.info("Frame discarded as TTL limit reached")
return
nic.send_frame(frame)
return
self.add_arp_cache_entry(
ip_address=arp_packet.sender_ip_address,
mac_address=arp_packet.sender_mac_addr,
nic=from_nic,
)
return
# # Reply for a connected requested
# nic = self.get_arp_cache_nic(arp_packet.target_ip_address)
# if nic:
# self.sys_log.info(
# f"Forwarding arp reply for {arp_packet.target_ip_address}, from {arp_packet.sender_ip_address}"
# )
# arp_packet.sender_mac_addr = nic.mac_address
# frame.decrement_ttl()
# if frame.ip and frame.ip.ttl < 1:
# self.sys_log.info("Frame discarded as TTL limit reached")
# return
# nic.send_frame(frame)
# return
# ARP Request
self.sys_log.info(
@@ -606,28 +605,27 @@ class RouterARPCache(ARPCache):
# If the target IP matches one of the router's NICs
for nic in self.nics.values():
if arp_packet.target_ip_address in nic.ip_network:
# if nic.enabled and nic.ip_address == arp_packet.target_ip_address:
if nic.enabled and nic.ip_address == arp_packet.target_ip_address:
arp_reply = arp_packet.generate_reply(from_nic.mac_address)
self.send_arp_reply(arp_reply, from_nic)
return
# Check Route Table
route = route_table.find_best_route(arp_packet.target_ip_address)
if route:
nic = self.get_arp_cache_nic(route.next_hop_ip_address)
if not nic:
if not is_reattempt:
self.send_arp_request(route.next_hop_ip_address, ignore_networks=[frame.ip.src_ip_address])
return self.process_arp_packet(from_nic, frame, route_table, is_reattempt=True)
else:
self.sys_log.info("Ignoring ARP request as destination unavailable/No ARP entry found")
return
else:
arp_reply = arp_packet.generate_reply(from_nic.mac_address)
self.send_arp_reply(arp_reply, from_nic)
return
# # Check Route Table
# route = route_table.find_best_route(arp_packet.target_ip_address)
# if route and route != self.router.route_table.default_route:
# nic = self.get_arp_cache_nic(route.next_hop_ip_address)
#
# if not nic:
# if not is_reattempt:
# self.send_arp_request(route.next_hop_ip_address, ignore_networks=[frame.ip.src_ip_address])
# return self.process_arp_packet(from_nic, frame, route_table, is_reattempt=True)
# else:
# self.sys_log.info("Ignoring ARP request as destination unavailable/No ARP entry found")
# return
# else:
# arp_reply = arp_packet.generate_reply(from_nic.mac_address)
# self.send_arp_reply(arp_reply, from_nic)
# return
class RouterICMP(ICMP):
@@ -949,13 +947,13 @@ class Router(Node):
at_port = self._get_port_of_nic(from_nic)
self.sys_log.info(f"Frame blocked at port {at_port} by rule {rule}")
return
if not self.arp.get_arp_cache_nic(src_ip_address):
self.arp.add_arp_cache_entry(src_ip_address, frame.ethernet.src_mac_addr, from_nic)
self.arp.add_arp_cache_entry(src_ip_address, frame.ethernet.src_mac_addr, from_nic)
if frame.ip.protocol == IPProtocol.ICMP:
self.icmp.process_icmp(frame=frame, from_nic=from_nic)
else:
if src_port == Port.ARP:
self.arp.process_arp_packet(from_nic=from_nic, frame=frame, route_table=self.route_table)
return
else:
# All other traffic
process_frame = True

View File

@@ -73,7 +73,7 @@ class Frame(BaseModel):
msg = "Cannot build a Frame using the TCP IP Protocol without a TCPHeader"
_LOGGER.error(msg)
raise ValueError(msg)
if kwargs["ip"].protocol == IPProtocol.UDP and not kwargs.get("UDP"):
if kwargs["ip"].protocol == IPProtocol.UDP and not kwargs.get("udp"):
msg = "Cannot build a Frame using the UDP IP Protocol without a UDPHeader"
_LOGGER.error(msg)
raise ValueError(msg)
@@ -95,8 +95,6 @@ class Frame(BaseModel):
"UDP header."
icmp: Optional[ICMPPacket] = None
"ICMP header."
arp: Optional[ARPPacket] = None
"ARP packet."
primaite: PrimaiteHeader
"PrimAITE header."
payload: Optional[Any] = None

View File

@@ -8,10 +8,10 @@ from prettytable import MARKDOWN, PrettyTable
from primaite.simulator.core import SimComponent
from primaite.simulator.network.transmission.data_link_layer import EthernetHeader, Frame
from primaite.simulator.network.transmission.network_layer import IPPacket, IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port, TCPHeader
from primaite.simulator.network.transmission.transport_layer import Port, TCPHeader, UDPHeader
if TYPE_CHECKING:
from primaite.simulator.network.hardware.base import ARPCache
from primaite.simulator.network.hardware.base import ARPCache, NIC
from primaite.simulator.system.core.software_manager import SoftwareManager
from primaite.simulator.system.core.sys_log import SysLog
@@ -138,37 +138,19 @@ class SessionManager:
dst_port = None
return protocol, with_ip_address, src_port, dst_port
def receive_payload_from_software_manager(
self,
payload: Any,
dst_ip_address: Optional[Union[IPv4Address, IPv4Network]] = None,
dst_port: Optional[Port] = None,
session_id: Optional[str] = None,
is_reattempt: bool = False,
) -> Union[Any, None]:
"""
Receive a payload from the SoftwareManager and send it to the appropriate NIC for transmission.
This method supports both unicast and Layer 3 broadcast transmissions. If `dst_ip_address` is an
IPv4Network, a broadcast is initiated. For unicast, the destination MAC address is resolved via ARP.
A new session is established if `session_id` is not provided, and an existing session is used otherwise.
:param payload: The payload to be sent.
:param dst_ip_address: The destination IP address or network for broadcast. Optional.
:param dst_port: The destination port for the TCP packet. Optional.
:param session_id: The Session ID from which the payload originates. Optional.
:param is_reattempt: Flag to indicate if this is a reattempt after an ARP request. Default is False.
:return: The outcome of sending the frame, or None if sending was unsuccessful.
"""
def resolve_outbound_transmission_details(
self, dst_ip_address: Optional[Union[IPv4Address, IPv4Network]] = None, session_id: Optional[str] = None
) -> Tuple[Optional["NIC"], Optional[str], Optional[IPProtocol], bool]:
is_broadcast = False
outbound_nic = None
dst_mac_address = None
protocol = None
# Use session details if session_id is provided
if session_id:
session = self.sessions_by_uuid[session_id]
dst_ip_address = session.with_ip_address
dst_port = session.dst_port
protocol = session.protocol
# Determine if the payload is for broadcast or unicast
@@ -182,47 +164,81 @@ class SessionManager:
if dst_ip_address in nic.ip_network and nic.enabled:
dst_mac_address = "ff:ff:ff:ff:ff:ff"
outbound_nic = nic
break
else:
# Resolve MAC address for unicast transmission
dst_mac_address = self.arp_cache.get_arp_cache_mac_address(dst_ip_address)
use_default_gateway = True
for nic in self.arp_cache.nics.values():
if dst_ip_address in nic.ip_network and nic.enabled:
dst_mac_address = self.arp_cache.get_arp_cache_mac_address(dst_ip_address)
break
# Resolve outbound NIC for unicast transmission
if dst_mac_address:
if dst_ip_address:
use_default_gateway = False
outbound_nic = self.arp_cache.get_arp_cache_nic(dst_ip_address)
# If MAC address not found, initiate ARP request
else:
if not is_reattempt:
self.arp_cache.send_arp_request(dst_ip_address)
# Reattempt payload transmission after ARP request
return self.receive_payload_from_software_manager(
payload=payload,
dst_ip_address=dst_ip_address,
dst_port=dst_port,
session_id=session_id,
is_reattempt=True,
)
else:
# Return None if reattempt fails
return
if use_default_gateway:
dst_mac_address = self.arp_cache.get_default_gateway_mac_address()
outbound_nic = self.arp_cache.get_default_gateway_nic()
return outbound_nic, dst_mac_address, protocol, is_broadcast
def receive_payload_from_software_manager(
self,
payload: Any,
dst_ip_address: Optional[Union[IPv4Address, IPv4Network]] = None,
dst_port: Optional[Port] = None,
session_id: Optional[str] = None,
ip_protocol: IPProtocol = IPProtocol.TCP,
) -> Union[Any, None]:
"""
Receive a payload from the SoftwareManager and send it to the appropriate NIC for transmission.
This method supports both unicast and Layer 3 broadcast transmissions. If `dst_ip_address` is an
IPv4Network, a broadcast is initiated. For unicast, the destination MAC address is resolved via ARP.
A new session is established if `session_id` is not provided, and an existing session is used otherwise.
:param payload: The payload to be sent.
:param dst_ip_address: The destination IP address or network for broadcast. Optional.
:param dst_port: The destination port for the TCP packet. Optional.
:param session_id: The Session ID from which the payload originates. Optional.
:return: The outcome of sending the frame, or None if sending was unsuccessful.
"""
print(ip_protocol)
outbound_nic, dst_mac_address, protocol, is_broadcast = self.resolve_outbound_transmission_details(
dst_ip_address=dst_ip_address, session_id=session_id
)
if protocol:
ip_protocol = protocol
print(ip_protocol)
# Check if outbound NIC and destination MAC address are resolved
if not outbound_nic or not dst_mac_address:
return False
tcp_header = None
udp_header = None
if ip_protocol == IPProtocol.TCP:
TCPHeader(
src_port=dst_port,
dst_port=dst_port,
)
elif ip_protocol == IPProtocol:
udp_header = UDPHeader(
src_port=dst_port,
dst_port=dst_port,
)
# Construct the frame for transmission
frame = Frame(
ethernet=EthernetHeader(src_mac_addr=outbound_nic.mac_address, dst_mac_addr=dst_mac_address),
ip=IPPacket(
src_ip_address=outbound_nic.ip_address,
dst_ip_address=dst_ip_address,
),
tcp=TCPHeader(
src_port=dst_port,
dst_port=dst_port,
),
ip=IPPacket(src_ip_address=outbound_nic.ip_address, dst_ip_address=dst_ip_address, ip_protocol=ip_protocol),
tcp=tcp_header,
udp_header=udp_header,
payload=payload,
)
print(frame)
# Manage session for unicast transmission
if not (is_broadcast and session_id):
@@ -237,7 +253,7 @@ class SessionManager:
# Send the frame through the NIC
return outbound_nic.send_frame(frame)
def receive_frame(self, frame: Frame):
def receive_frame(self, frame: Frame, from_nic: NIC):
"""
Receive a Frame.
@@ -253,8 +269,13 @@ class SessionManager:
session = Session.from_session_key(session_key)
self.sessions_by_key[session_key] = session
self.sessions_by_uuid[session.uuid] = session
dst_port = None
if frame.tcp:
dst_port = frame.tcp.dst_port
elif frame.udp:
dst_port = frame.udp.dst_port
self.software_manager.receive_payload_from_session_manager(
payload=frame.payload, port=frame.tcp.dst_port, protocol=frame.ip.protocol, session_id=session.uuid
payload=frame.payload, port=dst_port, protocol=frame.ip.protocol, session_id=session.uuid, from_nic=from_nic
)
def show(self, markdown: bool = False):

View File

@@ -14,7 +14,7 @@ from primaite.simulator.system.software import IOSoftware
if TYPE_CHECKING:
from primaite.simulator.system.core.session_manager import SessionManager
from primaite.simulator.system.core.sys_log import SysLog
from primaite.simulator.network.hardware.base import Node
from primaite.simulator.network.hardware.base import Node, NIC
from typing import Type, TypeVar
@@ -52,11 +52,10 @@ class SoftwareManager:
:return: A list of all open ports on the Node.
"""
open_ports = [Port.ARP]
open_ports = []
for software in self.port_protocol_mapping.values():
if software.operating_state in {ApplicationOperatingState.RUNNING, ServiceOperatingState.RUNNING}:
open_ports.append(software.port)
open_ports.sort(key=lambda port: port.value)
return open_ports
def install(self, software_class: Type[IOSoftwareClass]):
@@ -132,6 +131,7 @@ class SoftwareManager:
payload: Any,
dest_ip_address: Optional[Union[IPv4Address, IPv4Network]] = None,
dest_port: Optional[Port] = None,
ip_protocol: IPProtocol = IPProtocol.TCP,
session_id: Optional[str] = None,
) -> bool:
"""
@@ -154,7 +154,9 @@ class SoftwareManager:
session_id=session_id,
)
def receive_payload_from_session_manager(self, payload: Any, port: Port, protocol: IPProtocol, session_id: str):
def receive_payload_from_session_manager(
self, payload: Any, port: Port, protocol: IPProtocol, session_id: str, from_nic: "NIC"
):
"""
Receive a payload from the SessionManager and forward it to the corresponding service or application.
@@ -163,7 +165,7 @@ class SoftwareManager:
"""
receiver: Optional[Union[Service, Application]] = self.port_protocol_mapping.get((port, protocol), None)
if receiver:
receiver.receive(payload=payload, session_id=session_id)
receiver.receive(payload=payload, session_id=session_id, from_nic=from_nic)
else:
self.sys_log.error(f"No service or application found for port {port} and protocol {protocol}")
pass

View File

@@ -0,0 +1,201 @@
from __future__ import annotations
from abc import abstractmethod
from ipaddress import IPv4Address
from typing import Any, Dict, Optional, Tuple, Union
from prettytable import MARKDOWN, PrettyTable
from pydantic import BaseModel
from primaite.simulator.network.hardware.base import NIC
from primaite.simulator.network.protocols.arp import ARPEntry, ARPPacket
from primaite.simulator.network.protocols.packet import DataPacket
from primaite.simulator.network.transmission.data_link_layer import EthernetHeader, Frame
from primaite.simulator.network.transmission.network_layer import IPPacket, IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port, TCPHeader, UDPHeader
from primaite.simulator.system.services.service import Service
class ARP(Service):
arp: Dict[IPv4Address, ARPEntry] = {}
def __init__(self, **kwargs):
kwargs["name"] = "ARP"
kwargs["port"] = Port.ARP
kwargs["protocol"] = IPProtocol.UDP
super().__init__(**kwargs)
def describe_state(self) -> Dict:
pass
def show(self, markdown: bool = False):
"""Prints a table of ARC Cache."""
table = PrettyTable(["IP Address", "MAC Address", "Via"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.sys_log.hostname} ARP Cache"
for ip, arp in self.arp.items():
table.add_row(
[
str(ip),
arp.mac_address,
self.software_manager.node.nics[arp.nic_uuid].mac_address,
]
)
print(table)
def clear(self):
"""Clears the arp cache."""
self.arp.clear()
def add_arp_cache_entry(self, ip_address: IPv4Address, mac_address: str, nic: NIC, override: bool = False):
"""
Add an ARP entry to the cache.
If an entry for the given IP address already exists, the entry is only updated if the `override` parameter is
set to True.
:param ip_address: The IP address to be added to the cache.
:param mac_address: The MAC address associated with the IP address.
:param nic: The NIC through which the NIC with the IP address is reachable.
:param override: If True, an existing entry for the IP address will be overridden. Default is False.
"""
for _nic in self.software_manager.node.nics.values():
if _nic.ip_address == ip_address:
return
if override or not self.arp.get(ip_address):
self.sys_log.info(f"Adding ARP cache entry for {mac_address}/{ip_address} via NIC {nic}")
arp_entry = ARPEntry(mac_address=mac_address, nic_uuid=nic.uuid)
self.arp[ip_address] = arp_entry
@abstractmethod
def get_arp_cache_mac_address(self, ip_address: IPv4Address) -> Optional[str]:
"""
Get the MAC address associated with an IP address.
:param ip_address: The IP address to look up in the cache.
:return: The MAC address associated with the IP address, or None if not found.
"""
pass
@abstractmethod
def get_arp_cache_nic(self, ip_address: IPv4Address) -> Optional[NIC]:
"""
Get the NIC associated with an IP address.
:param ip_address: The IP address to look up in the cache.
:return: The NIC associated with the IP address, or None if not found.
"""
pass
def send_arp_request(self, target_ip_address: Union[IPv4Address, str]):
"""
Perform a standard ARP request for a given target IP address.
Broadcasts the request through all enabled NICs to determine the MAC address corresponding to the target IP
address. This method can be configured to ignore specific networks when sending out ARP requests,
which is useful in environments where certain addresses should not be queried.
:param target_ip_address: The target IP address to send an ARP request for.
:param ignore_networks: An optional list of IPv4 addresses representing networks to be excluded from the ARP
request broadcast. Each address in this list indicates a network which will not be queried during the ARP
request process. This is particularly useful in complex network environments where traffic should be
minimized or controlled to specific subnets. It is mainly used by the router to prevent ARP requests being
sent back to their source.
"""
vals: Tuple = self.software_manager.session_manager.resolve_outbound_transmission_details(target_ip_address)
outbound_nic, _, _, _ = vals
if outbound_nic:
self.sys_log.info(f"Sending ARP request from NIC {outbound_nic} for ip {target_ip_address}")
arp_packet = ARPPacket(
sender_ip_address=outbound_nic.ip_address,
sender_mac_addr=outbound_nic.mac_address,
target_ip_address=target_ip_address,
)
self.software_manager.session_manager.receive_payload_from_software_manage(
payload=arp_packet, dst_port=Port.ARP, ip_protocol=self.protocol
)
else:
print(f"failed for {target_ip_address}")
def send_arp_reply(self, arp_reply: ARPPacket, from_nic: NIC):
"""
Send an ARP reply back through the NIC it came from.
:param arp_reply: The ARP reply to send.
:param from_nic: The NIC to send the ARP reply from.
"""
self.sys_log.info(
f"Sending ARP reply from {arp_reply.sender_mac_addr}/{arp_reply.sender_ip_address} "
f"to {arp_reply.target_ip_address}/{arp_reply.target_mac_addr} "
)
udp_header = UDPHeader(src_port=Port.ARP, dst_port=Port.ARP)
ip_packet = IPPacket(
src_ip_address=arp_reply.sender_ip_address,
dst_ip_address=arp_reply.target_ip_address,
protocol=IPProtocol.UDP,
)
ethernet_header = EthernetHeader(src_mac_addr=arp_reply.sender_mac_addr, dst_mac_addr=arp_reply.target_mac_addr)
frame = Frame(ethernet=ethernet_header, ip=ip_packet, udp=udp_header, payload=arp_reply)
from_nic.send_frame(frame)
def process_arp_packet(self, from_nic: NIC, arp_packet: ARPPacket):
"""
Process a received ARP packet, handling both ARP requests and responses.
If an ARP request is received for the local IP, a response is sent back.
If an ARP response is received, the ARP cache is updated with the new entry.
:param from_nic: The NIC that received the ARP packet.
:param arp_packet: The ARP packet to be processed.
"""
# Unmatched ARP Request
if arp_packet.target_ip_address != from_nic.ip_address:
self.sys_log.info(
f"Ignoring ARP request for {arp_packet.target_ip_address}. Current IP address is {from_nic.ip_address}"
)
return
# Matched ARP request
self.add_arp_cache_entry(
ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic
)
arp_packet = arp_packet.generate_reply(from_nic.mac_address)
self.send_arp_reply(arp_packet, from_nic)
@abstractmethod
def _process_arp_request(self, arp_packet: ARPPacket, from_nic: NIC):
self.sys_log.info(
f"Received ARP request for {arp_packet.target_ip_address} from "
f"{arp_packet.sender_mac_addr}/{arp_packet.sender_ip_address} "
)
def _process_arp_reply(self, arp_packet: ARPPacket, from_nic: NIC):
self.sys_log.info(
f"Received ARP response for {arp_packet.sender_ip_address} "
f"from {arp_packet.sender_mac_addr} via NIC {from_nic}"
)
self.add_arp_cache_entry(
ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic
)
def receive(self, payload: Any, session_id: str, **kwargs) -> bool:
if not isinstance(payload, ARPPacket):
print("failied on payload check", type(payload))
return False
from_nic = kwargs.get("from_nic")
if payload.request:
print(from_nic)
self._process_arp_request(arp_packet=payload, from_nic=from_nic)
else:
self._process_arp_reply(arp_packet=payload, from_nic=from_nic)
def __contains__(self, item: Any) -> bool:
return item in self.arp

View File

@@ -0,0 +1,95 @@
from __future__ import annotations
from ipaddress import IPv4Address
from typing import Optional
from primaite.simulator.network.hardware.base import NIC
from primaite.simulator.system.services.arp.arp import ARP, ARPPacket
class HostARP(ARP):
def get_default_gateway_mac_address(self) -> Optional[str]:
if self.software_manager.node.default_gateway:
return self.get_arp_cache_mac_address(self.software_manager.node.default_gateway)
def get_default_gateway_nic(self) -> Optional[NIC]:
if self.software_manager.node.default_gateway:
return self.get_arp_cache_nic(self.software_manager.node.default_gateway)
def _get_arp_cache_mac_address(
self, ip_address: IPv4Address, is_reattempt: bool = False, is_default_gateway_attempt: bool = False
) -> Optional[str]:
arp_entry = self.arp.get(ip_address)
if arp_entry:
return arp_entry.mac_address
else:
if not is_reattempt:
self.send_arp_request(ip_address)
return self._get_arp_cache_mac_address(
ip_address=ip_address, is_reattempt=True, is_default_gateway_attempt=is_default_gateway_attempt
)
else:
if self.node.default_gateway:
if not is_default_gateway_attempt:
self.send_arp_request(self.node.default_gateway)
return self._get_arp_cache_mac_address(
ip_address=self.node.default_gateway, is_reattempt=True, is_default_gateway_attempt=True
)
return None
def get_arp_cache_mac_address(self, ip_address: IPv4Address) -> Optional[str]:
"""
Get the MAC address associated with an IP address.
:param ip_address: The IP address to look up in the cache.
:return: The MAC address associated with the IP address, or None if not found.
"""
return self._get_arp_cache_mac_address(ip_address)
def _get_arp_cache_nic(
self, ip_address: IPv4Address, is_reattempt: bool = False, is_default_gateway_attempt: bool = False
) -> Optional[NIC]:
arp_entry = self.arp.get(ip_address)
if arp_entry:
return self.nics[arp_entry.nic_uuid]
else:
if not is_reattempt:
self.send_arp_request(ip_address)
return self._get_arp_cache_nic(
ip_address=ip_address, is_reattempt=True, is_default_gateway_attempt=is_default_gateway_attempt
)
else:
if self.node.default_gateway:
if not is_default_gateway_attempt:
self.send_arp_request(self.node.default_gateway)
return self._get_arp_cache_nic(
ip_address=self.node.default_gateway, is_reattempt=True, is_default_gateway_attempt=True
)
return None
def get_arp_cache_nic(self, ip_address: IPv4Address) -> Optional[NIC]:
"""
Get the NIC associated with an IP address.
:param ip_address: The IP address to look up in the cache.
:return: The NIC associated with the IP address, or None if not found.
"""
return self._get_arp_cache_nic(ip_address)
def _process_arp_request(self, arp_packet: ARPPacket, from_nic: NIC):
super()._process_arp_request(arp_packet, from_nic)
# Unmatched ARP Request
if arp_packet.target_ip_address != from_nic.ip_address:
self.sys_log.info(
f"Ignoring ARP request for {arp_packet.target_ip_address}. Current IP address is {from_nic.ip_address}"
)
return
# Matched ARP request
self.add_arp_cache_entry(
ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic
)
arp_packet = arp_packet.generate_reply(from_nic.mac_address)
self.send_arp_reply(arp_packet, from_nic)

View File

@@ -8,6 +8,7 @@ from typing import Any, Dict, Optional, Union
from primaite.simulator.core import _LOGGER, RequestManager, RequestType, SimComponent
from primaite.simulator.file_system.file_system import FileSystem, Folder
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.core.session_manager import Session
from primaite.simulator.system.core.sys_log import SysLog
@@ -242,6 +243,8 @@ class IOSoftware(Software):
"Indicates if the software uses UDP protocol for communication. Default is True."
port: Port
"The port to which the software is connected."
protocol: IPProtocol
"The IP Protocol the Software operates on."
_connections: Dict[str, Dict] = {}
"Active connections."