#1800 - Renamed all ip fields so that they're post-fixed with ip_address

This commit is contained in:
Chris McCarthy
2023-09-04 14:58:34 +01:00
parent 05959e5408
commit 3075d1985b
12 changed files with 166 additions and 166 deletions

View File

@@ -579,9 +579,13 @@ class ARPCache:
"""
Add an ARP entry to the cache.
If an entry for the given IP address already exists, the entry is only updated if the `override` parameter is
set to True.
:param ip_address: The IP address to be added to the cache.
:param mac_address: The MAC address associated with the IP address.
:param nic: The NIC through which the NIC with the IP address is reachable.
:param override: If True, an existing entry for the IP address will be overridden. Default is False.
"""
for _nic in self.nics.values():
if _nic.ip_address == ip_address:
@@ -644,13 +648,13 @@ class ARPCache:
# Network Layer
ip_packet = IPPacket(
src_ip=nic.ip_address,
dst_ip=target_ip_address,
src_ip_address=nic.ip_address,
dst_ip_address=target_ip_address,
)
# Data Link Layer
ethernet_header = EthernetHeader(src_mac_addr=nic.mac_address, dst_mac_addr="ff:ff:ff:ff:ff:ff")
arp_packet = ARPPacket(
sender_ip=nic.ip_address, sender_mac_addr=nic.mac_address, target_ip=target_ip_address
sender_ip_address=nic.ip_address, sender_mac_addr=nic.mac_address, target_ip_address=target_ip_address
)
frame = Frame(ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, arp=arp_packet)
nic.send_frame(frame)
@@ -663,14 +667,14 @@ class ARPCache:
:param from_nic: The NIC to send the ARP reply from.
"""
self.sys_log.info(
f"Sending ARP reply from {arp_reply.sender_mac_addr}/{arp_reply.sender_ip} "
f"to {arp_reply.target_ip}/{arp_reply.target_mac_addr} "
f"Sending ARP reply from {arp_reply.sender_mac_addr}/{arp_reply.sender_ip_address} "
f"to {arp_reply.target_ip_address}/{arp_reply.target_mac_addr} "
)
tcp_header = TCPHeader(src_port=Port.ARP, dst_port=Port.ARP)
ip_packet = IPPacket(
src_ip=arp_reply.sender_ip,
dst_ip=arp_reply.target_ip,
src_ip_address=arp_reply.sender_ip_address,
dst_ip_address=arp_reply.target_ip_address,
)
ethernet_header = EthernetHeader(src_mac_addr=arp_reply.sender_mac_addr, dst_mac_addr=arp_reply.target_mac_addr)
@@ -691,26 +695,26 @@ class ARPCache:
# ARP Reply
if not arp_packet.request:
self.sys_log.info(
f"Received ARP response for {arp_packet.sender_ip} from {arp_packet.sender_mac_addr} via NIC {from_nic}"
f"Received ARP response for {arp_packet.sender_ip_address} from {arp_packet.sender_mac_addr} via NIC {from_nic}"
)
self.add_arp_cache_entry(
ip_address=arp_packet.sender_ip, mac_address=arp_packet.sender_mac_addr, nic=from_nic
ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic
)
return
# ARP Request
self.sys_log.info(
f"Received ARP request for {arp_packet.target_ip} from "
f"{arp_packet.sender_mac_addr}/{arp_packet.sender_ip} "
f"Received ARP request for {arp_packet.target_ip_address} from "
f"{arp_packet.sender_mac_addr}/{arp_packet.sender_ip_address} "
)
# Unmatched ARP Request
if arp_packet.target_ip != from_nic.ip_address:
self.sys_log.info(f"Ignoring ARP request for {arp_packet.target_ip}")
if arp_packet.target_ip_address != from_nic.ip_address:
self.sys_log.info(f"Ignoring ARP request for {arp_packet.target_ip_address}")
return
# Matched ARP request
self.add_arp_cache_entry(ip_address=arp_packet.sender_ip, mac_address=arp_packet.sender_mac_addr, nic=from_nic)
self.add_arp_cache_entry(ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic)
arp_packet = arp_packet.generate_reply(from_nic.mac_address)
self.send_arp_reply(arp_packet, from_nic)
@@ -744,18 +748,18 @@ class ICMP:
"""
if frame.icmp.icmp_type == ICMPType.ECHO_REQUEST:
if not is_reattempt:
self.sys_log.info(f"Received echo request from {frame.ip.src_ip}")
target_mac_address = self.arp.get_arp_cache_mac_address(frame.ip.src_ip)
self.sys_log.info(f"Received echo request from {frame.ip.src_ip_address}")
target_mac_address = self.arp.get_arp_cache_mac_address(frame.ip.src_ip_address)
src_nic = self.arp.get_arp_cache_nic(frame.ip.src_ip)
src_nic = self.arp.get_arp_cache_nic(frame.ip.src_ip_address)
if not src_nic:
self.arp.send_arp_request(frame.ip.src_ip)
self.arp.send_arp_request(frame.ip.src_ip_address)
self.process_icmp(frame=frame, from_nic=from_nic, is_reattempt=True)
return
tcp_header = TCPHeader(src_port=Port.ARP, dst_port=Port.ARP)
# Network Layer
ip_packet = IPPacket(src_ip=src_nic.ip_address, dst_ip=frame.ip.src_ip, protocol=IPProtocol.ICMP)
ip_packet = IPPacket(src_ip_address=src_nic.ip_address, dst_ip_address=frame.ip.src_ip_address, protocol=IPProtocol.ICMP)
# Data Link Layer
ethernet_header = EthernetHeader(src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address)
icmp_reply_packet = ICMPPacket(
@@ -768,14 +772,14 @@ class ICMP:
frame = Frame(
ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, icmp=icmp_reply_packet, payload=payload
)
self.sys_log.info(f"Sending echo reply to {frame.ip.dst_ip}")
self.sys_log.info(f"Sending echo reply to {frame.ip.dst_ip_address}")
src_nic.send_frame(frame)
elif frame.icmp.icmp_type == ICMPType.ECHO_REPLY:
time = frame.transmission_duration()
time_str = f"{time}ms" if time > 0 else "<1ms"
self.sys_log.info(
f"Reply from {frame.ip.src_ip}: "
f"Reply from {frame.ip.src_ip_address}: "
f"bytes={len(frame.payload)}, "
f"time={time_str}, "
f"TTL={frame.ip.ttl}"
@@ -821,8 +825,8 @@ class ICMP:
# Network Layer
ip_packet = IPPacket(
src_ip=nic.ip_address,
dst_ip=target_ip_address,
src_ip_address=nic.ip_address,
dst_ip_address=target_ip_address,
protocol=IPProtocol.ICMP,
)
# Data Link Layer
@@ -1059,7 +1063,7 @@ class Node(SimComponent):
:param frame: The Frame to be sent.
"""
nic: NIC = self._get_arp_cache_nic(frame.ip.dst_ip)
nic: NIC = self._get_arp_cache_nic(frame.ip.dst_ip_address)
nic.send_frame(frame)
def receive_frame(self, frame: Frame, from_nic: NIC):
@@ -1073,9 +1077,9 @@ class Node(SimComponent):
:param from_nic: The NIC that received the frame.
"""
if frame.ip:
if frame.ip.src_ip in self.arp:
if frame.ip.src_ip_address in self.arp:
self.arp.add_arp_cache_entry(
ip_address=frame.ip.src_ip, mac_address=frame.ethernet.src_mac_addr, nic=from_nic
ip_address=frame.ip.src_ip_address, mac_address=frame.ethernet.src_mac_addr, nic=from_nic
)
if frame.ip.protocol == IPProtocol.TCP:
if frame.tcp.src_port == Port.ARP:

View File

@@ -36,9 +36,5 @@ class Computer(Node):
"""
def __init__(self, **kwargs):
for key in {"ip_address", "subnet_mask", "default_gateway"}:
if key in kwargs:
if not isinstance(kwargs[key], IPv4Address):
kwargs[key] = IPv4Address(kwargs[key])
super().__init__(**kwargs)
self.connect_nic(NIC(ip_address=kwargs["ip_address"], subnet_mask=kwargs["subnet_mask"]))

View File

@@ -28,17 +28,17 @@ class ACLRule(SimComponent):
:ivar ACLAction action: Action to be performed (Permit/Deny). Default is DENY.
:ivar Optional[IPProtocol] protocol: Network protocol. Default is None.
:ivar Optional[IPv4Address] src_ip: Source IP address. Default is None.
:ivar Optional[IPv4Address] src_ip_address: Source IP address. Default is None.
:ivar Optional[Port] src_port: Source port number. Default is None.
:ivar Optional[IPv4Address] dst_ip: Destination IP address. Default is None.
:ivar Optional[IPv4Address] dst_ip_address: Destination IP address. Default is None.
:ivar Optional[Port] dst_port: Destination port number. Default is None.
"""
action: ACLAction = ACLAction.DENY
protocol: Optional[IPProtocol] = None
src_ip: Optional[IPv4Address] = None
src_ip_address: Optional[IPv4Address] = None
src_port: Optional[Port] = None
dst_ip: Optional[IPv4Address] = None
dst_ip_address: Optional[IPv4Address] = None
dst_port: Optional[Port] = None
def __str__(self) -> str:
@@ -109,9 +109,9 @@ class AccessControlList(SimComponent):
self,
action: ACLAction,
protocol: Optional[IPProtocol] = None,
src_ip: Optional[Union[str, IPv4Address]] = None,
src_ip_address: Optional[Union[str, IPv4Address]] = None,
src_port: Optional[Port] = None,
dst_ip: Optional[Union[str, IPv4Address]] = None,
dst_ip_address: Optional[Union[str, IPv4Address]] = None,
dst_port: Optional[Port] = None,
position: int = 0,
) -> None:
@@ -120,20 +120,20 @@ class AccessControlList(SimComponent):
:param ACLAction action: Action to be performed (Permit/Deny).
:param Optional[IPProtocol] protocol: Network protocol.
:param Optional[Union[str, IPv4Address]] src_ip: Source IP address.
:param Optional[Union[str, IPv4Address]] src_ip_address: Source IP address.
:param Optional[Port] src_port: Source port number.
:param Optional[Union[str, IPv4Address]] dst_ip: Destination IP address.
:param Optional[Union[str, IPv4Address]] dst_ip_address: Destination IP address.
:param Optional[Port] dst_port: Destination port number.
:param int position: Position in the ACL list to insert the rule.
:raises ValueError: When the position is out of bounds.
"""
if isinstance(src_ip, str):
src_ip = IPv4Address(src_ip)
if isinstance(dst_ip, str):
dst_ip = IPv4Address(dst_ip)
if isinstance(src_ip_address, str):
src_ip_address = IPv4Address(src_ip_address)
if isinstance(dst_ip_address, str):
dst_ip_address = IPv4Address(dst_ip_address)
if 0 <= position < self.max_acl_rules:
self._acl[position] = ACLRule(
action=action, src_ip=src_ip, dst_ip=dst_ip, protocol=protocol, src_port=src_port, dst_port=dst_port
action=action, src_ip_address=src_ip_address, dst_ip_address=dst_ip_address, protocol=protocol, src_port=src_port, dst_port=dst_port
)
else:
raise ValueError(f"Position {position} is out of bounds.")
@@ -153,33 +153,33 @@ class AccessControlList(SimComponent):
def is_permitted(
self,
protocol: IPProtocol,
src_ip: Union[str, IPv4Address],
src_ip_address: Union[str, IPv4Address],
src_port: Optional[Port],
dst_ip: Union[str, IPv4Address],
dst_ip_address: Union[str, IPv4Address],
dst_port: Optional[Port],
) -> Tuple[bool, Optional[Union[str, ACLRule]]]:
"""
Check if a packet with the given properties is permitted through the ACL.
:param protocol: The protocol of the packet.
:param src_ip: Source IP address of the packet. Accepts string and IPv4Address.
:param src_ip_address: Source IP address of the packet. Accepts string and IPv4Address.
:param src_port: Source port of the packet. Optional.
:param dst_ip: Destination IP address of the packet. Accepts string and IPv4Address.
:param dst_ip_address: Destination IP address of the packet. Accepts string and IPv4Address.
:param dst_port: Destination port of the packet. Optional.
:return: A tuple with a boolean indicating if the packet is permitted and an optional rule or implicit action
string.
"""
if not isinstance(src_ip, IPv4Address):
src_ip = IPv4Address(src_ip)
if not isinstance(dst_ip, IPv4Address):
dst_ip = IPv4Address(dst_ip)
if not isinstance(src_ip_address, IPv4Address):
src_ip_address = IPv4Address(src_ip_address)
if not isinstance(dst_ip_address, IPv4Address):
dst_ip_address = IPv4Address(dst_ip_address)
for rule in self._acl:
if not rule:
continue
if (
(rule.src_ip == src_ip or rule.src_ip is None)
and (rule.dst_ip == dst_ip or rule.dst_ip is None)
(rule.src_ip_address == src_ip_address or rule.src_ip_address is None)
and (rule.dst_ip_address == dst_ip_address or rule.dst_ip_address is None)
and (rule.protocol == protocol or rule.protocol is None)
and (rule.src_port == src_port or rule.src_port is None)
and (rule.dst_port == dst_port or rule.dst_port is None)
@@ -191,33 +191,33 @@ class AccessControlList(SimComponent):
def get_relevant_rules(
self,
protocol: IPProtocol,
src_ip: Union[str, IPv4Address],
src_ip_address: Union[str, IPv4Address],
src_port: Port,
dst_ip: Union[str, IPv4Address],
dst_ip_address: Union[str, IPv4Address],
dst_port: Port,
) -> List[ACLRule]:
"""
Get the list of relevant rules for a packet with given properties.
:param protocol: The protocol of the packet.
:param src_ip: Source IP address of the packet. Accepts string and IPv4Address.
:param src_ip_address: Source IP address of the packet. Accepts string and IPv4Address.
:param src_port: Source port of the packet.
:param dst_ip: Destination IP address of the packet. Accepts string and IPv4Address.
:param dst_ip_address: Destination IP address of the packet. Accepts string and IPv4Address.
:param dst_port: Destination port of the packet.
:return: A list of relevant ACLRules.
"""
if not isinstance(src_ip, IPv4Address):
src_ip = IPv4Address(src_ip)
if not isinstance(dst_ip, IPv4Address):
dst_ip = IPv4Address(dst_ip)
if not isinstance(src_ip_address, IPv4Address):
src_ip_address = IPv4Address(src_ip_address)
if not isinstance(dst_ip_address, IPv4Address):
dst_ip_address = IPv4Address(dst_ip_address)
relevant_rules = []
for rule in self._acl:
if rule is None:
continue
if (
(rule.src_ip == src_ip or rule.src_ip is None)
or (rule.dst_ip == dst_ip or rule.dst_ip is None)
(rule.src_ip_address == src_ip_address or rule.src_ip_address is None)
or (rule.dst_ip_address == dst_ip_address or rule.dst_ip_address is None)
or (rule.protocol == protocol or rule.protocol is None)
or (rule.src_port == src_port or rule.src_port is None)
or (rule.dst_port == dst_port or rule.dst_port is None)
@@ -244,9 +244,9 @@ class AccessControlList(SimComponent):
index,
rule.action.name if rule.action else "ANY",
rule.protocol.name if rule.protocol else "ANY",
rule.src_ip if rule.src_ip else "ANY",
rule.src_ip_address if rule.src_ip_address else "ANY",
f"{rule.src_port.value} ({rule.src_port.name})" if rule.src_port else "ANY",
rule.dst_ip if rule.dst_ip else "ANY",
rule.dst_ip_address if rule.dst_ip_address else "ANY",
f"{rule.dst_port.value} ({rule.dst_port.name})" if rule.dst_port else "ANY",
]
)
@@ -260,7 +260,7 @@ class RouteEntry(SimComponent):
Attributes:
address (IPv4Address): The destination IP address or network address.
subnet_mask (IPv4Address): The subnet mask for the network.
next_hop (IPv4Address): The next hop IP address to which packets should be forwarded.
next_hop_ip_address (IPv4Address): The next hop IP address to which packets should be forwarded.
metric (int): The cost metric for this route. Default is 0.0.
Example:
@@ -276,13 +276,13 @@ class RouteEntry(SimComponent):
"The destination IP address or network address."
subnet_mask: IPv4Address
"The subnet mask for the network."
next_hop: IPv4Address
next_hop_ip_address: IPv4Address
"The next hop IP address to which packets should be forwarded."
metric: float = 0.0
"The cost metric for this route. Default is 0.0."
def __init__(self, **kwargs):
for key in {"address", "subnet_mask", "next_hop"}:
for key in {"address", "subnet_mask", "next_hop_ip_address"}:
if not isinstance(kwargs[key], IPv4Address):
kwargs[key] = IPv4Address(kwargs[key])
super().__init__(**kwargs)
@@ -330,7 +330,7 @@ class RouteTable(SimComponent):
self,
address: Union[IPv4Address, str],
subnet_mask: Union[IPv4Address, str],
next_hop: Union[IPv4Address, str],
next_hop_ip_address: Union[IPv4Address, str],
metric: float = 0.0,
):
"""
@@ -338,13 +338,13 @@ class RouteTable(SimComponent):
:param address: The destination address of the route.
:param subnet_mask: The subnet mask of the route.
:param next_hop: The next hop IP for the route.
:param next_hop_ip_address: The next hop IP for the route.
:param metric: The metric of the route, default is 0.0.
"""
for key in {address, subnet_mask, next_hop}:
for key in {address, subnet_mask, next_hop_ip_address}:
if not isinstance(key, IPv4Address):
key = IPv4Address(key)
route = RouteEntry(address=address, subnet_mask=subnet_mask, next_hop=next_hop, metric=metric)
route = RouteEntry(address=address, subnet_mask=subnet_mask, next_hop_ip_address=next_hop_ip_address, metric=metric)
self.routes.append(route)
def find_best_route(self, destination_ip: Union[str, IPv4Address]) -> Optional[RouteEntry]:
@@ -387,7 +387,7 @@ class RouteTable(SimComponent):
table.title = f"{self.sys_log.hostname} Route Table"
for index, route in enumerate(self.routes):
network = IPv4Network(f"{route.address}/{route.subnet_mask}")
table.add_row([index, f"{route.address}/{network.prefixlen}", route.next_hop, route.metric])
table.add_row([index, f"{route.address}/{network.prefixlen}", route.next_hop_ip_address, route.metric])
print(table)
@@ -415,40 +415,40 @@ class RouterARPCache(ARPCache):
# ARP Reply
if not arp_packet.request:
for nic in self.router.nics.values():
if arp_packet.target_ip == nic.ip_address:
if arp_packet.target_ip_address == nic.ip_address:
# reply to the Router specifically
self.sys_log.info(
f"Received ARP response for {arp_packet.sender_ip} "
f"Received ARP response for {arp_packet.sender_ip_address} "
f"from {arp_packet.sender_mac_addr} via NIC {from_nic}"
)
self.add_arp_cache_entry(
ip_address=arp_packet.sender_ip,
ip_address=arp_packet.sender_ip_address,
mac_address=arp_packet.sender_mac_addr,
nic=from_nic,
)
return
# Reply for a connected requested
nic = self.get_arp_cache_nic(arp_packet.target_ip)
nic = self.get_arp_cache_nic(arp_packet.target_ip_address)
if nic:
self.sys_log.info(f"Forwarding arp reply for {arp_packet.target_ip}, from {arp_packet.sender_ip}")
self.sys_log.info(f"Forwarding arp reply for {arp_packet.target_ip_address}, from {arp_packet.sender_ip_address}")
arp_packet.sender_mac_addr = nic.mac_address
frame.decrement_ttl()
nic.send_frame(frame)
# ARP Request
self.sys_log.info(
f"Received ARP request for {arp_packet.target_ip} from "
f"{arp_packet.sender_mac_addr}/{arp_packet.sender_ip} "
f"Received ARP request for {arp_packet.target_ip_address} from "
f"{arp_packet.sender_mac_addr}/{arp_packet.sender_ip_address} "
)
# Matched ARP request
self.add_arp_cache_entry(ip_address=arp_packet.sender_ip, mac_address=arp_packet.sender_mac_addr, nic=from_nic)
self.add_arp_cache_entry(ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic)
arp_packet = arp_packet.generate_reply(from_nic.mac_address)
self.send_arp_reply(arp_packet, from_nic)
# If the target IP matches one of the router's NICs
for nic in self.nics.values():
if nic.enabled and nic.ip_address == arp_packet.target_ip:
if nic.enabled and nic.ip_address == arp_packet.target_ip_address:
arp_reply = arp_packet.generate_reply(from_nic.mac_address)
self.send_arp_reply(arp_reply, from_nic)
return
@@ -484,17 +484,17 @@ class RouterICMP(ICMP):
# determine if request is for router interface or whether it needs to be routed
for nic in self.router.nics.values():
if nic.ip_address == frame.ip.dst_ip:
if nic.ip_address == frame.ip.dst_ip_address:
if nic.enabled:
# reply to the request
if not is_reattempt:
self.sys_log.info(f"Received echo request from {frame.ip.src_ip}")
target_mac_address = self.arp.get_arp_cache_mac_address(frame.ip.src_ip)
src_nic = self.arp.get_arp_cache_nic(frame.ip.src_ip)
self.sys_log.info(f"Received echo request from {frame.ip.src_ip_address}")
target_mac_address = self.arp.get_arp_cache_mac_address(frame.ip.src_ip_address)
src_nic = self.arp.get_arp_cache_nic(frame.ip.src_ip_address)
tcp_header = TCPHeader(src_port=Port.ARP, dst_port=Port.ARP)
# Network Layer
ip_packet = IPPacket(src_ip=nic.ip_address, dst_ip=frame.ip.src_ip, protocol=IPProtocol.ICMP)
ip_packet = IPPacket(src_ip_address=nic.ip_address, dst_ip_address=frame.ip.src_ip_address, protocol=IPProtocol.ICMP)
# Data Link Layer
ethernet_header = EthernetHeader(
src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address
@@ -513,7 +513,7 @@ class RouterICMP(ICMP):
icmp=icmp_reply_packet,
payload=payload,
)
self.sys_log.info(f"Sending echo reply to {frame.ip.dst_ip}")
self.sys_log.info(f"Sending echo reply to {frame.ip.dst_ip_address}")
src_nic.send_frame(frame)
return
@@ -523,12 +523,12 @@ class RouterICMP(ICMP):
elif frame.icmp.icmp_type == ICMPType.ECHO_REPLY:
for nic in self.router.nics.values():
if nic.ip_address == frame.ip.dst_ip:
if nic.ip_address == frame.ip.dst_ip_address:
if nic.enabled:
time = frame.transmission_duration()
time_str = f"{time}ms" if time > 0 else "<1ms"
self.sys_log.info(
f"Reply from {frame.ip.src_ip}: "
f"Reply from {frame.ip.src_ip_address}: "
f"bytes={len(frame.payload)}, "
f"time={time_str}, "
f"TTL={frame.ip.ttl}"
@@ -606,22 +606,22 @@ class Router(Node):
:param re_attempt: Flag to indicate if the routing is a reattempt.
"""
# Check if src ip is on network of one of the NICs
nic = self.arp.get_arp_cache_nic(frame.ip.dst_ip)
target_mac = self.arp.get_arp_cache_mac_address(frame.ip.dst_ip)
nic = self.arp.get_arp_cache_nic(frame.ip.dst_ip_address)
target_mac = self.arp.get_arp_cache_mac_address(frame.ip.dst_ip_address)
if re_attempt and not nic:
self.sys_log.info(f"Destination {frame.ip.dst_ip} is unreachable")
self.sys_log.info(f"Destination {frame.ip.dst_ip_address} is unreachable")
return
if not nic:
self.arp.send_arp_request(frame.ip.dst_ip)
self.arp.send_arp_request(frame.ip.dst_ip_address)
return self.route_frame(frame=frame, from_nic=from_nic, re_attempt=True)
if not nic.enabled:
# TODO: Add sys_log here
return
if frame.ip.dst_ip in nic.ip_network:
if frame.ip.dst_ip_address in nic.ip_network:
from_port = self._get_port_of_nic(from_nic)
to_port = self._get_port_of_nic(nic)
self.sys_log.info(f"Routing frame to internally from port {from_port} to port {to_port}")
@@ -643,8 +643,8 @@ class Router(Node):
"""
route_frame = False
protocol = frame.ip.protocol
src_ip = frame.ip.src_ip
dst_ip = frame.ip.dst_ip
src_ip_address = frame.ip.src_ip_address
dst_ip_address = frame.ip.dst_ip_address
src_port = None
dst_port = None
if frame.ip.protocol == IPProtocol.TCP:
@@ -656,14 +656,14 @@ class Router(Node):
# Check if it's permitted
permitted, rule = self.acl.is_permitted(
protocol=protocol, src_ip=src_ip, src_port=src_port, dst_ip=dst_ip, dst_port=dst_port
protocol=protocol, src_ip_address=src_ip_address, src_port=src_port, dst_ip_address=dst_ip_address, dst_port=dst_port
)
if not permitted:
at_port = self._get_port_of_nic(from_nic)
self.sys_log.info(f"Frame blocked at port {at_port} by rule {rule}")
return
if not self.arp.get_arp_cache_nic(src_ip):
self.arp.add_arp_cache_entry(src_ip, frame.ethernet.src_mac_addr, from_nic)
if not self.arp.get_arp_cache_nic(src_ip_address):
self.arp.add_arp_cache_entry(src_ip_address, frame.ethernet.src_mac_addr, from_nic)
if frame.ip.protocol == IPProtocol.ICMP:
self.icmp.process_icmp(frame=frame, from_nic=from_nic)
else:

View File

@@ -24,21 +24,21 @@ class ARPPacket(BaseModel):
:param request: ARP operation. True if a request, False if a reply.
:param sender_mac_addr: Sender MAC address.
:param sender_ip: Sender IP address.
:param sender_ip_address: Sender IP address.
:param target_mac_addr: Target MAC address.
:param target_ip: Target IP address.
:param target_ip_address: Target IP address.
:Example:
>>> arp_request = ARPPacket(
... sender_mac_addr="aa:bb:cc:dd:ee:ff",
... sender_ip=IPv4Address("192.168.0.1"),
... target_ip=IPv4Address("192.168.0.2")
... sender_ip_address=IPv4Address("192.168.0.1"),
... target_ip_address=IPv4Address("192.168.0.2")
... )
>>> arp_response = ARPPacket(
... sender_mac_addr="aa:bb:cc:dd:ee:ff",
... sender_ip=IPv4Address("192.168.0.1"),
... target_ip=IPv4Address("192.168.0.2")
... sender_ip_address=IPv4Address("192.168.0.1"),
... target_ip_address=IPv4Address("192.168.0.2")
... )
"""
@@ -46,11 +46,11 @@ class ARPPacket(BaseModel):
"ARP operation. True if a request, False if a reply."
sender_mac_addr: str
"Sender MAC address."
sender_ip: IPv4Address
sender_ip_address: IPv4Address
"Sender IP address."
target_mac_addr: Optional[str] = None
"Target MAC address."
target_ip: IPv4Address
target_ip_address: IPv4Address
"Target IP address."
def generate_reply(self, mac_address: str) -> ARPPacket:
@@ -62,8 +62,8 @@ class ARPPacket(BaseModel):
"""
return ARPPacket(
request=False,
sender_ip=self.target_ip,
sender_ip_address=self.target_ip_address,
sender_mac_addr=mac_address,
target_ip=self.sender_ip,
target_ip_address=self.sender_ip_address,
target_mac_addr=self.sender_mac_addr,
)

View File

@@ -52,8 +52,8 @@ class Frame(BaseModel):
... dst_mac_addr='11:22:33:44:55:66'
... ),
... ip=IPPacket(
... src_ip=IPv4Address('192.168.0.1'),
... dst_ip=IPv4Address('10.0.0.1'),
... src_ip_address=IPv4Address('192.168.0.1'),
... dst_ip_address=IPv4Address('10.0.0.1'),
... ),
... tcp=TCPHeader(
... src_port=8080,

View File

@@ -162,8 +162,8 @@ class IPPacket(BaseModel):
"""
Represents the IP layer of a network frame.
:param src_ip: Source IP address.
:param dst_ip: Destination IP address.
:param src_ip_address: Source IP address.
:param dst_ip_address: Destination IP address.
:param protocol: The IP protocol (default is TCP).
:param ttl: Time to Live (TTL) for the packet.
:param precedence: Precedence level for Quality of Service (QoS).
@@ -172,17 +172,17 @@ class IPPacket(BaseModel):
>>> from ipaddress import IPv4Address
>>> ip_packet = IPPacket(
... src_ip=IPv4Address('192.168.0.1'),
... dst_ip=IPv4Address('10.0.0.1'),
... src_ip_address=IPv4Address('192.168.0.1'),
... dst_ip_address=IPv4Address('10.0.0.1'),
... protocol=IPProtocol.TCP,
... ttl=64,
... precedence=Precedence.CRITICAL
... )
"""
src_ip: IPv4Address
src_ip_address: IPv4Address
"Source IP address."
dst_ip: IPv4Address
dst_ip_address: IPv4Address
"Destination IP address."
protocol: IPProtocol = IPProtocol.TCP
"IPProtocol."
@@ -192,8 +192,8 @@ class IPPacket(BaseModel):
"Precedence level for Quality of Service (default is Precedence.ROUTINE)."
def __init__(self, **kwargs):
if not isinstance(kwargs["src_ip"], IPv4Address):
kwargs["src_ip"] = IPv4Address(kwargs["src_ip"])
if not isinstance(kwargs["dst_ip"], IPv4Address):
kwargs["dst_ip"] = IPv4Address(kwargs["dst_ip"])
if not isinstance(kwargs["src_ip_address"], IPv4Address):
kwargs["src_ip_address"] = IPv4Address(kwargs["src_ip_address"])
if not isinstance(kwargs["dst_ip_address"], IPv4Address):
kwargs["dst_ip_address"] = IPv4Address(kwargs["dst_ip_address"])
super().__init__(**kwargs)