#1800 - Ran pre-commit

This commit is contained in:
Chris McCarthy
2023-09-04 16:34:55 +01:00
parent 5981bc7255
commit ccad5ba8a3
5 changed files with 44 additions and 14 deletions

View File

@@ -3,7 +3,7 @@ from abc import ABC, abstractmethod
from typing import Callable, Dict, List, Optional, Union
from uuid import uuid4
from pydantic import BaseModel, ConfigDict, Extra, validator
from pydantic import BaseModel, ConfigDict, Extra
from primaite import getLogger

View File

@@ -654,7 +654,9 @@ class ARPCache:
# Data Link Layer
ethernet_header = EthernetHeader(src_mac_addr=nic.mac_address, dst_mac_addr="ff:ff:ff:ff:ff:ff")
arp_packet = ARPPacket(
sender_ip_address=nic.ip_address, sender_mac_addr=nic.mac_address, target_ip_address=target_ip_address
sender_ip_address=nic.ip_address,
sender_mac_addr=nic.mac_address,
target_ip_address=target_ip_address,
)
frame = Frame(ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, arp=arp_packet)
nic.send_frame(frame)
@@ -695,7 +697,8 @@ class ARPCache:
# ARP Reply
if not arp_packet.request:
self.sys_log.info(
f"Received ARP response for {arp_packet.sender_ip_address} from {arp_packet.sender_mac_addr} via NIC {from_nic}"
f"Received ARP response for {arp_packet.sender_ip_address} "
f"from {arp_packet.sender_mac_addr} via NIC {from_nic}"
)
self.add_arp_cache_entry(
ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic
@@ -714,7 +717,9 @@ class ARPCache:
return
# Matched ARP request
self.add_arp_cache_entry(ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic)
self.add_arp_cache_entry(
ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic
)
arp_packet = arp_packet.generate_reply(from_nic.mac_address)
self.send_arp_reply(arp_packet, from_nic)
@@ -759,7 +764,9 @@ class ICMP:
tcp_header = TCPHeader(src_port=Port.ARP, dst_port=Port.ARP)
# Network Layer
ip_packet = IPPacket(src_ip_address=src_nic.ip_address, dst_ip_address=frame.ip.src_ip_address, protocol=IPProtocol.ICMP)
ip_packet = IPPacket(
src_ip_address=src_nic.ip_address, dst_ip_address=frame.ip.src_ip_address, protocol=IPProtocol.ICMP
)
# Data Link Layer
ethernet_header = EthernetHeader(src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address)
icmp_reply_packet = ICMPPacket(

View File

@@ -1,5 +1,3 @@
from ipaddress import IPv4Address
from primaite.simulator.network.hardware.base import NIC, Node

View File

@@ -132,7 +132,12 @@ class AccessControlList(SimComponent):
dst_ip_address = IPv4Address(dst_ip_address)
if 0 <= position < self.max_acl_rules:
self._acl[position] = ACLRule(
action=action, src_ip_address=src_ip_address, dst_ip_address=dst_ip_address, protocol=protocol, src_port=src_port, dst_port=dst_port
action=action,
src_ip_address=src_ip_address,
dst_ip_address=dst_ip_address,
protocol=protocol,
src_port=src_port,
dst_port=dst_port,
)
else:
raise ValueError(f"Position {position} is out of bounds.")
@@ -343,7 +348,9 @@ class RouteTable(SimComponent):
for key in {address, subnet_mask, next_hop_ip_address}:
if not isinstance(key, IPv4Address):
key = IPv4Address(key)
route = RouteEntry(address=address, subnet_mask=subnet_mask, next_hop_ip_address=next_hop_ip_address, metric=metric)
route = RouteEntry(
address=address, subnet_mask=subnet_mask, next_hop_ip_address=next_hop_ip_address, metric=metric
)
self.routes.append(route)
def find_best_route(self, destination_ip: Union[str, IPv4Address]) -> Optional[RouteEntry]:
@@ -430,7 +437,9 @@ class RouterARPCache(ARPCache):
# Reply for a connected requested
nic = self.get_arp_cache_nic(arp_packet.target_ip_address)
if nic:
self.sys_log.info(f"Forwarding arp reply for {arp_packet.target_ip_address}, from {arp_packet.sender_ip_address}")
self.sys_log.info(
f"Forwarding arp reply for {arp_packet.target_ip_address}, from {arp_packet.sender_ip_address}"
)
arp_packet.sender_mac_addr = nic.mac_address
frame.decrement_ttl()
nic.send_frame(frame)
@@ -441,7 +450,9 @@ class RouterARPCache(ARPCache):
f"{arp_packet.sender_mac_addr}/{arp_packet.sender_ip_address} "
)
# Matched ARP request
self.add_arp_cache_entry(ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic)
self.add_arp_cache_entry(
ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic
)
arp_packet = arp_packet.generate_reply(from_nic.mac_address)
self.send_arp_reply(arp_packet, from_nic)
@@ -493,7 +504,11 @@ class RouterICMP(ICMP):
tcp_header = TCPHeader(src_port=Port.ARP, dst_port=Port.ARP)
# Network Layer
ip_packet = IPPacket(src_ip_address=nic.ip_address, dst_ip_address=frame.ip.src_ip_address, protocol=IPProtocol.ICMP)
ip_packet = IPPacket(
src_ip_address=nic.ip_address,
dst_ip_address=frame.ip.src_ip_address,
protocol=IPProtocol.ICMP,
)
# Data Link Layer
ethernet_header = EthernetHeader(
src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address
@@ -655,7 +670,11 @@ class Router(Node):
# Check if it's permitted
permitted, rule = self.acl.is_permitted(
protocol=protocol, src_ip_address=src_ip_address, src_port=src_port, dst_ip_address=dst_ip_address, dst_port=dst_port
protocol=protocol,
src_ip_address=src_ip_address,
src_port=src_port,
dst_ip_address=dst_ip_address,
dst_port=dst_port,
)
if not permitted:
at_port = self._get_port_of_nic(from_nic)

View File

@@ -47,7 +47,13 @@ class Session(SimComponent):
:return: A Session instance.
"""
protocol, src_ip_address, dst_ip_address, src_port, dst_port = session_key
return Session(protocol=protocol, src_ip_address=src_ip_address, dst_ip_address=dst_ip_address, src_port=src_port, dst_port=dst_port)
return Session(
protocol=protocol,
src_ip_address=src_ip_address,
dst_ip_address=dst_ip_address,
src_port=src_port,
dst_port=dst_port,
)
def describe_state(self) -> Dict:
"""