diff --git a/src/primaite/simulator/core.py b/src/primaite/simulator/core.py index 3e68ed5f..b7dfcf72 100644 --- a/src/primaite/simulator/core.py +++ b/src/primaite/simulator/core.py @@ -3,7 +3,7 @@ from abc import ABC, abstractmethod from typing import Callable, Dict, List, Optional, Union from uuid import uuid4 -from pydantic import BaseModel, ConfigDict, Extra, validator +from pydantic import BaseModel, ConfigDict, Extra from primaite import getLogger diff --git a/src/primaite/simulator/network/hardware/base.py b/src/primaite/simulator/network/hardware/base.py index a170506b..f2feb961 100644 --- a/src/primaite/simulator/network/hardware/base.py +++ b/src/primaite/simulator/network/hardware/base.py @@ -654,7 +654,9 @@ class ARPCache: # Data Link Layer ethernet_header = EthernetHeader(src_mac_addr=nic.mac_address, dst_mac_addr="ff:ff:ff:ff:ff:ff") arp_packet = ARPPacket( - sender_ip_address=nic.ip_address, sender_mac_addr=nic.mac_address, target_ip_address=target_ip_address + sender_ip_address=nic.ip_address, + sender_mac_addr=nic.mac_address, + target_ip_address=target_ip_address, ) frame = Frame(ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, arp=arp_packet) nic.send_frame(frame) @@ -695,7 +697,8 @@ class ARPCache: # ARP Reply if not arp_packet.request: self.sys_log.info( - f"Received ARP response for {arp_packet.sender_ip_address} from {arp_packet.sender_mac_addr} via NIC {from_nic}" + f"Received ARP response for {arp_packet.sender_ip_address} " + f"from {arp_packet.sender_mac_addr} via NIC {from_nic}" ) self.add_arp_cache_entry( ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic @@ -714,7 +717,9 @@ class ARPCache: return # Matched ARP request - self.add_arp_cache_entry(ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic) + self.add_arp_cache_entry( + ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic + ) arp_packet = arp_packet.generate_reply(from_nic.mac_address) self.send_arp_reply(arp_packet, from_nic) @@ -759,7 +764,9 @@ class ICMP: tcp_header = TCPHeader(src_port=Port.ARP, dst_port=Port.ARP) # Network Layer - ip_packet = IPPacket(src_ip_address=src_nic.ip_address, dst_ip_address=frame.ip.src_ip_address, protocol=IPProtocol.ICMP) + ip_packet = IPPacket( + src_ip_address=src_nic.ip_address, dst_ip_address=frame.ip.src_ip_address, protocol=IPProtocol.ICMP + ) # Data Link Layer ethernet_header = EthernetHeader(src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address) icmp_reply_packet = ICMPPacket( diff --git a/src/primaite/simulator/network/hardware/nodes/computer.py b/src/primaite/simulator/network/hardware/nodes/computer.py index a6def4eb..5452666b 100644 --- a/src/primaite/simulator/network/hardware/nodes/computer.py +++ b/src/primaite/simulator/network/hardware/nodes/computer.py @@ -1,5 +1,3 @@ -from ipaddress import IPv4Address - from primaite.simulator.network.hardware.base import NIC, Node diff --git a/src/primaite/simulator/network/hardware/nodes/router.py b/src/primaite/simulator/network/hardware/nodes/router.py index a8177e86..a34b83e2 100644 --- a/src/primaite/simulator/network/hardware/nodes/router.py +++ b/src/primaite/simulator/network/hardware/nodes/router.py @@ -132,7 +132,12 @@ class AccessControlList(SimComponent): dst_ip_address = IPv4Address(dst_ip_address) if 0 <= position < self.max_acl_rules: self._acl[position] = ACLRule( - action=action, src_ip_address=src_ip_address, dst_ip_address=dst_ip_address, protocol=protocol, src_port=src_port, dst_port=dst_port + action=action, + src_ip_address=src_ip_address, + dst_ip_address=dst_ip_address, + protocol=protocol, + src_port=src_port, + dst_port=dst_port, ) else: raise ValueError(f"Position {position} is out of bounds.") @@ -343,7 +348,9 @@ class RouteTable(SimComponent): for key in {address, subnet_mask, next_hop_ip_address}: if not isinstance(key, IPv4Address): key = IPv4Address(key) - route = RouteEntry(address=address, subnet_mask=subnet_mask, next_hop_ip_address=next_hop_ip_address, metric=metric) + route = RouteEntry( + address=address, subnet_mask=subnet_mask, next_hop_ip_address=next_hop_ip_address, metric=metric + ) self.routes.append(route) def find_best_route(self, destination_ip: Union[str, IPv4Address]) -> Optional[RouteEntry]: @@ -430,7 +437,9 @@ class RouterARPCache(ARPCache): # Reply for a connected requested nic = self.get_arp_cache_nic(arp_packet.target_ip_address) if nic: - self.sys_log.info(f"Forwarding arp reply for {arp_packet.target_ip_address}, from {arp_packet.sender_ip_address}") + self.sys_log.info( + f"Forwarding arp reply for {arp_packet.target_ip_address}, from {arp_packet.sender_ip_address}" + ) arp_packet.sender_mac_addr = nic.mac_address frame.decrement_ttl() nic.send_frame(frame) @@ -441,7 +450,9 @@ class RouterARPCache(ARPCache): f"{arp_packet.sender_mac_addr}/{arp_packet.sender_ip_address} " ) # Matched ARP request - self.add_arp_cache_entry(ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic) + self.add_arp_cache_entry( + ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic + ) arp_packet = arp_packet.generate_reply(from_nic.mac_address) self.send_arp_reply(arp_packet, from_nic) @@ -493,7 +504,11 @@ class RouterICMP(ICMP): tcp_header = TCPHeader(src_port=Port.ARP, dst_port=Port.ARP) # Network Layer - ip_packet = IPPacket(src_ip_address=nic.ip_address, dst_ip_address=frame.ip.src_ip_address, protocol=IPProtocol.ICMP) + ip_packet = IPPacket( + src_ip_address=nic.ip_address, + dst_ip_address=frame.ip.src_ip_address, + protocol=IPProtocol.ICMP, + ) # Data Link Layer ethernet_header = EthernetHeader( src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address @@ -655,7 +670,11 @@ class Router(Node): # Check if it's permitted permitted, rule = self.acl.is_permitted( - protocol=protocol, src_ip_address=src_ip_address, src_port=src_port, dst_ip_address=dst_ip_address, dst_port=dst_port + protocol=protocol, + src_ip_address=src_ip_address, + src_port=src_port, + dst_ip_address=dst_ip_address, + dst_port=dst_port, ) if not permitted: at_port = self._get_port_of_nic(from_nic) diff --git a/src/primaite/simulator/system/core/session_manager.py b/src/primaite/simulator/system/core/session_manager.py index 705210ff..7f3d22c5 100644 --- a/src/primaite/simulator/system/core/session_manager.py +++ b/src/primaite/simulator/system/core/session_manager.py @@ -47,7 +47,13 @@ class Session(SimComponent): :return: A Session instance. """ protocol, src_ip_address, dst_ip_address, src_port, dst_port = session_key - return Session(protocol=protocol, src_ip_address=src_ip_address, dst_ip_address=dst_ip_address, src_port=src_port, dst_port=dst_port) + return Session( + protocol=protocol, + src_ip_address=src_ip_address, + dst_ip_address=dst_ip_address, + src_port=src_port, + dst_port=dst_port, + ) def describe_state(self) -> Dict: """