#1800 - Fixed the ping functionality so that it actually checks for replies and returns True if the right number of replies have been received.

- Added the foundations of a Router class along with ACLRule and RouteTableEntry classes.
This commit is contained in:
Chris McCarthy
2023-08-25 09:07:32 +01:00
parent 01e8501bc1
commit c6f71600fc
4 changed files with 134 additions and 8 deletions

View File

@@ -173,6 +173,9 @@ class NIC(SimComponent):
if self.connected_node.operating_state != NodeOperatingState.ON:
self.connected_node.sys_log.error(f"NIC {self} cannot be enabled as the endpoint is not turned on")
return
if not self.connected_link:
_LOGGER.error(f"NIC {self} cannot be enabled as it is not connected to a Link")
return
self.enabled = True
self.connected_node.sys_log.info(f"NIC {self} enabled")
@@ -210,6 +213,7 @@ class NIC(SimComponent):
# TODO: Inform the Node that a link has been connected
self.connected_link = link
self.enable()
_LOGGER.info(f"NIC {self} connected to Link {link}")
def disconnect_link(self):
@@ -266,8 +270,10 @@ class NIC(SimComponent):
frame.decrement_ttl()
frame.set_received_timestamp()
self.pcap.capture(frame)
self.connected_node.receive_frame(frame=frame, from_nic=self)
return True
# If this destination or is broadcast
if frame.ethernet.dst_mac_addr == self.mac_address or frame.ethernet.dst_mac_addr == "ff:ff:ff:ff:ff:ff":
self.connected_node.receive_frame(frame=frame, from_nic=self)
return True
return False
def __str__(self) -> str:
@@ -688,7 +694,6 @@ class ARPCache:
frame = Frame(ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, arp=arp_packet)
from_nic.send_frame(frame)
class ICMP:
"""
The ICMP (Internet Control Message Protocol) class.
@@ -705,6 +710,8 @@ class ICMP:
"""
self.sys_log: SysLog = sys_log
self.arp: ARPCache = arp_cache
self.request_replies = {}
def process_icmp(self, frame: Frame):
"""
@@ -733,6 +740,9 @@ class ICMP:
src_nic.send_frame(frame)
elif frame.icmp.icmp_type == ICMPType.ECHO_REPLY:
self.sys_log.info(f"Received echo reply from {frame.ip.src_ip}")
if not self.request_replies.get(frame.icmp.identifier):
self.request_replies[frame.icmp.identifier] = 0
self.request_replies[frame.icmp.identifier] += 1
def ping(
self, target_ip_address: IPv4Address, sequence: int = 0, identifier: Optional[int] = None
@@ -875,7 +885,7 @@ class Node(SimComponent):
return state
def show(self):
"""Prints a table of the NICs on the Node.."""
"""Prints a table of the NICs on the Node."""
from prettytable import PrettyTable
table = PrettyTable(["MAC Address", "Address", "Default Gateway", "Speed", "Status"])
@@ -898,7 +908,8 @@ class Node(SimComponent):
self.operating_state = NodeOperatingState.ON
self.sys_log.info("Turned on")
for nic in self.nics.values():
nic.enable()
if nic.connected_link:
nic.enable()
def power_off(self):
"""Power off the Node, disabling its NICs if it is in the ON state."""
@@ -961,7 +972,9 @@ class Node(SimComponent):
sequence, identifier = 0, None
while sequence < pings:
sequence, identifier = self.icmp.ping(target_ip_address, sequence, identifier)
return True
passed = self.icmp.request_replies[identifier] == pings
self.icmp.request_replies.pop(identifier)
return passed
self.sys_log.info("Ping failed as the node is turned off")
return False

View File

@@ -0,0 +1,86 @@
from enum import Enum
from ipaddress import IPv4Address
from typing import Dict, List, Union
from primaite.simulator.core import SimComponent
from primaite.simulator.network.hardware.base import Node, NIC
from prettytable import PrettyTable
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
class ACLAction(Enum):
DENY = 0
PERMIT = 1
class ACLRule(SimComponent):
action: ACLAction
protocol: IPProtocol
src_ip: IPv4Address
src_wildcard: IPv4Address = IPv4Address("0.0.0.0")
src_port: Port
dst_ip: IPv4Address
dst_port: Port
class RouteTableEntry(SimComponent):
pass
class Router(Node):
num_ports: int
ethernet_ports: Dict[int, NIC] = {}
acl: List = []
route_table: Dict = {}
def __init__(self, hostname: str, num_ports: int = 5, **kwargs):
super().__init__(hostname=hostname, num_ports=num_ports, **kwargs)
for i in range(1, self.num_ports + 1):
nic = NIC(ip_address="127.0.0.1", subnet_mask="255.0.0.0", gateway="0.0.0.0")
self.connect_nic(nic)
self.ethernet_ports[i] = nic
def describe_state(self) -> Dict:
pass
def configure_port(
self,
port: int,
ip_address: Union[IPv4Address, str],
subnet_mask: str
):
if not isinstance(ip_address, IPv4Address):
ip_address = IPv4Address(ip_address)
nic = self.ethernet_ports[port]
nic.ip_address = ip_address
nic.subnet_mask = subnet_mask
self.sys_log.info(f"Configured port {port} with {ip_address=} {subnet_mask=}")
def enable_port(self, port: int):
nic = self.ethernet_ports.get(port)
if nic:
nic.enable()
def disable_port(self, port: int):
nic = self.ethernet_ports.get(port)
if nic:
nic.disable()
def show(self):
"""Prints a table of the NICs on the Node."""
table = PrettyTable(["Port", "MAC Address", "Address", "Speed", "Status"])
for port, nic in self.ethernet_ports.items():
table.add_row(
[
port,
nic.mac_address,
f"{nic.ip_address}/{nic.ip_network.prefixlen}",
nic.speed,
"Enabled" if nic.enabled else "Disabled",
]
)
print(table)

View File

@@ -45,7 +45,7 @@ def test_multi_nic():
node_a.ping("192.168.0.11")
node_c.ping("10.0.0.12")
assert node_c.ping("10.0.0.12")
def test_switched_network():
@@ -83,4 +83,4 @@ def test_switched_network():
link_nic_d_switch_2 = Link(endpoint_a=nic_d, endpoint_b=switch_2.switch_ports[2])
link_switch_1_switch_2 = Link(endpoint_a=switch_1.switch_ports[6], endpoint_b=switch_2.switch_ports[6])
pc_a.ping("192.168.0.13")
assert pc_a.ping("192.168.0.13")

View File

@@ -0,0 +1,27 @@
from primaite.simulator.network.hardware.base import Node, NIC, Link
from primaite.simulator.network.hardware.nodes.router import Router
def test_ping_fails_with_no_route():
"""Tests a larges network of Nodes and Switches with one node pinging another."""
pc_a = Node(hostname="pc_a")
nic_a = NIC(ip_address="192.168.0.10", subnet_mask="255.255.255.0", gateway="192.168.0.1")
pc_a.connect_nic(nic_a)
pc_a.power_on()
pc_b = Node(hostname="pc_b")
nic_b = NIC(ip_address="192.168.1.10", subnet_mask="255.255.255.0", gateway="192.168.1.1")
pc_b.connect_nic(nic_b)
pc_b.power_on()
router_1 = Router(hostname="router_1")
router_1.configure_port(1, "192.168.0.1", "255.255.255.0")
router_1.configure_port(2, "192.168.1.1", "255.255.255.0")
router_1.power_on()
router_1.show()
link_nic_a_router_1 = Link(endpoint_a=nic_a, endpoint_b=router_1.ethernet_ports[1])
link_nic_b_router_1 = Link(endpoint_a=nic_b, endpoint_b=router_1.ethernet_ports[2])
router_1.power_on()
#assert pc_a.ping("192.168.1.10")