Merge remote-tracking branch 'origin/dev' into feature/2257-router-routes-cannot-be-represented-in-config-file

This commit is contained in:
Czar Echavez
2024-02-29 15:21:20 +00:00
10 changed files with 49 additions and 16 deletions

View File

@@ -168,7 +168,9 @@ class WirelessNetworkInterface(NetworkInterface, ABC):
self.enabled = True
self._connected_node.sys_log.info(f"Network Interface {self} enabled")
self.pcap = PacketCapture(hostname=self._connected_node.hostname, interface_num=self.port_num)
self.pcap = PacketCapture(
hostname=self._connected_node.hostname, port_num=self.port_num, port_name=self.port_name
)
AIR_SPACE.add_wireless_interface(self)
def disable(self):

View File

@@ -94,6 +94,9 @@ class NetworkInterface(SimComponent, ABC):
port_num: Optional[int] = None
"The port number assigned to this interface on the connected node."
port_name: Optional[str] = None
"The port name assigned to this interface on the connected node."
pcap: Optional[PacketCapture] = None
"A PacketCapture instance for capturing and analysing packets passing through this interface."
@@ -248,7 +251,7 @@ class NetworkInterface(SimComponent, ABC):
:return: A string combining the port number and the mac address
"""
return f"Port {self.port_num}: {self.mac_address}"
return f"Port {self.port_name if self.port_name else self.port_num}: {self.mac_address}"
class WiredNetworkInterface(NetworkInterface, ABC):
@@ -293,7 +296,9 @@ class WiredNetworkInterface(NetworkInterface, ABC):
self.enabled = True
self._connected_node.sys_log.info(f"Network Interface {self} enabled")
self.pcap = PacketCapture(hostname=self._connected_node.hostname, interface_num=self.port_num)
self.pcap = PacketCapture(
hostname=self._connected_node.hostname, port_num=self.port_num, port_name=self.port_name
)
if self._connected_link:
self._connected_link.endpoint_up()
@@ -1024,7 +1029,7 @@ class Node(SimComponent):
self.sys_log.info("Resetting")
self.power_off()
def connect_nic(self, network_interface: NetworkInterface):
def connect_nic(self, network_interface: NetworkInterface, port_name: Optional[str] = None):
"""
Connect a Network Interface to the node.
@@ -1036,7 +1041,9 @@ class Node(SimComponent):
new_nic_num = len(self.network_interfaces)
self.network_interface[new_nic_num] = network_interface
network_interface._connected_node = self
network_interface._port_num_on_node = new_nic_num
network_interface.port_num = new_nic_num
if port_name:
network_interface.port_name = port_name
network_interface.parent = self
self.sys_log.info(f"Connected Network Interface {network_interface}")
if self.operating_state == NodeOperatingState.ON:

View File

@@ -83,4 +83,4 @@ class WirelessAccessPoint(IPWirelessNetworkInterface):
:return: A string combining the port number, MAC address and IP address of the NIC.
"""
return f"Port {self.port_num}: {self.mac_address}/{self.ip_address}"
return f"Port {self.port_name if self.port_name else self.port_num}: {self.mac_address}/{self.ip_address}"

View File

@@ -80,4 +80,4 @@ class WirelessNIC(IPWirelessNetworkInterface):
:return: A string combining the port number, MAC address and IP address of the NIC.
"""
return f"Port {self.port_num}: {self.mac_address}/{self.ip_address}"
return f"Port {self.port_name if self.port_name else self.port_num}: {self.mac_address}/{self.ip_address}"

View File

@@ -250,7 +250,7 @@ class NIC(IPWiredNetworkInterface):
:return: A string combining the port number, MAC address and IP address of the NIC.
"""
return f"Port {self.port_num}: {self.mac_address}/{self.ip_address}"
return f"Port {self.port_name if self.port_name else self.port_num}: {self.mac_address}/{self.ip_address}"
class HostNode(Node):

View File

@@ -89,7 +89,17 @@ class Firewall(Router):
if not kwargs.get("sys_log"):
kwargs["sys_log"] = SysLog(hostname)
super().__init__(hostname=hostname, num_ports=3, **kwargs)
super().__init__(hostname=hostname, num_ports=0, **kwargs)
self.connect_nic(
RouterInterface(ip_address="127.0.0.1", subnet_mask="255.0.0.0", gateway="0.0.0.0", port_name="external")
)
self.connect_nic(
RouterInterface(ip_address="127.0.0.1", subnet_mask="255.0.0.0", gateway="0.0.0.0", port_name="internal")
)
self.connect_nic(
RouterInterface(ip_address="127.0.0.1", subnet_mask="255.0.0.0", gateway="0.0.0.0", port_name="dmz")
)
# Initialise ACLs for internal and dmz interfaces with a default DENY policy
self.internal_inbound_acl = AccessControlList(

View File

@@ -998,7 +998,7 @@ class RouterInterface(IPWiredNetworkInterface):
:return: A string combining the port number, MAC address and IP address of the NIC.
"""
return f"Port {self.port_num}: {self.mac_address}/{self.ip_address}"
return f"Port {self.port_name if self.port_name else self.port_num}: {self.mac_address}/{self.ip_address}"
class Router(NetworkNode):

View File

@@ -80,7 +80,10 @@ class WirelessAccessPoint(IPWirelessNetworkInterface):
:return: A string combining the port number, MAC address and IP address of the NIC.
"""
return f"Port {self.port_num}: {self.mac_address}/{self.ip_address} ({self.frequency})"
return (
f"Port {self.port_name if self.port_name else self.port_num}: "
f"{self.mac_address}/{self.ip_address} ({self.frequency})"
)
class WirelessRouter(Router):

View File

@@ -21,7 +21,13 @@ class PacketCapture:
The PCAPs are logged to: <simulation output directory>/<hostname>/<hostname>_<ip address>_pcap.log
"""
def __init__(self, hostname: str, ip_address: Optional[str] = None, interface_num: Optional[int] = None):
def __init__(
self,
hostname: str,
ip_address: Optional[str] = None,
port_num: Optional[int] = None,
port_name: Optional[str] = None,
):
"""
Initialize the PacketCapture process.
@@ -32,9 +38,12 @@ class PacketCapture:
"The hostname for which PCAP logs are being recorded."
self.ip_address: str = ip_address
"The IP address associated with the PCAP logs."
self.interface_num = interface_num
self.port_num = port_num
"The interface num on the Node."
self.port_name = port_name
"The interface name on the Node."
self.inbound_logger = None
self.outbound_logger = None
@@ -79,10 +88,12 @@ class PacketCapture:
def _get_logger_name(self, outbound: bool = False) -> str:
"""Get PCAP the logger name."""
if self.port_name:
return f"{self.hostname}_{self.port_name}_{'outbound' if outbound else 'inbound'}_pcap"
if self.ip_address:
return f"{self.hostname}_{self.ip_address}_{'outbound' if outbound else 'inbound'}_pcap"
if self.interface_num:
return f"{self.hostname}_port-{self.interface_num}_{'outbound' if outbound else 'inbound'}_pcap"
if self.port_num:
return f"{self.hostname}_port-{self.port_num}_{'outbound' if outbound else 'inbound'}_pcap"
return f"{self.hostname}_{'outbound' if outbound else 'inbound'}_pcap"
def _get_log_path(self, outbound: bool = False) -> Path: