#2149 - Created a Router-specific version of SessionManager that looks at route table rather than default gateway when dst ip address isn't for a locally attached network. Carried these changes through to arp. Added test for this. Made some minor improvements to show functions in container and node that assist debugging.

This commit is contained in:
Chris McCarthy
2024-03-28 15:52:08 +00:00
parent 3d996f05bd
commit 1ac3e1c6b4
6 changed files with 177 additions and 11 deletions

View File

@@ -107,6 +107,7 @@ SessionManager.
- Ability to add ``Router``/``Firewall`` ``ACLRule`` via config
- NMNE capturing capabilities to `NetworkInterface` class for detecting and logging Malicious Network Events.
- New `nmne_config` settings in the simulation configuration to enable NMNE capturing and specify keywords such as "DELETE".
- Router-specific SessionManager Implementation: Introduced a specialized version of the SessionManager tailored for router operations. This enhancement enables the SessionManager to determine the routing path by consulting the route table.
### Changed
- Integrated the RouteTable into the Routers frame processing.

View File

@@ -170,7 +170,9 @@ class Network(SimComponent):
print(table)
if links:
table = PrettyTable(["Endpoint A", "Endpoint B", "is Up", "Bandwidth (MBits)", "Current Load"])
table = PrettyTable(
["Endpoint A", "A Port", "Endpoint B", "B Port", "is Up", "Bandwidth (MBits)", "Current Load"]
)
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
@@ -183,7 +185,9 @@ class Network(SimComponent):
table.add_row(
[
link.endpoint_a.parent.hostname,
str(link.endpoint_a),
link.endpoint_b.parent.hostname,
str(link.endpoint_b),
link.is_up,
link.bandwidth,
link.current_load_percent,

View File

@@ -889,8 +889,9 @@ class Node(SimComponent):
table.align = "l"
table.title = f"{self.hostname} Open Ports"
for port in self.software_manager.get_open_ports():
table.add_row([port.value, port.name])
print(table)
if port.value > 0:
table.add_row([port.value, port.name])
print(table.get_string(sortby="Port"))
@property
def has_enabled_network_interface(self) -> bool:
@@ -918,7 +919,7 @@ class Node(SimComponent):
table.add_row(
[
port,
type(network_interface),
network_interface.__class__.__name__,
network_interface.mac_address,
f"{network_interface.ip_address}/{network_interface.ip_network.prefixlen}",
network_interface.speed,

View File

@@ -18,6 +18,7 @@ from primaite.simulator.network.protocols.icmp import ICMPPacket, ICMPType
from primaite.simulator.network.transmission.data_link_layer import Frame
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.core.session_manager import SessionManager
from primaite.simulator.system.core.sys_log import SysLog
from primaite.simulator.system.services.arp.arp import ARP
from primaite.simulator.system.services.icmp.icmp import ICMP
@@ -624,11 +625,12 @@ class RouteTable(SimComponent):
"""
pass
@validate_call()
def add_route(
self,
address: Union[IPv4Address, str],
subnet_mask: Union[IPv4Address, str],
next_hop_ip_address: Union[IPv4Address, str],
address: Union[IPV4Address, str],
subnet_mask: Union[IPV4Address, str],
next_hop_ip_address: Union[IPV4Address, str],
metric: float = 0.0,
):
"""
@@ -647,7 +649,8 @@ class RouteTable(SimComponent):
)
self.routes.append(route)
def set_default_route_next_hop_ip_address(self, ip_address: IPv4Address):
@validate_call()
def set_default_route_next_hop_ip_address(self, ip_address: IPV4Address):
"""
Sets the next-hop IP address for the default route in a routing table.
@@ -660,7 +663,7 @@ class RouteTable(SimComponent):
"""
if not self.default_route:
self.default_route = RouteEntry(
ip_address=IPv4Address("0.0.0.0"),
address=IPv4Address("0.0.0.0"),
subnet_mask=IPv4Address("0.0.0.0"),
next_hop_ip_address=ip_address,
)
@@ -1016,6 +1019,144 @@ class RouterInterface(IPWiredNetworkInterface):
return f"Port {self.port_name if self.port_name else self.port_num}: {self.mac_address}/{self.ip_address}"
class RouterSessionManager(SessionManager):
"""
Manages network sessions, including session creation, lookup, and communication with other components.
The RouterSessionManager is a Router/Firewall specific implementation of SessionManager. It enables to resolve
outbound interface transmission details functions to leverage the route table instead of the default gateway.
:param sys_log: A reference to the system log component.
:param arp_cache: A reference to the ARP cache component.
"""
def resolve_outbound_network_interface(self, dst_ip_address: IPv4Address) -> Optional[RouterInterface]:
"""
Resolves the appropriate outbound network interface for a given destination IP address.
This method determines the most suitable network interface for sending a packet to the specified
destination IP address. It considers only enabled network interfaces and checks if the destination
IP address falls within the subnet of each interface. If no suitable local network interface is found,
the method defaults to performing a route table look-up to determine if there is a dedicated route or a default
route it can use.
The search process prioritises local network interfaces based on the IP network to which they belong.
If the destination IP address does not match any local subnet, the method assumes that the destination
is outside the local network and hence, routes the packet according to route table look-up.
:param dst_ip_address: The destination IP address for which the outbound interface is to be resolved.
:type dst_ip_address: IPv4Address
:return: The network interface through which the packet should be sent to reach the destination IP address,
or the default gateway's network interface if the destination is not within any local subnet.
:rtype: Optional[RouterInterface]
"""
network_interface = super().resolve_outbound_network_interface(dst_ip_address)
if not network_interface:
route = self.node.route_table.find_best_route(dst_ip_address)
if not route:
return None
network_interface = super().resolve_outbound_network_interface(route.next_hop_ip_address)
return network_interface
def resolve_outbound_transmission_details(
self,
dst_ip_address: Optional[Union[IPv4Address, IPv4Network]] = None,
src_port: Optional[Port] = None,
dst_port: Optional[Port] = None,
protocol: Optional[IPProtocol] = None,
session_id: Optional[str] = None,
) -> Tuple[
Optional[RouterInterface],
Optional[str],
IPv4Address,
Optional[Port],
Optional[Port],
Optional[IPProtocol],
bool,
]:
"""
Resolves the necessary details for outbound transmission based on the provided parameters.
This method determines whether the payload should be broadcast or unicast based on the destination IP address
and resolves the outbound network interface and destination MAC address accordingly.
The method first checks if `session_id` is provided and uses the session details if available. For broadcast
transmissions, it finds a suitable network interface and uses a broadcast MAC address. For unicast
transmissions, it attempts to resolve the destination MAC address using ARP and finds the appropriate
outbound network interface. If the destination IP address is outside the local network and no specific MAC
address is resolved, it defaults to performing a route table look-up to determine if there is a dedicated route
or a default route it can use.
:param dst_ip_address: The destination IP address or network. If an IPv4Network is provided, the method
treats the transmission as a broadcast to that network. Optional.
:type dst_ip_address: Optional[Union[IPv4Address, IPv4Network]]
:param src_port: The source port number for the transmission. Optional.
:type src_port: Optional[Port]
:param dst_port: The destination port number for the transmission. Optional.
:type dst_port: Optional[Port]
:param protocol: The IP protocol to be used for the transmission. Optional.
:type protocol: Optional[IPProtocol]
:param session_id: The session ID associated with the transmission. If provided, the session details override
other parameters. Optional.
:type session_id: Optional[str]
:return: A tuple containing the resolved outbound network interface, destination MAC address, destination IP
address, source port, destination port, protocol, and a boolean indicating whether the transmission is a
broadcast.
:rtype: Tuple[Optional[RouterInterface], Optional[str], IPv4Address, Optional[Port], Optional[Port],
Optional[IPProtocol], bool]
"""
if dst_ip_address and not isinstance(dst_ip_address, (IPv4Address, IPv4Network)):
dst_ip_address = IPv4Address(dst_ip_address)
is_broadcast = False
outbound_network_interface = None
dst_mac_address = None
# Use session details if session_id is provided
if session_id:
session = self.sessions_by_uuid[session_id]
dst_ip_address = session.with_ip_address
protocol = session.protocol
src_port = session.src_port
dst_port = session.dst_port
# Determine if the payload is for broadcast or unicast
# Handle broadcast transmission
if isinstance(dst_ip_address, IPv4Network):
is_broadcast = True
dst_ip_address = dst_ip_address.broadcast_address
if dst_ip_address:
# Find a suitable NIC for the broadcast
for network_interface in self.node.network_interfaces.values():
if dst_ip_address in network_interface.ip_network and network_interface.enabled:
dst_mac_address = "ff:ff:ff:ff:ff:ff"
outbound_network_interface = network_interface
break
else:
# Resolve MAC address for unicast transmission
use_route_table = True
for network_interface in self.node.network_interfaces.values():
if dst_ip_address in network_interface.ip_network and network_interface.enabled:
dst_mac_address = self.software_manager.arp.get_arp_cache_mac_address(dst_ip_address)
break
if dst_mac_address:
use_route_table = False
outbound_network_interface = self.software_manager.arp.get_arp_cache_network_interface(dst_ip_address)
if use_route_table:
route = self.node.route_table.find_best_route(dst_ip_address)
if not route:
raise Exception("cannot use route to resolve outbound details")
dst_mac_address = self.software_manager.arp.get_arp_cache_mac_address(route.next_hop_ip_address)
outbound_network_interface = self.software_manager.arp.get_arp_cache_network_interface(
route.next_hop_ip_address
)
return outbound_network_interface, dst_mac_address, dst_ip_address, src_port, dst_port, protocol, is_broadcast
class Router(NetworkNode):
"""
Represents a network router, managing routing and forwarding of IP packets across network interfaces.
@@ -1049,6 +1190,10 @@ class Router(NetworkNode):
if not kwargs.get("route_table"):
kwargs["route_table"] = RouteTable(sys_log=kwargs["sys_log"])
super().__init__(hostname=hostname, num_ports=num_ports, **kwargs)
self.session_manager = RouterSessionManager(sys_log=self.sys_log)
self.session_manager.node = self
self.software_manager.session_manager = self.session_manager
self.session_manager.software_manager = self.software_manager
for i in range(1, self.num_ports + 1):
network_interface = RouterInterface(ip_address="127.0.0.1", subnet_mask="255.0.0.0", gateway="0.0.0.0")
self.connect_nic(network_interface)
@@ -1418,7 +1563,7 @@ class Router(NetworkNode):
:return: Configured router.
:rtype: Router
"""
router = cls(
router = Router(
hostname=cfg["hostname"],
num_ports=int(cfg.get("num_ports", "5")),
operating_state=NodeOperatingState.ON
@@ -1440,8 +1585,8 @@ class Router(NetworkNode):
dst_port=None if not (p := r_cfg.get("dst_port")) else Port[p],
protocol=None if not (p := r_cfg.get("protocol")) else IPProtocol[p],
src_ip_address=r_cfg.get("src_ip"),
dst_ip_address=r_cfg.get("dst_ip"),
src_wildcard_mask=r_cfg.get("src_wildcard_mask"),
dst_ip_address=r_cfg.get("dst_ip"),
dst_wildcard_mask=r_cfg.get("dst_wildcard_mask"),
position=r_num,
)

View File

@@ -65,6 +65,10 @@ class ARP(Service):
"""Clears the arp cache."""
self.arp.clear()
def get_default_gateway_network_interface(self) -> Optional[NetworkInterface]:
"""Not used at the parent ARP level. Should return None when there is no override by child class."""
return None
def add_arp_cache_entry(
self, ip_address: IPV4Address, mac_address: str, network_interface: NetworkInterface, override: bool = False
):

View File

@@ -152,6 +152,17 @@ def test_with_routes_can_ping(multi_hop_network):
assert pc_a.ping(pc_b.network_interface[1].ip_address)
def test_ping_router_port_multi_hop(multi_hop_network):
pc_a = multi_hop_network.get_node_by_hostname("pc_a")
router_2 = multi_hop_network.get_node_by_hostname("router_2")
router_2.route_table.add_route(
address="192.168.0.0", subnet_mask="255.255.255.0", next_hop_ip_address="192.168.1.1"
)
assert pc_a.ping(router_2.network_interface[1].ip_address)
def test_routing_services(multi_hop_network):
pc_a = multi_hop_network.get_node_by_hostname("pc_a")