Merge remote-tracking branch 'origin/dev' into feature/2257-router-routes-cannot-be-represented-in-config-file
This commit is contained in:
@@ -12,8 +12,8 @@ class _SimOutput:
|
||||
self._path: Path = (
|
||||
_PRIMAITE_ROOT.parent.parent / "simulation_output" / datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
|
||||
)
|
||||
self.save_pcap_logs: bool = False
|
||||
self.save_sys_logs: bool = False
|
||||
self.save_pcap_logs: bool = True
|
||||
self.save_sys_logs: bool = True
|
||||
|
||||
@property
|
||||
def path(self) -> Path:
|
||||
|
||||
307
src/primaite/simulator/network/airspace.py
Normal file
307
src/primaite/simulator/network/airspace.py
Normal file
@@ -0,0 +1,307 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, Final, List, Optional
|
||||
|
||||
from prettytable import PrettyTable
|
||||
|
||||
from primaite import getLogger
|
||||
from primaite.simulator.network.hardware.base import Layer3Interface, NetworkInterface, WiredNetworkInterface
|
||||
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
|
||||
from primaite.simulator.network.transmission.data_link_layer import Frame
|
||||
from primaite.simulator.system.core.packet_capture import PacketCapture
|
||||
|
||||
_LOGGER = getLogger(__name__)
|
||||
|
||||
__all__ = ["AIR_SPACE", "AirSpaceFrequency", "WirelessNetworkInterface", "IPWirelessNetworkInterface"]
|
||||
|
||||
|
||||
class AirSpace:
|
||||
"""Represents a wireless airspace, managing wireless network interfaces and handling wireless transmission."""
|
||||
|
||||
def __init__(self):
|
||||
self._wireless_interfaces: Dict[str, WirelessNetworkInterface] = {}
|
||||
self._wireless_interfaces_by_frequency: Dict[AirSpaceFrequency, List[WirelessNetworkInterface]] = {}
|
||||
|
||||
def show(self, frequency: Optional[AirSpaceFrequency] = None):
|
||||
"""
|
||||
Displays a summary of wireless interfaces in the airspace, optionally filtered by a specific frequency.
|
||||
|
||||
:param frequency: The frequency band to filter devices by. If None, devices for all frequencies are shown.
|
||||
"""
|
||||
table = PrettyTable()
|
||||
table.field_names = ["Connected Node", "MAC Address", "IP Address", "Subnet Mask", "Frequency", "Status"]
|
||||
|
||||
# If a specific frequency is provided, filter by it; otherwise, use all frequencies.
|
||||
frequencies_to_show = [frequency] if frequency else self._wireless_interfaces_by_frequency.keys()
|
||||
|
||||
for freq in frequencies_to_show:
|
||||
interfaces = self._wireless_interfaces_by_frequency.get(freq, [])
|
||||
for interface in interfaces:
|
||||
status = "Enabled" if interface.enabled else "Disabled"
|
||||
table.add_row(
|
||||
[
|
||||
interface._connected_node.hostname, # noqa
|
||||
interface.mac_address,
|
||||
interface.ip_address if hasattr(interface, "ip_address") else None,
|
||||
interface.subnet_mask if hasattr(interface, "subnet_mask") else None,
|
||||
str(freq),
|
||||
status,
|
||||
]
|
||||
)
|
||||
|
||||
print(table)
|
||||
|
||||
def add_wireless_interface(self, wireless_interface: WirelessNetworkInterface):
|
||||
"""
|
||||
Adds a wireless network interface to the airspace if it's not already present.
|
||||
|
||||
:param wireless_interface: The wireless network interface to be added.
|
||||
"""
|
||||
if wireless_interface.mac_address not in self._wireless_interfaces:
|
||||
self._wireless_interfaces[wireless_interface.mac_address] = wireless_interface
|
||||
if wireless_interface.frequency not in self._wireless_interfaces_by_frequency:
|
||||
self._wireless_interfaces_by_frequency[wireless_interface.frequency] = []
|
||||
self._wireless_interfaces_by_frequency[wireless_interface.frequency].append(wireless_interface)
|
||||
|
||||
def remove_wireless_interface(self, wireless_interface: WirelessNetworkInterface):
|
||||
"""
|
||||
Removes a wireless network interface from the airspace if it's present.
|
||||
|
||||
:param wireless_interface: The wireless network interface to be removed.
|
||||
"""
|
||||
if wireless_interface.mac_address in self._wireless_interfaces:
|
||||
self._wireless_interfaces.pop(wireless_interface.mac_address)
|
||||
self._wireless_interfaces_by_frequency[wireless_interface.frequency].remove(wireless_interface)
|
||||
|
||||
def clear(self):
|
||||
"""
|
||||
Clears all wireless network interfaces and their frequency associations from the airspace.
|
||||
|
||||
After calling this method, the airspace will contain no wireless network interfaces, and transmissions cannot
|
||||
occur until new interfaces are added again.
|
||||
"""
|
||||
self._wireless_interfaces.clear()
|
||||
self._wireless_interfaces_by_frequency.clear()
|
||||
|
||||
def transmit(self, frame: Frame, sender_network_interface: WirelessNetworkInterface):
|
||||
"""
|
||||
Transmits a frame to all enabled wireless network interfaces on a specific frequency within the airspace.
|
||||
|
||||
This ensures that a wireless interface does not receive its own transmission.
|
||||
|
||||
:param frame: The frame to be transmitted.
|
||||
:param sender_network_interface: The wireless network interface sending the frame. This interface will be
|
||||
excluded from the list of receivers to prevent it from receiving its own transmission.
|
||||
"""
|
||||
for wireless_interface in self._wireless_interfaces_by_frequency.get(sender_network_interface.frequency, []):
|
||||
if wireless_interface != sender_network_interface and wireless_interface.enabled:
|
||||
wireless_interface.receive_frame(frame)
|
||||
|
||||
|
||||
AIR_SPACE: Final[AirSpace] = AirSpace()
|
||||
"""
|
||||
A singleton instance of the AirSpace class, representing the global wireless airspace.
|
||||
|
||||
This instance acts as the central management point for all wireless communications within the simulated network
|
||||
environment. By default, there is only one airspace in the simulation, making this variable a singleton that
|
||||
manages the registration, removal, and transmission of wireless frames across all wireless network interfaces configured
|
||||
in the simulation. It ensures that wireless frames are appropriately transmitted to and received by wireless
|
||||
interfaces based on their operational status and frequency band.
|
||||
"""
|
||||
|
||||
|
||||
class AirSpaceFrequency(Enum):
|
||||
"""Enumeration representing the operating frequencies for wireless communications."""
|
||||
|
||||
WIFI_2_4 = 2.4e9
|
||||
"""WiFi 2.4 GHz. Known for its extensive range and ability to penetrate solid objects effectively."""
|
||||
WIFI_5 = 5e9
|
||||
"""WiFi 5 GHz. Known for its higher data transmission speeds and reduced interference from other devices."""
|
||||
|
||||
def __str__(self) -> str:
|
||||
if self == AirSpaceFrequency.WIFI_2_4:
|
||||
return "WiFi 2.4 GHz"
|
||||
elif self == AirSpaceFrequency.WIFI_5:
|
||||
return "WiFi 5 GHz"
|
||||
else:
|
||||
return "Unknown Frequency"
|
||||
|
||||
|
||||
class WirelessNetworkInterface(NetworkInterface, ABC):
|
||||
"""
|
||||
Represents a wireless network interface in a network device.
|
||||
|
||||
This abstract base class models wireless network interfaces, encapsulating properties and behaviors specific to
|
||||
wireless connectivity. It provides a framework for managing wireless connections, including signal strength,
|
||||
security protocols, and other wireless-specific attributes and methods.
|
||||
|
||||
Wireless network interfaces differ from wired ones in their medium of communication, relying on radio frequencies
|
||||
for data transmission and reception. This class serves as a base for more specific types of wireless network
|
||||
interfaces, such as Wi-Fi adapters or radio network interfaces, ensuring that essential wireless functionality is
|
||||
defined and standardised.
|
||||
|
||||
Inherits from:
|
||||
- NetworkInterface: Provides basic network interface properties and methods.
|
||||
|
||||
As an abstract base class, it requires subclasses to implement specific methods related to wireless communication
|
||||
and may define additional properties and methods specific to wireless technology.
|
||||
"""
|
||||
|
||||
frequency: AirSpaceFrequency = AirSpaceFrequency.WIFI_2_4
|
||||
|
||||
def enable(self):
|
||||
"""Attempt to enable the network interface."""
|
||||
if self.enabled:
|
||||
return
|
||||
|
||||
if not self._connected_node:
|
||||
_LOGGER.error(f"Interface {self} cannot be enabled as it is not connected to a Node")
|
||||
return
|
||||
|
||||
if self._connected_node.operating_state != NodeOperatingState.ON:
|
||||
self._connected_node.sys_log.info(
|
||||
f"Interface {self} cannot be enabled as the connected Node is not powered on"
|
||||
)
|
||||
return
|
||||
|
||||
self.enabled = True
|
||||
self._connected_node.sys_log.info(f"Network Interface {self} enabled")
|
||||
self.pcap = PacketCapture(hostname=self._connected_node.hostname, interface_num=self.port_num)
|
||||
AIR_SPACE.add_wireless_interface(self)
|
||||
|
||||
def disable(self):
|
||||
"""Disable the network interface."""
|
||||
if not self.enabled:
|
||||
return
|
||||
self.enabled = False
|
||||
if self._connected_node:
|
||||
self._connected_node.sys_log.info(f"Network Interface {self} disabled")
|
||||
else:
|
||||
_LOGGER.debug(f"Interface {self} disabled")
|
||||
AIR_SPACE.remove_wireless_interface(self)
|
||||
|
||||
def send_frame(self, frame: Frame) -> bool:
|
||||
"""
|
||||
Attempts to send a network frame over the airspace.
|
||||
|
||||
This method sends a frame if the network interface is enabled and connected to a wireless airspace. It captures
|
||||
the frame using PCAP (if available) and transmits it through the airspace. Returns True if the frame is
|
||||
successfully sent, False otherwise (e.g., if the network interface is disabled).
|
||||
|
||||
:param frame: The network frame to be sent.
|
||||
:return: True if the frame is sent successfully, False if the network interface is disabled.
|
||||
"""
|
||||
if self.enabled:
|
||||
frame.set_sent_timestamp()
|
||||
self.pcap.capture_outbound(frame)
|
||||
AIR_SPACE.transmit(frame, self)
|
||||
return True
|
||||
# Cannot send Frame as the network interface is not enabled
|
||||
return False
|
||||
|
||||
@abstractmethod
|
||||
def receive_frame(self, frame: Frame) -> bool:
|
||||
"""
|
||||
Receives a network frame on the network interface.
|
||||
|
||||
:param frame: The network frame being received.
|
||||
:return: A boolean indicating whether the frame was successfully received.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class IPWirelessNetworkInterface(WirelessNetworkInterface, Layer3Interface, ABC):
|
||||
"""
|
||||
Represents an IP wireless network interface.
|
||||
|
||||
This interface operates at both the data link layer (Layer 2) and the network layer (Layer 3) of the OSI model,
|
||||
specifically tailored for IP-based communication over wireless connections. This abstract class provides a
|
||||
template for creating specific wireless network interfaces that support Internet Protocol (IP) functionalities.
|
||||
|
||||
As this class is a combination of its parent classes without additional attributes or methods, please refer to
|
||||
the documentation of `WirelessNetworkInterface` and `Layer3Interface` for more details on the supported operations
|
||||
and functionalities.
|
||||
|
||||
The class inherits from:
|
||||
- `WirelessNetworkInterface`: Providing the functionalities and characteristics of a wireless connection, such as
|
||||
managing wireless signal transmission, reception, and associated wireless protocols.
|
||||
- `Layer3Interface`: Enabling network layer capabilities, including IP address assignment, routing, and
|
||||
potentially, Layer 3 protocols like IPsec.
|
||||
|
||||
As an abstract class, `IPWirelessNetworkInterface` does not implement specific methods but ensures that any derived
|
||||
class provides implementations for the functionalities of both `WirelessNetworkInterface` and `Layer3Interface`.
|
||||
This setup is ideal for representing network interfaces in devices that require wireless connections and are capable
|
||||
of IP routing and addressing, such as wireless routers, access points, and wireless end-host devices like
|
||||
smartphones and laptops.
|
||||
|
||||
This class should be extended by concrete classes that define specific behaviors and properties of an IP-capable
|
||||
wireless network interface.
|
||||
"""
|
||||
|
||||
def model_post_init(self, __context: Any) -> None:
|
||||
"""
|
||||
Performs post-initialisation checks to ensure the model's IP configuration is valid.
|
||||
|
||||
This method is invoked after the initialisation of a network model object to validate its network settings,
|
||||
particularly to ensure that the assigned IP address is not a network address. This validation is crucial for
|
||||
maintaining the integrity of network simulations and avoiding configuration errors that could lead to
|
||||
unrealistic or incorrect behavior.
|
||||
|
||||
:param __context: Contextual information or parameters passed to the method, used for further initializing or
|
||||
validating the model post-creation.
|
||||
:raises ValueError: If the IP address is the same as the network address, indicating an incorrect configuration.
|
||||
"""
|
||||
if self.ip_network.network_address == self.ip_address:
|
||||
raise ValueError(f"{self.ip_address}/{self.subnet_mask} must not be a network address")
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Produce a dictionary describing the current state of this object.
|
||||
|
||||
:return: Current state of this object and child objects.
|
||||
:rtype: Dict
|
||||
"""
|
||||
# Get the state from the WiredNetworkInterface
|
||||
state = WiredNetworkInterface.describe_state(self)
|
||||
|
||||
# Update the state with information from Layer3Interface
|
||||
state.update(Layer3Interface.describe_state(self))
|
||||
|
||||
state["frequency"] = self.frequency
|
||||
|
||||
return state
|
||||
|
||||
def set_original_state(self):
|
||||
"""Sets the original state."""
|
||||
vals_to_include = {"ip_address", "subnet_mask", "mac_address", "speed", "mtu", "wake_on_lan", "enabled"}
|
||||
self._original_state = self.model_dump(include=vals_to_include)
|
||||
|
||||
def enable(self):
|
||||
"""
|
||||
Enables this wired network interface and attempts to send a "hello" message to the default gateway.
|
||||
|
||||
This method activates the network interface, making it operational for network communications. After enabling,
|
||||
it tries to initiate a default gateway "hello" process, typically to establish initial connectivity and resolve
|
||||
the default gateway's MAC address. This step is crucial for ensuring the interface can successfully send data
|
||||
to and receive data from the network.
|
||||
|
||||
The method safely handles cases where the connected node might not have a default gateway set or the
|
||||
`default_gateway_hello` method is not defined, ignoring such errors to proceed without interruption.
|
||||
"""
|
||||
super().enable()
|
||||
try:
|
||||
self._connected_node.default_gateway_hello()
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def receive_frame(self, frame: Frame) -> bool:
|
||||
"""
|
||||
Receives a network frame on the interface.
|
||||
|
||||
:param frame: The network frame being received.
|
||||
:return: A boolean indicating whether the frame was successfully received.
|
||||
"""
|
||||
pass
|
||||
@@ -420,86 +420,6 @@ class IPWiredNetworkInterface(WiredNetworkInterface, Layer3Interface, ABC):
|
||||
pass
|
||||
|
||||
|
||||
class WirelessNetworkInterface(NetworkInterface, ABC):
|
||||
"""
|
||||
Represents a wireless network interface in a network device.
|
||||
|
||||
This abstract base class models wireless network interfaces, encapsulating properties and behaviors specific to
|
||||
wireless connectivity. It provides a framework for managing wireless connections, including signal strength,
|
||||
security protocols, and other wireless-specific attributes and methods.
|
||||
|
||||
Wireless network interfaces differ from wired ones in their medium of communication, relying on radio frequencies
|
||||
for data transmission and reception. This class serves as a base for more specific types of wireless interfaces,
|
||||
such as Wi-Fi adapters or radio network interfaces, ensuring that essential wireless functionality is defined
|
||||
and standardised.
|
||||
|
||||
Inherits from:
|
||||
- NetworkInterface: Provides basic network interface properties and methods.
|
||||
|
||||
As an abstract base class, it requires subclasses to implement specific methods related to wireless communication
|
||||
and may define additional properties and methods specific to wireless technology.
|
||||
"""
|
||||
|
||||
|
||||
class IPWirelessNetworkInterface(WiredNetworkInterface, Layer3Interface, ABC):
|
||||
"""
|
||||
Represents an IP wireless network interface.
|
||||
|
||||
This interface operates at both the data link layer (Layer 2) and the network layer (Layer 3) of the OSI model,
|
||||
specifically tailored for IP-based communication over wireless connections. This abstract class provides a
|
||||
template for creating specific wireless network interfaces that support Internet Protocol (IP) functionalities.
|
||||
|
||||
As this class is a combination of its parent classes without additional attributes or methods, please refer to
|
||||
the documentation of `WirelessNetworkInterface` and `Layer3Interface` for more details on the supported operations
|
||||
and functionalities.
|
||||
|
||||
The class inherits from:
|
||||
- `WirelessNetworkInterface`: Providing the functionalities and characteristics of a wireless connection, such as
|
||||
managing wireless signal transmission, reception, and associated wireless protocols.
|
||||
- `Layer3Interface`: Enabling network layer capabilities, including IP address assignment, routing, and
|
||||
potentially, Layer 3 protocols like IPsec.
|
||||
|
||||
As an abstract class, `IPWirelessNetworkInterface` does not implement specific methods but ensures that any derived
|
||||
class provides implementations for the functionalities of both `WirelessNetworkInterface` and `Layer3Interface`.
|
||||
This setup is ideal for representing network interfaces in devices that require wireless connections and are capable
|
||||
of IP routing and addressing, such as wireless routers, access points, and wireless end-host devices like
|
||||
smartphones and laptops.
|
||||
|
||||
This class should be extended by concrete classes that define specific behaviors and properties of an IP-capable
|
||||
wireless network interface.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def enable(self):
|
||||
"""Enable the interface."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def disable(self):
|
||||
"""Disable the interface."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def send_frame(self, frame: Frame) -> bool:
|
||||
"""
|
||||
Attempts to send a network frame through the interface.
|
||||
|
||||
:param frame: The network frame to be sent.
|
||||
:return: A boolean indicating whether the frame was successfully sent.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def receive_frame(self, frame: Frame) -> bool:
|
||||
"""
|
||||
Receives a network frame on the interface.
|
||||
|
||||
:param frame: The network frame being received.
|
||||
:return: A boolean indicating whether the frame was successfully received.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class Link(SimComponent):
|
||||
"""
|
||||
Represents a network link between NIC<-->NIC, NIC<-->SwitchPort, or SwitchPort<-->SwitchPort.
|
||||
|
||||
@@ -0,0 +1,493 @@
|
||||
from typing import Dict, Final, Optional, Union
|
||||
|
||||
from prettytable import MARKDOWN, PrettyTable
|
||||
from pydantic import validate_call
|
||||
|
||||
from primaite.simulator.network.hardware.nodes.network.router import (
|
||||
AccessControlList,
|
||||
ACLAction,
|
||||
Router,
|
||||
RouterInterface,
|
||||
)
|
||||
from primaite.simulator.network.transmission.data_link_layer import Frame
|
||||
from primaite.simulator.system.core.sys_log import SysLog
|
||||
from primaite.utils.validators import IPV4Address
|
||||
|
||||
EXTERNAL_PORT_ID: Final[int] = 1
|
||||
"""The Firewall port ID of the external port."""
|
||||
INTERNAL_PORT_ID: Final[int] = 2
|
||||
"""The Firewall port ID of the internal port."""
|
||||
DMZ_PORT_ID: Final[int] = 3
|
||||
"""The Firewall port ID of the DMZ port."""
|
||||
|
||||
|
||||
class Firewall(Router):
|
||||
"""
|
||||
A Firewall class that extends the functionality of a Router.
|
||||
|
||||
The Firewall class acts as a network security system that monitors and controls incoming and outgoing
|
||||
network traffic based on predetermined security rules. It is an intermediary between internal and external
|
||||
networks (including DMZ - De-Militarized Zone), ensuring that all inbound and outbound traffic complies with
|
||||
the security policies.
|
||||
|
||||
The Firewall employs Access Control Lists (ACLs) to filter traffic. Both the internal and DMZ ports have both
|
||||
inbound and outbound ACLs that determine what traffic is allowed to pass.
|
||||
|
||||
In addition to the security functions, the Firewall can also perform some routing functions similar to a Router,
|
||||
forwarding packets between its interfaces based on the destination IP address.
|
||||
|
||||
Usage:
|
||||
To utilise the Firewall class, instantiate it with a hostname and optionally specify sys_log for logging.
|
||||
Configure the internal, external, and DMZ ports with IP addresses and subnet masks. Define ACL rules to
|
||||
permit or deny traffic based on your security policies. The Firewall will process frames based on these
|
||||
rules, determining whether to allow or block traffic at each network interface.
|
||||
|
||||
Example:
|
||||
>>> from primaite.simulator.network.transmission.network_layer import IPProtocol
|
||||
>>> from primaite.simulator.network.transmission.transport_layer import Port
|
||||
>>> firewall = Firewall(hostname="Firewall1")
|
||||
>>> firewall.configure_internal_port(ip_address="192.168.1.1", subnet_mask="255.255.255.0")
|
||||
>>> firewall.configure_external_port(ip_address="10.0.0.1", subnet_mask="255.255.255.0")
|
||||
>>> firewall.configure_dmz_port(ip_address="172.16.0.1", subnet_mask="255.255.255.0")
|
||||
>>> # Permit HTTP traffic to the DMZ
|
||||
>>> firewall.dmz_inbound_acl.add_rule(
|
||||
... action=ACLAction.PERMIT,
|
||||
... protocol=IPProtocol.TCP,
|
||||
... dst_port=Port.HTTP,
|
||||
... src_ip_address="0.0.0.0",
|
||||
... src_wildcard_mask="0.0.0.0",
|
||||
... dst_ip_address="172.16.0.0",
|
||||
... dst_wildcard_mask="0.0.0.255"
|
||||
... )
|
||||
|
||||
:ivar str hostname: The Firewall hostname.
|
||||
"""
|
||||
|
||||
internal_inbound_acl: Optional[AccessControlList] = None
|
||||
"""Access Control List for managing entering the internal network."""
|
||||
|
||||
internal_outbound_acl: Optional[AccessControlList] = None
|
||||
"""Access Control List for managing traffic leaving the internal network."""
|
||||
|
||||
dmz_inbound_acl: Optional[AccessControlList] = None
|
||||
"""Access Control List for managing traffic entering the DMZ."""
|
||||
|
||||
dmz_outbound_acl: Optional[AccessControlList] = None
|
||||
"""Access Control List for managing traffic leaving the DMZ."""
|
||||
|
||||
external_inbound_acl: Optional[AccessControlList] = None
|
||||
"""Access Control List for managing traffic entering from an external network."""
|
||||
|
||||
external_outbound_acl: Optional[AccessControlList] = None
|
||||
"""Access Control List for managing traffic leaving towards an external network."""
|
||||
|
||||
def __init__(self, hostname: str, **kwargs):
|
||||
if not kwargs.get("sys_log"):
|
||||
kwargs["sys_log"] = SysLog(hostname)
|
||||
|
||||
super().__init__(hostname=hostname, num_ports=3, **kwargs)
|
||||
|
||||
# Initialise ACLs for internal and dmz interfaces with a default DENY policy
|
||||
self.internal_inbound_acl = AccessControlList(
|
||||
sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=f"{hostname} - Internal Inbound"
|
||||
)
|
||||
self.internal_outbound_acl = AccessControlList(
|
||||
sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=f"{hostname} - Internal Outbound"
|
||||
)
|
||||
self.dmz_inbound_acl = AccessControlList(
|
||||
sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=f"{hostname} - DMZ Inbound"
|
||||
)
|
||||
self.dmz_outbound_acl = AccessControlList(
|
||||
sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=f"{hostname} - DMZ Outbound"
|
||||
)
|
||||
|
||||
# external ACLs should have a default PERMIT policy
|
||||
self.external_inbound_acl = AccessControlList(
|
||||
sys_log=kwargs["sys_log"], implicit_action=ACLAction.PERMIT, name=f"{hostname} - External Inbound"
|
||||
)
|
||||
self.external_outbound_acl = AccessControlList(
|
||||
sys_log=kwargs["sys_log"], implicit_action=ACLAction.PERMIT, name=f"{hostname} - External Outbound"
|
||||
)
|
||||
|
||||
self.set_original_state()
|
||||
|
||||
def set_original_state(self):
|
||||
"""Set the original state for the Firewall."""
|
||||
super().set_original_state()
|
||||
vals_to_include = {
|
||||
"internal_port",
|
||||
"external_port",
|
||||
"dmz_port",
|
||||
"internal_inbound_acl",
|
||||
"internal_outbound_acl",
|
||||
"dmz_inbound_acl",
|
||||
"dmz_outbound_acl",
|
||||
"external_inbound_acl",
|
||||
"external_outbound_acl",
|
||||
}
|
||||
self._original_state.update(self.model_dump(include=vals_to_include))
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Describes the current state of the Firewall.
|
||||
|
||||
:return: A dictionary representing the current state.
|
||||
"""
|
||||
state = super().describe_state()
|
||||
|
||||
state.update(
|
||||
{
|
||||
"internal_port": self.internal_port.describe_state(),
|
||||
"external_port": self.external_port.describe_state(),
|
||||
"dmz_port": self.dmz_port.describe_state(),
|
||||
"internal_inbound_acl": self.internal_inbound_acl.describe_state(),
|
||||
"internal_outbound_acl": self.internal_outbound_acl.describe_state(),
|
||||
"dmz_inbound_acl": self.dmz_inbound_acl.describe_state(),
|
||||
"dmz_outbound_acl": self.dmz_outbound_acl.describe_state(),
|
||||
"external_inbound_acl": self.external_inbound_acl.describe_state(),
|
||||
"external_outbound_acl": self.external_outbound_acl.describe_state(),
|
||||
}
|
||||
)
|
||||
|
||||
return state
|
||||
|
||||
def show(self, markdown: bool = False):
|
||||
"""
|
||||
Displays the current configuration of the firewall's network interfaces in a table format.
|
||||
|
||||
The table includes information about each port (External, Internal, DMZ), their MAC addresses, IP
|
||||
configurations, link speeds, and operational status. The output can be formatted as Markdown if specified.
|
||||
|
||||
:param markdown: If True, formats the output table in Markdown style. Useful for documentation or reporting
|
||||
purposes within Markdown-compatible platforms.
|
||||
"""
|
||||
table = PrettyTable(["Port", "MAC Address", "Address", "Speed", "Status"])
|
||||
if markdown:
|
||||
table.set_style(MARKDOWN)
|
||||
table.align = "l"
|
||||
table.title = f"{self.hostname} Network Interfaces"
|
||||
ports = {"External": self.external_port, "Internal": self.internal_port, "DMZ": self.dmz_port}
|
||||
for port, network_interface in ports.items():
|
||||
table.add_row(
|
||||
[
|
||||
port,
|
||||
network_interface.mac_address,
|
||||
f"{network_interface.ip_address}/{network_interface.ip_network.prefixlen}",
|
||||
network_interface.speed,
|
||||
"Enabled" if network_interface.enabled else "Disabled",
|
||||
]
|
||||
)
|
||||
print(table)
|
||||
|
||||
def show_rules(self, external: bool = True, internal: bool = True, dmz: bool = True, markdown: bool = False):
|
||||
"""
|
||||
Prints the configured ACL rules for each specified network zone of the firewall.
|
||||
|
||||
This method allows selective viewing of ACL rules applied to external, internal, and DMZ interfaces, providing
|
||||
a clear overview of the firewall's current traffic filtering policies. Each section can be independently
|
||||
toggled.
|
||||
|
||||
:param external: If True, shows ACL rules for external interfaces.
|
||||
:param internal: If True, shows ACL rules for internal interfaces.
|
||||
:param dmz: If True, shows ACL rules for DMZ interfaces.
|
||||
:param markdown: If True, formats the output in Markdown, enhancing readability in Markdown-compatible viewers.
|
||||
"""
|
||||
print(f"{self.hostname} Firewall Rules")
|
||||
print()
|
||||
if external:
|
||||
self.external_inbound_acl.show(markdown)
|
||||
print()
|
||||
self.external_outbound_acl.show(markdown)
|
||||
print()
|
||||
if internal:
|
||||
self.internal_inbound_acl.show(markdown)
|
||||
print()
|
||||
self.internal_outbound_acl.show(markdown)
|
||||
print()
|
||||
if dmz:
|
||||
self.dmz_inbound_acl.show(markdown)
|
||||
print()
|
||||
self.dmz_outbound_acl.show(markdown)
|
||||
print()
|
||||
|
||||
def receive_frame(self, frame: Frame, from_network_interface: RouterInterface):
|
||||
"""
|
||||
Receive a frame and process it.
|
||||
|
||||
Acts as the primary entry point for all network frames arriving at the Firewall, determining the flow of
|
||||
traffic based on the source network interface controller (NIC) and applying the appropriate Access Control
|
||||
List (ACL) rules.
|
||||
|
||||
This method categorizes the incoming traffic into three main pathways based on the source NIC: external inbound,
|
||||
internal outbound, and DMZ (De-Militarized Zone) outbound. It plays a crucial role in enforcing the firewall's
|
||||
security policies by directing each frame to the corresponding processing method that evaluates it against
|
||||
specific ACL rules.
|
||||
|
||||
Based on the originating NIC:
|
||||
- Frames from the external port are processed as external inbound traffic, potentially destined for either the
|
||||
DMZ or the internal network.
|
||||
- Frames from the internal port are treated as internal outbound traffic, aimed at reaching the external
|
||||
network or a service within the DMZ.
|
||||
- Frames from the DMZ port are handled as DMZ outbound traffic, with potential destinations including the
|
||||
internal network or the external network.
|
||||
|
||||
:param frame: The network frame to be processed.
|
||||
:param from_network_interface: The network interface controller from which the frame is coming. Used to
|
||||
determine the direction of the traffic (inbound or outbound) and the zone (external, internal,
|
||||
DMZ) it belongs to.
|
||||
"""
|
||||
# If the frame comes from the external port, it's considered as external inbound traffic
|
||||
if from_network_interface == self.external_port:
|
||||
self._process_external_inbound_frame(frame, from_network_interface)
|
||||
return
|
||||
# If the frame comes from the internal port, it's considered as internal outbound traffic
|
||||
elif from_network_interface == self.internal_port:
|
||||
self._process_internal_outbound_frame(frame, from_network_interface)
|
||||
return
|
||||
# If the frame comes from the DMZ port, it's considered as DMZ outbound traffic
|
||||
elif from_network_interface == self.dmz_port:
|
||||
self._process_dmz_outbound_frame(frame, from_network_interface)
|
||||
return
|
||||
|
||||
def _process_external_inbound_frame(self, frame: Frame, from_network_interface: RouterInterface) -> None:
|
||||
"""
|
||||
Process frames arriving from the external network.
|
||||
|
||||
Determines the path for frames based on their destination IP addresses and ACL rules for the external inbound
|
||||
interface. Frames destined for the DMZ or internal network are forwarded accordingly, if allowed by the ACL.
|
||||
|
||||
If a frame is permitted by the ACL, it is either passed to the session manager (if applicable) or forwarded to
|
||||
the appropriate network zone (DMZ/internal). Denied frames are logged and dropped.
|
||||
|
||||
:param frame: The frame to be processed, containing network layer and transport layer information.
|
||||
:param from_network_interface: The interface on the firewall through which the frame was received.
|
||||
"""
|
||||
# check if External Inbound ACL Rules permit frame
|
||||
permitted, rule = self.external_inbound_acl.is_permitted(frame)
|
||||
if not permitted:
|
||||
self.sys_log.info(f"Frame blocked at interface {from_network_interface} by rule {rule}")
|
||||
return
|
||||
self.software_manager.arp.add_arp_cache_entry(
|
||||
ip_address=frame.ip.src_ip_address,
|
||||
mac_address=frame.ethernet.src_mac_addr,
|
||||
network_interface=from_network_interface,
|
||||
)
|
||||
|
||||
if self.check_send_frame_to_session_manager(frame):
|
||||
# Port is open on this Router so pass Frame up to session manager first
|
||||
self.session_manager.receive_frame(frame, from_network_interface)
|
||||
else:
|
||||
# If the destination IP is within the DMZ network, process the frame as DMZ inbound
|
||||
if frame.ip.dst_ip_address in self.dmz_port.ip_network:
|
||||
self._process_dmz_inbound_frame(frame, from_network_interface)
|
||||
else:
|
||||
# Otherwise, process the frame as internal inbound
|
||||
self._process_internal_inbound_frame(frame, from_network_interface)
|
||||
|
||||
def _process_external_outbound_frame(self, frame: Frame, from_network_interface: RouterInterface) -> None:
|
||||
"""
|
||||
Process frames that are outbound towards the external network.
|
||||
|
||||
:param frame: The frame to be processed.
|
||||
:param from_network_interface: The network interface controller from which the frame is coming.
|
||||
:param re_attempt: Indicates if the processing is a re-attempt, defaults to False.
|
||||
"""
|
||||
# check if External Outbound ACL Rules permit frame
|
||||
permitted, rule = self.external_outbound_acl.is_permitted(frame=frame)
|
||||
if not permitted:
|
||||
self.sys_log.info(f"Frame blocked at interface {from_network_interface} by rule {rule}")
|
||||
return
|
||||
|
||||
self.process_frame(frame=frame, from_network_interface=from_network_interface)
|
||||
|
||||
def _process_internal_inbound_frame(self, frame: Frame, from_network_interface: RouterInterface) -> None:
|
||||
"""
|
||||
Process frames that are inbound towards the internal LAN.
|
||||
|
||||
This method is responsible for handling frames coming from either the external network or the DMZ towards
|
||||
the internal LAN. It checks the frames against the internal inbound ACL to decide whether to allow or deny
|
||||
the traffic, and take appropriate actions.
|
||||
|
||||
:param frame: The frame to be processed.
|
||||
:param from_network_interface: The network interface controller from which the frame is coming.
|
||||
:param re_attempt: Indicates if the processing is a re-attempt, defaults to False.
|
||||
"""
|
||||
# check if Internal Inbound ACL Rules permit frame
|
||||
permitted, rule = self.internal_inbound_acl.is_permitted(frame=frame)
|
||||
if not permitted:
|
||||
self.sys_log.info(f"Frame blocked at interface {from_network_interface} by rule {rule}")
|
||||
return
|
||||
|
||||
self.process_frame(frame=frame, from_network_interface=from_network_interface)
|
||||
|
||||
def _process_internal_outbound_frame(self, frame: Frame, from_network_interface: RouterInterface) -> None:
|
||||
"""
|
||||
Process frames that are outbound from the internal network.
|
||||
|
||||
This method handles frames that are leaving the internal network. Depending on the destination IP address,
|
||||
the frame may be forwarded to the DMZ or to the external network.
|
||||
|
||||
:param frame: The frame to be processed.
|
||||
:param from_network_interface: The network interface controller from which the frame is coming.
|
||||
:param re_attempt: Indicates if the processing is a re-attempt, defaults to False.
|
||||
"""
|
||||
permitted, rule = self.internal_outbound_acl.is_permitted(frame)
|
||||
if not permitted:
|
||||
self.sys_log.info(f"Frame blocked at interface {from_network_interface} by rule {rule}")
|
||||
return
|
||||
self.software_manager.arp.add_arp_cache_entry(
|
||||
ip_address=frame.ip.src_ip_address,
|
||||
mac_address=frame.ethernet.src_mac_addr,
|
||||
network_interface=from_network_interface,
|
||||
)
|
||||
|
||||
if self.check_send_frame_to_session_manager(frame):
|
||||
# Port is open on this Router so pass Frame up to session manager first
|
||||
self.session_manager.receive_frame(frame, from_network_interface)
|
||||
else:
|
||||
# If the destination IP is within the DMZ network, process the frame as DMZ inbound
|
||||
if frame.ip.dst_ip_address in self.dmz_port.ip_network:
|
||||
self._process_dmz_inbound_frame(frame, from_network_interface)
|
||||
else:
|
||||
# If the destination IP is not within the DMZ network, process the frame as external outbound
|
||||
self._process_external_outbound_frame(frame, from_network_interface)
|
||||
|
||||
def _process_dmz_inbound_frame(self, frame: Frame, from_network_interface: RouterInterface) -> None:
|
||||
"""
|
||||
Process frames that are inbound from the DMZ.
|
||||
|
||||
This method is responsible for handling frames coming from either the external network or the internal LAN
|
||||
towards the DMZ. It checks the frames against the DMZ inbound ACL to decide whether to allow or deny the
|
||||
traffic, and take appropriate actions.
|
||||
|
||||
:param frame: The frame to be processed.
|
||||
:param from_network_interface: The network interface controller from which the frame is coming.
|
||||
:param re_attempt: Indicates if the processing is a re-attempt, defaults to False.
|
||||
"""
|
||||
# check if DMZ Inbound ACL Rules permit frame
|
||||
permitted, rule = self.dmz_inbound_acl.is_permitted(frame=frame)
|
||||
if not permitted:
|
||||
self.sys_log.info(f"Frame blocked at interface {from_network_interface} by rule {rule}")
|
||||
return
|
||||
|
||||
self.process_frame(frame=frame, from_network_interface=from_network_interface)
|
||||
|
||||
def _process_dmz_outbound_frame(self, frame: Frame, from_network_interface: RouterInterface) -> None:
|
||||
"""
|
||||
Process frames that are outbound from the DMZ.
|
||||
|
||||
This method handles frames originating from the DMZ and determines their appropriate path based on the
|
||||
destination IP address. It involves checking the DMZ outbound ACL, consulting the ARP cache and the routing
|
||||
table to find the correct outbound NIC, and then forwarding the frame to either the internal network or the
|
||||
external network.
|
||||
|
||||
:param frame: The frame to be processed.
|
||||
:param from_network_interface: The network interface controller from which the frame is coming.
|
||||
:param re_attempt: Indicates if the processing is a re-attempt, defaults to False.
|
||||
"""
|
||||
permitted, rule = self.dmz_outbound_acl.is_permitted(frame)
|
||||
if not permitted:
|
||||
self.sys_log.info(f"Frame blocked at interface {from_network_interface} by rule {rule}")
|
||||
return
|
||||
self.software_manager.arp.add_arp_cache_entry(
|
||||
ip_address=frame.ip.src_ip_address,
|
||||
mac_address=frame.ethernet.src_mac_addr,
|
||||
network_interface=from_network_interface,
|
||||
)
|
||||
|
||||
if self.check_send_frame_to_session_manager(frame):
|
||||
# Port is open on this Router so pass Frame up to session manager first
|
||||
self.session_manager.receive_frame(frame, from_network_interface)
|
||||
else:
|
||||
# Attempt to get the outbound NIC from the ARP cache using the destination IP address
|
||||
outbound_nic = self.software_manager.arp.get_arp_cache_network_interface(frame.ip.dst_ip_address)
|
||||
|
||||
# If outbound NIC is not found in the ARP cache, consult the routing table to find the best route
|
||||
if not outbound_nic:
|
||||
route = self.route_table.find_best_route(frame.ip.dst_ip_address)
|
||||
if route:
|
||||
# If a route is found, get the corresponding outbound NIC from the ARP cache using the next-hop IP
|
||||
# address
|
||||
outbound_nic = self.software_manager.arp.get_arp_cache_network_interface(route.next_hop_ip_address)
|
||||
|
||||
# If an outbound NIC is determined
|
||||
if outbound_nic:
|
||||
if outbound_nic == self.external_port:
|
||||
# If the outbound NIC is the external port, check the frame against the DMZ outbound ACL and
|
||||
# process it as an external outbound frame
|
||||
self._process_external_outbound_frame(frame, from_network_interface)
|
||||
return
|
||||
elif outbound_nic == self.internal_port:
|
||||
# If the outbound NIC is the internal port, check the frame against the DMZ outbound ACL and
|
||||
# process it as an internal inbound frame
|
||||
self._process_internal_inbound_frame(frame, from_network_interface)
|
||||
return
|
||||
# TODO: What to do here? Destination unreachable? Send ICMP back?
|
||||
return
|
||||
|
||||
@property
|
||||
def external_port(self) -> RouterInterface:
|
||||
"""
|
||||
The external port of the firewall.
|
||||
|
||||
:return: The external port connecting the firewall to the external network.
|
||||
"""
|
||||
return self.network_interface[EXTERNAL_PORT_ID]
|
||||
|
||||
@validate_call()
|
||||
def configure_external_port(self, ip_address: Union[IPV4Address, str], subnet_mask: Union[IPV4Address, str]):
|
||||
"""
|
||||
Configure the external port with an IP address and a subnet mask.
|
||||
|
||||
Enables the port once configured.
|
||||
|
||||
:param ip_address: The IP address to assign to the external port.
|
||||
:param subnet_mask: The subnet mask to assign to the external port.
|
||||
"""
|
||||
# Configure the external port with the specified IP address and subnet mask
|
||||
self.configure_port(EXTERNAL_PORT_ID, ip_address, subnet_mask)
|
||||
self.external_port.enable()
|
||||
|
||||
@property
|
||||
def internal_port(self) -> RouterInterface:
|
||||
"""
|
||||
The internal port of the firewall.
|
||||
|
||||
:return: The external port connecting the firewall to the internal LAN.
|
||||
"""
|
||||
return self.network_interface[INTERNAL_PORT_ID]
|
||||
|
||||
@validate_call()
|
||||
def configure_internal_port(self, ip_address: Union[IPV4Address, str], subnet_mask: Union[IPV4Address, str]):
|
||||
"""
|
||||
Configure the internal port with an IP address and a subnet mask.
|
||||
|
||||
Enables the port once configured.
|
||||
|
||||
:param ip_address: The IP address to assign to the internal port.
|
||||
:param subnet_mask: The subnet mask to assign to the internal port.
|
||||
"""
|
||||
self.configure_port(INTERNAL_PORT_ID, ip_address, subnet_mask)
|
||||
self.internal_port.enable()
|
||||
|
||||
@property
|
||||
def dmz_port(self) -> RouterInterface:
|
||||
"""
|
||||
The DMZ port of the firewall.
|
||||
|
||||
:return: The external port connecting the firewall to the DMZ.
|
||||
"""
|
||||
return self.network_interface[DMZ_PORT_ID]
|
||||
|
||||
@validate_call()
|
||||
def configure_dmz_port(self, ip_address: Union[IPV4Address, str], subnet_mask: Union[IPV4Address, str]):
|
||||
"""
|
||||
Configure the DMZ port with an IP address and a subnet mask.
|
||||
|
||||
Enables the port once configured.
|
||||
|
||||
:param ip_address: The IP address to assign to the DMZ port.
|
||||
:param subnet_mask: The subnet mask to assign to the DMZ port.
|
||||
"""
|
||||
self.configure_port(DMZ_PORT_ID, ip_address, subnet_mask)
|
||||
self.dmz_port.enable()
|
||||
@@ -6,6 +6,7 @@ from ipaddress import IPv4Address, IPv4Network
|
||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
||||
|
||||
from prettytable import MARKDOWN, PrettyTable
|
||||
from pydantic import validate_call
|
||||
|
||||
from primaite.simulator.core import RequestManager, RequestType, SimComponent
|
||||
from primaite.simulator.network.hardware.base import IPWiredNetworkInterface
|
||||
@@ -19,6 +20,43 @@ from primaite.simulator.network.transmission.transport_layer import Port
|
||||
from primaite.simulator.system.core.sys_log import SysLog
|
||||
from primaite.simulator.system.services.arp.arp import ARP
|
||||
from primaite.simulator.system.services.icmp.icmp import ICMP
|
||||
from primaite.utils.validators import IPV4Address
|
||||
|
||||
|
||||
@validate_call()
|
||||
def ip_matches_masked_range(ip_to_check: IPV4Address, base_ip: IPV4Address, wildcard_mask: IPV4Address) -> bool:
|
||||
"""
|
||||
Determine if a given IP address matches a range defined by a base IP address and a wildcard mask.
|
||||
|
||||
The wildcard mask specifies which bits in the IP address should be ignored (1) and which bits must match (0).
|
||||
|
||||
The function applies the wildcard mask to both the base IP and the IP address to check by first negating the
|
||||
wildcard mask and then performing a bitwise AND operation. This process effectively masks out the bits indicated
|
||||
by the wildcard mask. If the resulting masked IP addresses are equal, it means the IP address to check falls within
|
||||
the range defined by the base IP and wildcard mask.
|
||||
|
||||
:param IPV4Address ip_to_check: The IP address to be checked.
|
||||
:param IPV4Address base_ip: The base IP address defining the start of the range.
|
||||
:param IPV4Address wildcard_mask: The wildcard mask specifying which bits to ignore.
|
||||
:return: A boolean value indicating whether the IP address matches the masked range.
|
||||
:rtype: bool
|
||||
|
||||
Example usage:
|
||||
>>> ip_matches_masked_range(ip_to_check="192.168.10.10", base_ip="192.168.1.1", wildcard_mask="0.0.255.255")
|
||||
False
|
||||
"""
|
||||
# Convert the IP addresses from IPv4Address objects to integer representations for bitwise operations
|
||||
base_ip_int = int(base_ip)
|
||||
ip_to_check_int = int(ip_to_check)
|
||||
wildcard_int = int(wildcard_mask)
|
||||
|
||||
# Negate the wildcard mask and apply it to both the base IP and the IP to check using bitwise AND
|
||||
# This step masks out the bits to be ignored according to the wildcard mask
|
||||
masked_base_ip = base_ip_int & ~wildcard_int
|
||||
masked_ip_to_check = ip_to_check_int & ~wildcard_int
|
||||
|
||||
# Compare the masked IP addresses to determine if they match within the masked range
|
||||
return masked_base_ip == masked_ip_to_check
|
||||
|
||||
|
||||
class ACLAction(Enum):
|
||||
@@ -30,22 +68,62 @@ class ACLAction(Enum):
|
||||
|
||||
class ACLRule(SimComponent):
|
||||
"""
|
||||
Represents an Access Control List (ACL) rule.
|
||||
Represents an Access Control List (ACL) rule within a network device.
|
||||
|
||||
:ivar ACLAction action: Action to be performed (Permit/Deny). Default is DENY.
|
||||
:ivar Optional[IPProtocol] protocol: Network protocol. Default is None.
|
||||
:ivar Optional[IPv4Address] src_ip_address: Source IP address. Default is None.
|
||||
:ivar Optional[Port] src_port: Source port number. Default is None.
|
||||
:ivar Optional[IPv4Address] dst_ip_address: Destination IP address. Default is None.
|
||||
:ivar Optional[Port] dst_port: Destination port number. Default is None.
|
||||
Enables fine-grained control over network traffic based on specified criteria such as IP addresses, protocols,
|
||||
and ports. ACL rules can be configured to permit or deny traffic, providing a powerful mechanism for enforcing
|
||||
security policies and traffic flow.
|
||||
|
||||
ACL rules support specifying exact match conditions, ranges of IP addresses using wildcard masks, and
|
||||
protocol types. This flexibility allows for complex traffic filtering scenarios, from blocking or allowing
|
||||
specific types of traffic to entire subnets.
|
||||
|
||||
**Usage:**
|
||||
|
||||
- **Dedicated IP Addresses**: To match traffic from or to a specific IP address, set the `src_ip_address`
|
||||
and/or `dst_ip_address` without a wildcard mask. This is useful for rules that apply to individual hosts.
|
||||
|
||||
- **IP Ranges with Wildcard Masks**: For rules that apply to a range of IP addresses, use the `src_wildcard_mask`
|
||||
and/or `dst_wildcard_mask` in conjunction with the base IP address. Wildcard masks are a way to specify which
|
||||
bits of the IP address should be matched exactly and which bits can vary. For example, a wildcard mask of
|
||||
`0.0.0.255` applied to a base address of `192.168.1.0` allows for any address from `192.168.1.0` to
|
||||
`192.168.1.255`.
|
||||
|
||||
- **Allowing All IP Traffic**: To mimic the Cisco ACL rule that permits all IP traffic from a specific range,
|
||||
you may use wildcard masks with the rule action set to `PERMIT`. If your implementation includes an `ALL`
|
||||
option in the `IPProtocol` enum, use it to allow all protocols; otherwise, consider the rule without a
|
||||
specified protocol to apply to all IP traffic.
|
||||
|
||||
|
||||
The combination of these attributes allows for the creation of granular rules to control traffic flow
|
||||
effectively, enhancing network security and management.
|
||||
|
||||
|
||||
:ivar ACLAction action: Specifies whether to `PERMIT` or `DENY` the traffic that matches the rule conditions.
|
||||
The default action is `DENY`.
|
||||
:ivar Optional[IPProtocol] protocol: The network protocol (e.g., TCP, UDP, ICMP) to match. If `None`, the rule
|
||||
applies to all protocols.
|
||||
:ivar Optional[IPV4Address] src_ip_address: The source IP address to match. If combined with `src_wildcard_mask`,
|
||||
it specifies the start of an IP range.
|
||||
:ivar Optional[IPV4Address] src_wildcard_mask: The wildcard mask for the source IP address, defining the range
|
||||
of addresses to match.
|
||||
:ivar Optional[IPV4Address] dst_ip_address: The destination IP address to match. If combined with
|
||||
`dst_wildcard_mask`, it specifies the start of an IP range.
|
||||
:ivar Optional[IPv4Address] dst_wildcard_mask: The wildcard mask for the destination IP address, defining the
|
||||
range of addresses to match.
|
||||
:ivar Optional[Port] src_port: The source port number to match. Relevant for TCP/UDP protocols.
|
||||
:ivar Optional[Port] dst_port: The destination port number to match. Relevant for TCP/UDP protocols.
|
||||
"""
|
||||
|
||||
action: ACLAction = ACLAction.DENY
|
||||
protocol: Optional[IPProtocol] = None
|
||||
src_ip_address: Optional[IPv4Address] = None
|
||||
src_ip_address: Optional[IPV4Address] = None
|
||||
src_wildcard_mask: Optional[IPV4Address] = None
|
||||
dst_ip_address: Optional[IPV4Address] = None
|
||||
dst_wildcard_mask: Optional[IPV4Address] = None
|
||||
src_port: Optional[Port] = None
|
||||
dst_ip_address: Optional[IPv4Address] = None
|
||||
dst_port: Optional[Port] = None
|
||||
match_count: int = 0
|
||||
|
||||
def __str__(self) -> str:
|
||||
rule_strings = []
|
||||
@@ -76,24 +154,136 @@ class ACLRule(SimComponent):
|
||||
state["src_port"] = self.src_port.name if self.src_port else None
|
||||
state["dst_ip_address"] = str(self.dst_ip_address) if self.dst_ip_address else None
|
||||
state["dst_port"] = self.dst_port.name if self.dst_port else None
|
||||
state["match_count"] = self.match_count
|
||||
return state
|
||||
|
||||
def permit_frame_check(self, frame: Frame) -> Tuple[bool, bool]:
|
||||
"""
|
||||
Evaluates whether a given network frame should be permitted or denied based on this ACL rule.
|
||||
|
||||
This method checks the frame against the ACL rule's criteria, including protocol, source and destination IP
|
||||
addresses (with support for wildcard masking), and source and destination ports. The method assumes that an
|
||||
unspecified (None) criterion implies a match for any value in that category. For IP addresses, wildcard masking
|
||||
can be used to specify ranges of addresses that match the rule.
|
||||
|
||||
The method follows these steps to determine if a frame is permitted:
|
||||
|
||||
1. Check if the frame's protocol matches the ACL rule's protocol.
|
||||
2. For source and destination IP addresses:
|
||||
1. If a wildcard mask is defined, check if the frame's IP address is within the range specified by the base
|
||||
IP address and the wildcard mask.
|
||||
2. If no wildcard mask is defined, directly compare the frame's IP address to the one specified in the rule.
|
||||
3. Check if the frame's source and destination ports match those specified in the rule.
|
||||
4. The frame is permitted if it matches all specified criteria and the rule's action is PERMIT. Conversely, it
|
||||
is not permitted if any criterion does not match or if the rule's action is DENY.
|
||||
|
||||
:param frame: The network frame to be evaluated.
|
||||
:return: A tuple containing two boolean values: The first indicates if the frame is permitted by this rule (
|
||||
True if permitted, otherwise False). The second indicates if the frame matches the rule's criteria (True
|
||||
if it matches, otherwise False).
|
||||
"""
|
||||
permitted = False
|
||||
frame_matches_rule = False
|
||||
protocol_matches = self.protocol == frame.ip.protocol if self.protocol else True
|
||||
|
||||
src_ip_matches = self.src_ip_address is None # Assume match if no specific src IP is defined
|
||||
if self.src_ip_address:
|
||||
if self.src_wildcard_mask:
|
||||
# If a src wildcard mask is provided, use it to check the range
|
||||
src_ip_matches = ip_matches_masked_range(
|
||||
ip_to_check=frame.ip.src_ip_address,
|
||||
base_ip=self.src_ip_address,
|
||||
wildcard_mask=self.src_wildcard_mask,
|
||||
)
|
||||
else:
|
||||
# Direct comparison if no wildcard mask is defined
|
||||
src_ip_matches = frame.ip.src_ip_address == self.src_ip_address
|
||||
|
||||
dst_ip_matches = self.dst_ip_address is None # Assume match if no specific dst IP is defined
|
||||
if self.dst_ip_address:
|
||||
if self.dst_wildcard_mask:
|
||||
# If a dst wildcard mask is provided, use it to check the range
|
||||
dst_ip_matches = ip_matches_masked_range(
|
||||
ip_to_check=frame.ip.dst_ip_address,
|
||||
base_ip=self.dst_ip_address,
|
||||
wildcard_mask=self.dst_wildcard_mask,
|
||||
)
|
||||
else:
|
||||
# Direct comparison if no wildcard mask is defined
|
||||
dst_ip_matches = frame.ip.dst_ip_address == self.dst_ip_address
|
||||
|
||||
src_port = None
|
||||
dst_port = None
|
||||
if frame.tcp:
|
||||
src_port = frame.tcp.src_port
|
||||
dst_port = frame.tcp.dst_port
|
||||
elif frame.udp:
|
||||
src_port = frame.udp.src_port
|
||||
dst_port = frame.udp.dst_port
|
||||
|
||||
src_port_matches = self.src_port == src_port if self.src_port else True
|
||||
dst_port_matches = self.dst_port == dst_port if self.dst_port else True
|
||||
|
||||
# The frame is permitted if all conditions are met
|
||||
if protocol_matches and src_ip_matches and dst_ip_matches and src_port_matches and dst_port_matches:
|
||||
frame_matches_rule = True
|
||||
permitted = self.action == ACLAction.PERMIT
|
||||
|
||||
return permitted, frame_matches_rule
|
||||
|
||||
|
||||
class AccessControlList(SimComponent):
|
||||
"""
|
||||
Manages a list of ACLRules to filter network traffic.
|
||||
|
||||
:ivar SysLog sys_log: System logging instance.
|
||||
:ivar ACLAction implicit_action: Default action for rules.
|
||||
:ivar ACLRule implicit_rule: Implicit ACL rule, created based on implicit_action.
|
||||
:ivar int max_acl_rules: Maximum number of ACL rules that can be added. Default is 25.
|
||||
:ivar List[Optional[ACLRule]] _acl: A list containing the ACL rules.
|
||||
Manages a list of ACLRule instances to filter network traffic based on predefined criteria. This class
|
||||
provides functionalities to add, remove, and evaluate ACL rules, thereby controlling the flow of traffic
|
||||
through a network device.
|
||||
|
||||
ACL rules can specify conditions based on source and destination IP addresses, IP protocols (TCP, UDP, ICMP),
|
||||
and port numbers. Rules can be configured to permit or deny traffic that matches these conditions, offering
|
||||
granular control over network security policies.
|
||||
|
||||
Usage:
|
||||
- **Dedicated IP Addresses**: Directly specify the source and/or destination IP addresses in an ACL rule to
|
||||
match traffic to or from specific hosts.
|
||||
- **IP Ranges with Wildcard Masks**: Use wildcard masks along with base IP addresses to define ranges of IP
|
||||
addresses that an ACL rule applies to. This is useful for specifying subnets or ranges of IP addresses.
|
||||
- **Allowing All IP Traffic**: To mimic a Cisco-style ACL rule that allows all IP traffic from a specified
|
||||
range, use the wildcard mask in conjunction with a permit action. If your system supports an `ALL` option
|
||||
for the IP protocol, this can be used to allow all types of IP traffic; otherwise, the absence of a
|
||||
specified protocol can be interpreted to mean all protocols.
|
||||
|
||||
Methods include functionalities to add and remove rules, reset to default configurations, and evaluate
|
||||
whether specific frames are permitted or denied based on the current set of rules. The class also provides
|
||||
utility functions to describe the current state and display the rules in a human-readable format.
|
||||
|
||||
Example:
|
||||
>>> # To add a rule that permits all TCP traffic from the subnet 192.168.1.0/24 to 192.168.2.0/24:
|
||||
>>> acl = AccessControlList()
|
||||
>>> acl.add_rule(
|
||||
... action=ACLAction.PERMIT,
|
||||
... protocol=IPProtocol.TCP,
|
||||
... src_ip_address="192.168.1.0",
|
||||
... src_wildcard_mask="0.0.0.255",
|
||||
... dst_ip_address="192.168.2.0",
|
||||
... dst_wildcard_mask="0.0.0.255"
|
||||
...)
|
||||
|
||||
This example demonstrates adding a rule with specific source and destination IP ranges, using wildcard masks
|
||||
to allow a broad range of traffic while maintaining control over the flow of data for security and
|
||||
management purposes.
|
||||
|
||||
:ivar ACLAction implicit_action: The default action (permit or deny) applied when no other rule matches.
|
||||
Typically set to deny to follow the principle of least privilege.
|
||||
:ivar int max_acl_rules: The maximum number of ACL rules that can be added to the list. Defaults to 25.
|
||||
"""
|
||||
|
||||
sys_log: SysLog
|
||||
implicit_action: ACLAction
|
||||
implicit_rule: ACLRule
|
||||
max_acl_rules: int = 25
|
||||
name: str
|
||||
_acl: List[Optional[ACLRule]] = [None] * 24
|
||||
_default_config: Dict[int, dict] = {}
|
||||
"""Config dict describing how the ACL list should look at episode start"""
|
||||
@@ -150,6 +340,7 @@ class AccessControlList(SimComponent):
|
||||
)
|
||||
|
||||
def _init_request_manager(self) -> RequestManager:
|
||||
# TODO: Add src and dst wildcard masks as positional args in this request.
|
||||
rm = super()._init_request_manager()
|
||||
|
||||
# When the request reaches this action, it should now contain solely positional args for the 'add_rule' action.
|
||||
@@ -165,13 +356,13 @@ class AccessControlList(SimComponent):
|
||||
"add_rule",
|
||||
RequestType(
|
||||
func=lambda request, context: self.add_rule(
|
||||
ACLAction[request[0]],
|
||||
None if request[1] == "ALL" else IPProtocol[request[1]],
|
||||
None if request[2] == "ALL" else IPv4Address(request[2]),
|
||||
None if request[3] == "ALL" else Port[request[3]],
|
||||
None if request[4] == "ALL" else IPv4Address(request[4]),
|
||||
None if request[5] == "ALL" else Port[request[5]],
|
||||
int(request[6]),
|
||||
action=ACLAction[request[0]],
|
||||
protocol=None if request[1] == "ALL" else IPProtocol[request[1]],
|
||||
src_ip_address=None if request[2] == "ALL" else IPv4Address(request[2]),
|
||||
src_port=None if request[3] == "ALL" else Port[request[3]],
|
||||
dst_ip_address=None if request[4] == "ALL" else IPv4Address(request[4]),
|
||||
dst_port=None if request[5] == "ALL" else Port[request[5]],
|
||||
position=int(request[6]),
|
||||
)
|
||||
),
|
||||
)
|
||||
@@ -210,39 +401,71 @@ class AccessControlList(SimComponent):
|
||||
"""
|
||||
return len([rule for rule in self._acl if rule is not None])
|
||||
|
||||
@validate_call()
|
||||
def add_rule(
|
||||
self,
|
||||
action: ACLAction,
|
||||
action: ACLAction = ACLAction.DENY,
|
||||
protocol: Optional[IPProtocol] = None,
|
||||
src_ip_address: Optional[Union[str, IPv4Address]] = None,
|
||||
src_ip_address: Optional[IPV4Address] = None,
|
||||
src_wildcard_mask: Optional[IPV4Address] = None,
|
||||
dst_ip_address: Optional[IPV4Address] = None,
|
||||
dst_wildcard_mask: Optional[IPV4Address] = None,
|
||||
src_port: Optional[Port] = None,
|
||||
dst_ip_address: Optional[Union[str, IPv4Address]] = None,
|
||||
dst_port: Optional[Port] = None,
|
||||
position: int = 0,
|
||||
) -> None:
|
||||
"""
|
||||
Add a new ACL rule.
|
||||
Adds a new ACL rule to control network traffic based on specified criteria.
|
||||
|
||||
:param ACLAction action: Action to be performed (Permit/Deny).
|
||||
:param Optional[IPProtocol] protocol: Network protocol.
|
||||
:param Optional[Union[str, IPv4Address]] src_ip_address: Source IP address.
|
||||
:param Optional[Port] src_port: Source port number.
|
||||
:param Optional[Union[str, IPv4Address]] dst_ip_address: Destination IP address.
|
||||
:param Optional[Port] dst_port: Destination port number.
|
||||
:param int position: Position in the ACL list to insert the rule.
|
||||
:raises ValueError: When the position is out of bounds.
|
||||
This method allows defining rules that specify whether to permit or deny traffic with particular
|
||||
characteristics, including source and destination IP addresses, ports, and protocols. Wildcard masks can be
|
||||
used to specify a range of IP addresses, allowing for broader rule application. If specifying a dedicated IP
|
||||
address without needing a range, the wildcard mask can be omitted.
|
||||
|
||||
Example:
|
||||
>>> # To block all traffic except SSH from a specific IP range to a server:
|
||||
>>> router = Router("router")
|
||||
>>> router.add_rule(
|
||||
... action=ACLAction.DENY,
|
||||
... protocol=IPProtocol.TCP,
|
||||
... src_ip_address="192.168.1.0",
|
||||
... src_wildcard_mask="0.0.0.255",
|
||||
... dst_ip_address="10.10.10.5",
|
||||
... dst_port=Port.SSH,
|
||||
... position=5
|
||||
... )
|
||||
>>> # This permits SSH traffic from the 192.168.1.0/24 subnet to the 10.10.10.5 server.
|
||||
>>>
|
||||
>>> # Then if we want to allow a specific IP address from this subnet to SSH into the server
|
||||
>>> router.add_rule(
|
||||
... action=ACLAction.PERMIT,
|
||||
... protocol=IPProtocol.TCP,
|
||||
... src_ip_address="192.168.1.25",
|
||||
... dst_ip_address="10.10.10.5",
|
||||
... dst_port=Port.SSH,
|
||||
... position=4
|
||||
... )
|
||||
|
||||
:param action: The action to take (Permit/Deny) when the rule matches traffic.
|
||||
:param protocol: The network protocol (TCP/UDP/ICMP) to match. If None, matches any protocol.
|
||||
:param src_ip_address: The source IP address to match. If None, matches any source IP.
|
||||
:param src_wildcard_mask: Specifies a wildcard mask for the source IP. Use for IP ranges.
|
||||
:param dst_ip_address: The destination IP address to match. If None, matches any destination IP.
|
||||
:param dst_wildcard_mask: Specifies a wildcard mask for the destination IP. Use for IP ranges.
|
||||
:param src_port: The source port to match. If None, matches any source port.
|
||||
:param dst_port: The destination port to match. If None, matches any destination port.
|
||||
:param int position: The position in the ACL list to insert this rule. Defaults is position 0 right at the top.
|
||||
:raises ValueError: If the position is out of bounds.
|
||||
"""
|
||||
if isinstance(src_ip_address, str):
|
||||
src_ip_address = IPv4Address(src_ip_address)
|
||||
if isinstance(dst_ip_address, str):
|
||||
dst_ip_address = IPv4Address(dst_ip_address)
|
||||
if 0 <= position < self.max_acl_rules:
|
||||
if self._acl[position]:
|
||||
self.sys_log.info(f"Overwriting ACL rule at position {position}")
|
||||
self._acl[position] = ACLRule(
|
||||
action=action,
|
||||
src_ip_address=src_ip_address,
|
||||
src_wildcard_mask=src_wildcard_mask,
|
||||
dst_ip_address=dst_ip_address,
|
||||
dst_wildcard_mask=dst_wildcard_mask,
|
||||
protocol=protocol,
|
||||
src_port=src_port,
|
||||
dst_port=dst_port,
|
||||
@@ -264,43 +487,25 @@ class AccessControlList(SimComponent):
|
||||
else:
|
||||
raise ValueError(f"Cannot remove ACL rule, position {position} is out of bounds.")
|
||||
|
||||
def is_permitted(
|
||||
self,
|
||||
protocol: IPProtocol,
|
||||
src_ip_address: Union[str, IPv4Address],
|
||||
src_port: Optional[Port],
|
||||
dst_ip_address: Union[str, IPv4Address],
|
||||
dst_port: Optional[Port],
|
||||
) -> Tuple[bool, Optional[Union[str, ACLRule]]]:
|
||||
"""
|
||||
Check if a packet with the given properties is permitted through the ACL.
|
||||
|
||||
:param protocol: The protocol of the packet.
|
||||
:param src_ip_address: Source IP address of the packet. Accepts string and IPv4Address.
|
||||
:param src_port: Source port of the packet. Optional.
|
||||
:param dst_ip_address: Destination IP address of the packet. Accepts string and IPv4Address.
|
||||
:param dst_port: Destination port of the packet. Optional.
|
||||
:return: A tuple with a boolean indicating if the packet is permitted and an optional rule or implicit action
|
||||
string.
|
||||
"""
|
||||
if not isinstance(src_ip_address, IPv4Address):
|
||||
src_ip_address = IPv4Address(src_ip_address)
|
||||
if not isinstance(dst_ip_address, IPv4Address):
|
||||
dst_ip_address = IPv4Address(dst_ip_address)
|
||||
for rule in self._acl:
|
||||
if not rule:
|
||||
def is_permitted(self, frame: Frame) -> Tuple[bool, ACLRule]:
|
||||
"""Check if a packet with the given properties is permitted through the ACL."""
|
||||
permitted = False
|
||||
rule: ACLRule = None
|
||||
for _rule in self._acl:
|
||||
if not _rule:
|
||||
continue
|
||||
|
||||
if (
|
||||
(rule.src_ip_address == src_ip_address or rule.src_ip_address is None)
|
||||
and (rule.dst_ip_address == dst_ip_address or rule.dst_ip_address is None)
|
||||
and (rule.protocol == protocol or rule.protocol is None)
|
||||
and (rule.src_port == src_port or rule.src_port is None)
|
||||
and (rule.dst_port == dst_port or rule.dst_port is None)
|
||||
):
|
||||
return rule.action == ACLAction.PERMIT, rule
|
||||
permitted, rule_match = _rule.permit_frame_check(frame)
|
||||
if rule_match:
|
||||
rule = _rule
|
||||
break
|
||||
if not rule:
|
||||
permitted = self.implicit_action == ACLAction.PERMIT
|
||||
rule = self.implicit_rule
|
||||
|
||||
return self.implicit_action == ACLAction.PERMIT, f"Implicit {self.implicit_action.name}"
|
||||
rule.match_count += 1
|
||||
|
||||
return permitted, rule
|
||||
|
||||
def get_relevant_rules(
|
||||
self,
|
||||
@@ -346,11 +551,25 @@ class AccessControlList(SimComponent):
|
||||
|
||||
:param markdown: Whether to display the table in Markdown format. Defaults to False.
|
||||
"""
|
||||
table = PrettyTable(["Index", "Action", "Protocol", "Src IP", "Src Port", "Dst IP", "Dst Port"])
|
||||
table = PrettyTable(
|
||||
[
|
||||
"Index",
|
||||
"Action",
|
||||
"Protocol",
|
||||
"Src IP",
|
||||
"Src Wildcard",
|
||||
"Src Port",
|
||||
"Dst IP",
|
||||
"Dst Wildcard",
|
||||
"Dst Port",
|
||||
"Matched",
|
||||
]
|
||||
)
|
||||
if markdown:
|
||||
table.set_style(MARKDOWN)
|
||||
table.align = "l"
|
||||
table.title = f"{self.sys_log.hostname} Access Control List"
|
||||
|
||||
table.title = f"{self.name} Access Control List"
|
||||
for index, rule in enumerate(self.acl + [self.implicit_rule]):
|
||||
if rule:
|
||||
table.add_row(
|
||||
@@ -359,22 +578,16 @@ class AccessControlList(SimComponent):
|
||||
rule.action.name if rule.action else "ANY",
|
||||
rule.protocol.name if rule.protocol else "ANY",
|
||||
rule.src_ip_address if rule.src_ip_address else "ANY",
|
||||
rule.src_wildcard_mask if rule.src_wildcard_mask else "ANY",
|
||||
f"{rule.src_port.value} ({rule.src_port.name})" if rule.src_port else "ANY",
|
||||
rule.dst_ip_address if rule.dst_ip_address else "ANY",
|
||||
rule.dst_wildcard_mask if rule.dst_wildcard_mask else "ANY",
|
||||
f"{rule.dst_port.value} ({rule.dst_port.name})" if rule.dst_port else "ANY",
|
||||
rule.match_count,
|
||||
]
|
||||
)
|
||||
print(table)
|
||||
|
||||
@property
|
||||
def num_rules(self) -> int:
|
||||
"""
|
||||
Get the number of rules in the ACL.
|
||||
|
||||
:return: The number of rules in the ACL.
|
||||
"""
|
||||
return len([rule for rule in self._acl if rule is not None])
|
||||
|
||||
|
||||
class RouteEntry(SimComponent):
|
||||
"""
|
||||
@@ -880,7 +1093,7 @@ class Router(NetworkNode):
|
||||
if not kwargs.get("sys_log"):
|
||||
kwargs["sys_log"] = SysLog(hostname)
|
||||
if not kwargs.get("acl"):
|
||||
kwargs["acl"] = AccessControlList(sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY)
|
||||
kwargs["acl"] = AccessControlList(sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=hostname)
|
||||
if not kwargs.get("route_table"):
|
||||
kwargs["route_table"] = RouteTable(sys_log=kwargs["sys_log"])
|
||||
super().__init__(hostname=hostname, num_ports=num_ports, **kwargs)
|
||||
@@ -1008,6 +1221,36 @@ class Router(NetworkNode):
|
||||
state["acl"] = self.acl.describe_state()
|
||||
return state
|
||||
|
||||
def check_send_frame_to_session_manager(self, frame: Frame) -> bool:
|
||||
"""
|
||||
Determines whether a given network frame should be forwarded to the session manager.
|
||||
|
||||
This function evaluates whether the destination IP address of the frame corresponds to one of the router's
|
||||
interface IP addresses. If so, it then checks if the frame is an ICMP packet or if the destination port matches
|
||||
any of the ports that the router's software manager identifies as open. If either condition is met, the frame
|
||||
is considered for further processing by the session manager, implying potential application-level handling or
|
||||
response generation.
|
||||
|
||||
:param frame: The network frame to be evaluated.
|
||||
|
||||
:return: A boolean value indicating whether the frame should be sent to the session manager. ``True`` if the
|
||||
frame's destination IP matches the router's interface and is directed to an open port or is an ICMP packet,
|
||||
otherwise, ``False``.
|
||||
"""
|
||||
dst_ip_address = frame.ip.dst_ip_address
|
||||
dst_port = None
|
||||
if frame.ip.protocol == IPProtocol.TCP:
|
||||
dst_port = frame.tcp.dst_port
|
||||
elif frame.ip.protocol == IPProtocol.UDP:
|
||||
dst_port = frame.udp.dst_port
|
||||
|
||||
if self.ip_is_router_interface(dst_ip_address) and (
|
||||
frame.icmp or dst_port in self.software_manager.get_open_ports()
|
||||
):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def receive_frame(self, frame: Frame, from_network_interface: RouterInterface):
|
||||
"""
|
||||
Processes an incoming frame received on one of the router's interfaces.
|
||||
@@ -1021,26 +1264,8 @@ class Router(NetworkNode):
|
||||
if self.operating_state != NodeOperatingState.ON:
|
||||
return
|
||||
|
||||
protocol = frame.ip.protocol
|
||||
src_ip_address = frame.ip.src_ip_address
|
||||
dst_ip_address = frame.ip.dst_ip_address
|
||||
src_port = None
|
||||
dst_port = None
|
||||
if frame.ip.protocol == IPProtocol.TCP:
|
||||
src_port = frame.tcp.src_port
|
||||
dst_port = frame.tcp.dst_port
|
||||
elif frame.ip.protocol == IPProtocol.UDP:
|
||||
src_port = frame.udp.src_port
|
||||
dst_port = frame.udp.dst_port
|
||||
|
||||
# Check if it's permitted
|
||||
permitted, rule = self.acl.is_permitted(
|
||||
protocol=protocol,
|
||||
src_ip_address=src_ip_address,
|
||||
src_port=src_port,
|
||||
dst_ip_address=dst_ip_address,
|
||||
dst_port=dst_port,
|
||||
)
|
||||
permitted, rule = self.acl.is_permitted(frame)
|
||||
|
||||
if not permitted:
|
||||
at_port = self._get_port_of_nic(from_network_interface)
|
||||
@@ -1054,13 +1279,7 @@ class Router(NetworkNode):
|
||||
network_interface=from_network_interface,
|
||||
)
|
||||
|
||||
send_to_session_manager = False
|
||||
if (frame.icmp and self.ip_is_router_interface(dst_ip_address)) or (
|
||||
dst_port in self.software_manager.get_open_ports()
|
||||
):
|
||||
send_to_session_manager = True
|
||||
|
||||
if send_to_session_manager:
|
||||
if self.check_send_frame_to_session_manager(frame):
|
||||
# Port is open on this Router so pass Frame up to session manager first
|
||||
self.session_manager.receive_frame(frame, from_network_interface)
|
||||
else:
|
||||
@@ -1196,7 +1415,7 @@ class Router(NetworkNode):
|
||||
|
||||
def show(self, markdown: bool = False):
|
||||
"""
|
||||
Prints the state of the Ethernet interfaces on the Router.
|
||||
Prints the state of the network interfaces on the Router.
|
||||
|
||||
:param markdown: Flag to indicate if the output should be in markdown format.
|
||||
"""
|
||||
@@ -1205,7 +1424,7 @@ class Router(NetworkNode):
|
||||
if markdown:
|
||||
table.set_style(MARKDOWN)
|
||||
table.align = "l"
|
||||
table.title = f"{self.hostname} Ethernet Interfaces"
|
||||
table.title = f"{self.hostname} Network Interfaces"
|
||||
for port, network_interface in self.network_interface.items():
|
||||
table.add_row(
|
||||
[
|
||||
|
||||
@@ -0,0 +1,211 @@
|
||||
from typing import Any, Dict, Union
|
||||
|
||||
from pydantic import validate_call
|
||||
|
||||
from primaite.simulator.network.airspace import AirSpaceFrequency, IPWirelessNetworkInterface
|
||||
from primaite.simulator.network.hardware.nodes.network.router import Router, RouterInterface
|
||||
from primaite.simulator.network.transmission.data_link_layer import Frame
|
||||
from primaite.utils.validators import IPV4Address
|
||||
|
||||
|
||||
class WirelessAccessPoint(IPWirelessNetworkInterface):
|
||||
"""
|
||||
Represents a Wireless Access Point (AP) in a network.
|
||||
|
||||
This class models a Wireless Access Point, a device that allows wireless devices to connect to a wired network
|
||||
using Wi-Fi or other wireless standards. The Wireless Access Point bridges the wireless and wired segments of
|
||||
the network, allowing wireless devices to communicate with other devices on the network.
|
||||
|
||||
As an integral component of wireless networking, a Wireless Access Point provides functionalities for network
|
||||
management, signal broadcasting, security enforcement, and connection handling. It also possesses Layer 3
|
||||
capabilities such as IP addressing and subnetting, allowing for network segmentation and routing.
|
||||
|
||||
Inherits from:
|
||||
- WirelessNetworkInterface: Provides basic properties and methods specific to wireless interfaces.
|
||||
- Layer3Interface: Provides Layer 3 properties like ip_address and subnet_mask, enabling the device to manage
|
||||
network traffic and routing.
|
||||
|
||||
This class can be further specialised or extended to support specific features or standards related to wireless
|
||||
networking, such as different Wi-Fi versions, frequency bands, or advanced security protocols.
|
||||
"""
|
||||
|
||||
def model_post_init(self, __context: Any) -> None:
|
||||
"""
|
||||
Performs post-initialisation checks to ensure the model's IP configuration is valid.
|
||||
|
||||
This method is invoked after the initialisation of a network model object to validate its network settings,
|
||||
particularly to ensure that the assigned IP address is not a network address. This validation is crucial for
|
||||
maintaining the integrity of network simulations and avoiding configuration errors that could lead to
|
||||
unrealistic or incorrect behavior.
|
||||
|
||||
:param __context: Contextual information or parameters passed to the method, used for further initializing or
|
||||
validating the model post-creation.
|
||||
:raises ValueError: If the IP address is the same as the network address, indicating an incorrect configuration.
|
||||
"""
|
||||
if self.ip_network.network_address == self.ip_address:
|
||||
raise ValueError(f"{self.ip_address}/{self.subnet_mask} must not be a network address")
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Produce a dictionary describing the current state of this object.
|
||||
|
||||
:return: Current state of this object and child objects.
|
||||
:rtype: Dict
|
||||
"""
|
||||
return super().describe_state()
|
||||
|
||||
def receive_frame(self, frame: Frame) -> bool:
|
||||
"""
|
||||
Receives a network frame on the interface.
|
||||
|
||||
:param frame: The network frame being received.
|
||||
:return: A boolean indicating whether the frame was successfully received.
|
||||
"""
|
||||
if self.enabled:
|
||||
frame.decrement_ttl()
|
||||
if frame.ip and frame.ip.ttl < 1:
|
||||
self._connected_node.sys_log.info("Frame discarded as TTL limit reached")
|
||||
return False
|
||||
frame.set_received_timestamp()
|
||||
self.pcap.capture_inbound(frame)
|
||||
# If this destination or is broadcast
|
||||
if frame.ethernet.dst_mac_addr == self.mac_address or frame.ethernet.dst_mac_addr == "ff:ff:ff:ff:ff:ff":
|
||||
self._connected_node.receive_frame(frame=frame, from_network_interface=self)
|
||||
return True
|
||||
return False
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""
|
||||
String representation of the NIC.
|
||||
|
||||
:return: A string combining the port number, MAC address and IP address of the NIC.
|
||||
"""
|
||||
return f"Port {self.port_num}: {self.mac_address}/{self.ip_address} ({self.frequency})"
|
||||
|
||||
|
||||
class WirelessRouter(Router):
|
||||
"""
|
||||
A WirelessRouter class that extends the functionality of a standard Router to include wireless capabilities.
|
||||
|
||||
This class represents a network device that performs routing functions similar to a traditional router but also
|
||||
includes the functionality of a wireless access point. This allows the WirelessRouter to not only direct traffic
|
||||
between wired networks but also to manage and facilitate wireless network connections.
|
||||
|
||||
A WirelessRouter is instantiated and configured with both wired and wireless interfaces. The wired interfaces are
|
||||
managed similarly to those in a standard Router, while the wireless interfaces require additional configuration
|
||||
specific to wireless settings, such as setting the frequency band (e.g., 2.4 GHz or 5 GHz for Wi-Fi).
|
||||
|
||||
The WirelessRouter facilitates creating a network environment where devices can be interconnected via both
|
||||
Ethernet (wired) and Wi-Fi (wireless), making it an essential component for simulating more complex and realistic
|
||||
network topologies within PrimAITE.
|
||||
|
||||
Example:
|
||||
>>> wireless_router = WirelessRouter(hostname="wireless_router_1")
|
||||
>>> wireless_router.configure_router_interface(
|
||||
... ip_address="192.168.1.1",
|
||||
... subnet_mask="255.255.255.0"
|
||||
... )
|
||||
>>> wireless_router.configure_wireless_access_point(
|
||||
... ip_address="10.10.10.1",
|
||||
... subnet_mask="255.255.255.0"
|
||||
... frequency=AirSpaceFrequency.WIFI_2_4
|
||||
... )
|
||||
"""
|
||||
|
||||
network_interfaces: Dict[str, Union[RouterInterface, WirelessAccessPoint]] = {}
|
||||
network_interface: Dict[int, Union[RouterInterface, WirelessAccessPoint]] = {}
|
||||
|
||||
def __init__(self, hostname: str, **kwargs):
|
||||
super().__init__(hostname=hostname, num_ports=0, **kwargs)
|
||||
|
||||
self.connect_nic(WirelessAccessPoint(ip_address="127.0.0.1", subnet_mask="255.0.0.0", gateway="0.0.0.0"))
|
||||
|
||||
self.connect_nic(RouterInterface(ip_address="127.0.0.1", subnet_mask="255.0.0.0", gateway="0.0.0.0"))
|
||||
|
||||
self.set_original_state()
|
||||
|
||||
@property
|
||||
def wireless_access_point(self) -> WirelessAccessPoint:
|
||||
"""
|
||||
Retrieves the wireless access point interface associated with this wireless router.
|
||||
|
||||
This property provides direct access to the WirelessAccessPoint interface of the router, facilitating wireless
|
||||
communications. Specifically, it returns the interface configured on port 1, dedicated to establishing and
|
||||
managing wireless network connections. This interface is essential for enabling wireless connectivity,
|
||||
allowing devices within connect to the network wirelessly.
|
||||
|
||||
:return: The WirelessAccessPoint instance representing the wireless connection interface on port 1 of the
|
||||
wireless router.
|
||||
"""
|
||||
return self.network_interface[1]
|
||||
|
||||
@validate_call()
|
||||
def configure_wireless_access_point(
|
||||
self,
|
||||
ip_address: IPV4Address,
|
||||
subnet_mask: IPV4Address,
|
||||
frequency: AirSpaceFrequency = AirSpaceFrequency.WIFI_2_4,
|
||||
):
|
||||
"""
|
||||
Configures a wireless access point (WAP).
|
||||
|
||||
Sets its IP address, subnet mask, and operating frequency. This method ensures the wireless access point is
|
||||
properly set up to manage wireless communication over the specified frequency band.
|
||||
|
||||
The method first disables the WAP to safely apply configuration changes. After configuring the IP settings,
|
||||
it sets the WAP to operate on the specified frequency band and then re-enables the WAP for operation.
|
||||
|
||||
:param ip_address: The IP address to be assigned to the wireless access point.
|
||||
:param subnet_mask: The subnet mask associated with the IP address
|
||||
:param frequency: The operating frequency of the wireless access point, defined by the AirSpaceFrequency
|
||||
enum. This determines the frequency band (e.g., 2.4 GHz or 5 GHz) the access point will use for wireless
|
||||
communication. Default is AirSpaceFrequency.WIFI_2_4.
|
||||
"""
|
||||
self.wireless_access_point.disable() # Temporarily disable the WAP for reconfiguration
|
||||
network_interface = self.network_interface[1]
|
||||
network_interface.ip_address = ip_address
|
||||
network_interface.subnet_mask = subnet_mask
|
||||
self.sys_log.info(f"Configured WAP {network_interface}")
|
||||
self.set_original_state()
|
||||
self.wireless_access_point.frequency = frequency # Set operating frequency
|
||||
self.wireless_access_point.enable() # Re-enable the WAP with new settings
|
||||
|
||||
@property
|
||||
def router_interface(self) -> RouterInterface:
|
||||
"""
|
||||
Retrieves the router interface associated with this wireless router.
|
||||
|
||||
This property provides access to the router interface configured for wired connections. It specifically
|
||||
returns the interface configured on port 2, which is reserved for wired LAN/WAN connections.
|
||||
|
||||
:return: The RouterInterface instance representing the wired LAN/WAN connection on port 2 of the wireless
|
||||
router.
|
||||
"""
|
||||
return self.network_interface[2]
|
||||
|
||||
@validate_call()
|
||||
def configure_router_interface(
|
||||
self,
|
||||
ip_address: IPV4Address,
|
||||
subnet_mask: IPV4Address,
|
||||
):
|
||||
"""
|
||||
Configures a router interface.
|
||||
|
||||
Sets its IP address and subnet mask.
|
||||
|
||||
The method first disables the router interface to safely apply configuration changes. After configuring the IP
|
||||
settings, it re-enables the router interface for operation.
|
||||
|
||||
:param ip_address: The IP address to be assigned to the router interface.
|
||||
:param subnet_mask: The subnet mask associated with the IP address
|
||||
"""
|
||||
self.router_interface.disable() # Temporarily disable the router interface for reconfiguration
|
||||
super().configure_port(port=2, ip_address=ip_address, subnet_mask=subnet_mask) # Set IP configuration
|
||||
self.router_interface.enable() # Re-enable the router interface with new settings
|
||||
|
||||
def configure_port(self, port: int, ip_address: Union[IPV4Address, str], subnet_mask: Union[IPV4Address, str]):
|
||||
"""Not Implemented."""
|
||||
raise NotImplementedError(
|
||||
"Please use the 'configure_wireless_access_point' and 'configure_router_interface' functions."
|
||||
)
|
||||
@@ -1,9 +1,9 @@
|
||||
from enum import Enum
|
||||
from ipaddress import IPv4Address
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from primaite import getLogger
|
||||
from primaite.utils.validators import IPV4Address
|
||||
|
||||
_LOGGER = getLogger(__name__)
|
||||
|
||||
@@ -73,9 +73,9 @@ class IPPacket(BaseModel):
|
||||
... )
|
||||
"""
|
||||
|
||||
src_ip_address: IPv4Address
|
||||
src_ip_address: IPV4Address
|
||||
"Source IP address."
|
||||
dst_ip_address: IPv4Address
|
||||
dst_ip_address: IPV4Address
|
||||
"Destination IP address."
|
||||
protocol: IPProtocol = IPProtocol.TCP
|
||||
"IPProtocol."
|
||||
|
||||
Reference in New Issue
Block a user