Merged PR 272: #2205 - Firewall Node
## Description: This pull request introduces the Firewall class and extends the ACLRule functionality within PrimAITE to provide comprehensive network traffic management and security capabilities. These enhancements enable detailed control over data flow through network simulations, mimicking real-world firewall operations and ACL configurations. The updates focus on the addition of a Firewall node that extends the Router class functionalities and the enhancement of ACLRule to support IP ranges through wildcard masking, thus offering granular traffic filtering based on IP addresses, protocols, ports, and more. ## Key Features: **Firewall Class:** A new class that extends the Router class, incorporating firewall-specific logic for inspecting, directing, and filtering traffic between the internal, external, and DMZ (De-Militarized Zone) network interfaces. The Firewall class supports configuring network interfaces and applying Access Control Lists (ACLs) for inbound and outbound traffic control. **Enhanced ACLRule:** The ACLRule class has been updated to support IP ranges using wildcard masking. This allows for more flexible rule definitions, enabling users to specify broad network ranges or individual IP addresses in ACL rules. **Comprehensive ACL Configuration:** Six distinct ACLs (internal inbound, internal outbound, DMZ inbound, DMZ outbound, external inbound, and external outbound) provide meticulous control over traffic flow, ensuring robust network security. Examples included in the documentation illustrate how to configure ACLs for common scenarios, such as blocking external threats, permitting specific services, and restricting access to sensitive internal resources. **Intuitive Interface and ACL Management:** Simplified methods for configuring firewall interfaces and ACL rules enhance usability. The Firewall class offers intuitive functions for rule management, including adding, removing, and listing ACL rules. **Detailed Documentation and Examples:** Accompanying the code updates, comprehensive documentation and example configurations are provided, detailing the use and configuration of the Firewall node and ACL rules within PrimAITE simulations. ## Impact: The introduction of the Firewall class and the enhancement of ACLRule significantly broaden PrimAITE's capabilities for simulating realistic network security scenarios. Users can now accurately model the behavior of firewalls in their network simulations, applying complex ACLs to control traffic flow and enforce security policies. This update enables more detailed network security analyses, teaching, and experimentation within the PrimAITE environment. ## Test process Extensive unit tests have been added to cover the new functionality, ensuring reliability and correctness. Tests include scenarios for firewall configuration, ACL rule application, traffic filtering based on various criteria, and interaction between different network zones. ## Checklist - [X] PR is linked to ...
This commit is contained in:
@@ -12,8 +12,8 @@ class _SimOutput:
|
||||
self._path: Path = (
|
||||
_PRIMAITE_ROOT.parent.parent / "simulation_output" / datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
|
||||
)
|
||||
self.save_pcap_logs: bool = False
|
||||
self.save_sys_logs: bool = False
|
||||
self.save_pcap_logs: bool = True
|
||||
self.save_sys_logs: bool = True
|
||||
|
||||
@property
|
||||
def path(self) -> Path:
|
||||
|
||||
307
src/primaite/simulator/network/airspace.py
Normal file
307
src/primaite/simulator/network/airspace.py
Normal file
@@ -0,0 +1,307 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, Final, List, Optional
|
||||
|
||||
from prettytable import PrettyTable
|
||||
|
||||
from primaite import getLogger
|
||||
from primaite.simulator.network.hardware.base import Layer3Interface, NetworkInterface, WiredNetworkInterface
|
||||
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
|
||||
from primaite.simulator.network.transmission.data_link_layer import Frame
|
||||
from primaite.simulator.system.core.packet_capture import PacketCapture
|
||||
|
||||
_LOGGER = getLogger(__name__)
|
||||
|
||||
__all__ = ["AIR_SPACE", "AirSpaceFrequency", "WirelessNetworkInterface", "IPWirelessNetworkInterface"]
|
||||
|
||||
|
||||
class AirSpace:
|
||||
"""Represents a wireless airspace, managing wireless network interfaces and handling wireless transmission."""
|
||||
|
||||
def __init__(self):
|
||||
self._wireless_interfaces: Dict[str, WirelessNetworkInterface] = {}
|
||||
self._wireless_interfaces_by_frequency: Dict[AirSpaceFrequency, List[WirelessNetworkInterface]] = {}
|
||||
|
||||
def show(self, frequency: Optional[AirSpaceFrequency] = None):
|
||||
"""
|
||||
Displays a summary of wireless interfaces in the airspace, optionally filtered by a specific frequency.
|
||||
|
||||
:param frequency: The frequency band to filter devices by. If None, devices for all frequencies are shown.
|
||||
"""
|
||||
table = PrettyTable()
|
||||
table.field_names = ["Connected Node", "MAC Address", "IP Address", "Subnet Mask", "Frequency", "Status"]
|
||||
|
||||
# If a specific frequency is provided, filter by it; otherwise, use all frequencies.
|
||||
frequencies_to_show = [frequency] if frequency else self._wireless_interfaces_by_frequency.keys()
|
||||
|
||||
for freq in frequencies_to_show:
|
||||
interfaces = self._wireless_interfaces_by_frequency.get(freq, [])
|
||||
for interface in interfaces:
|
||||
status = "Enabled" if interface.enabled else "Disabled"
|
||||
table.add_row(
|
||||
[
|
||||
interface._connected_node.hostname, # noqa
|
||||
interface.mac_address,
|
||||
interface.ip_address if hasattr(interface, "ip_address") else None,
|
||||
interface.subnet_mask if hasattr(interface, "subnet_mask") else None,
|
||||
str(freq),
|
||||
status,
|
||||
]
|
||||
)
|
||||
|
||||
print(table)
|
||||
|
||||
def add_wireless_interface(self, wireless_interface: WirelessNetworkInterface):
|
||||
"""
|
||||
Adds a wireless network interface to the airspace if it's not already present.
|
||||
|
||||
:param wireless_interface: The wireless network interface to be added.
|
||||
"""
|
||||
if wireless_interface.mac_address not in self._wireless_interfaces:
|
||||
self._wireless_interfaces[wireless_interface.mac_address] = wireless_interface
|
||||
if wireless_interface.frequency not in self._wireless_interfaces_by_frequency:
|
||||
self._wireless_interfaces_by_frequency[wireless_interface.frequency] = []
|
||||
self._wireless_interfaces_by_frequency[wireless_interface.frequency].append(wireless_interface)
|
||||
|
||||
def remove_wireless_interface(self, wireless_interface: WirelessNetworkInterface):
|
||||
"""
|
||||
Removes a wireless network interface from the airspace if it's present.
|
||||
|
||||
:param wireless_interface: The wireless network interface to be removed.
|
||||
"""
|
||||
if wireless_interface.mac_address in self._wireless_interfaces:
|
||||
self._wireless_interfaces.pop(wireless_interface.mac_address)
|
||||
self._wireless_interfaces_by_frequency[wireless_interface.frequency].remove(wireless_interface)
|
||||
|
||||
def clear(self):
|
||||
"""
|
||||
Clears all wireless network interfaces and their frequency associations from the airspace.
|
||||
|
||||
After calling this method, the airspace will contain no wireless network interfaces, and transmissions cannot
|
||||
occur until new interfaces are added again.
|
||||
"""
|
||||
self._wireless_interfaces.clear()
|
||||
self._wireless_interfaces_by_frequency.clear()
|
||||
|
||||
def transmit(self, frame: Frame, sender_network_interface: WirelessNetworkInterface):
|
||||
"""
|
||||
Transmits a frame to all enabled wireless network interfaces on a specific frequency within the airspace.
|
||||
|
||||
This ensures that a wireless interface does not receive its own transmission.
|
||||
|
||||
:param frame: The frame to be transmitted.
|
||||
:param sender_network_interface: The wireless network interface sending the frame. This interface will be
|
||||
excluded from the list of receivers to prevent it from receiving its own transmission.
|
||||
"""
|
||||
for wireless_interface in self._wireless_interfaces_by_frequency.get(sender_network_interface.frequency, []):
|
||||
if wireless_interface != sender_network_interface and wireless_interface.enabled:
|
||||
wireless_interface.receive_frame(frame)
|
||||
|
||||
|
||||
AIR_SPACE: Final[AirSpace] = AirSpace()
|
||||
"""
|
||||
A singleton instance of the AirSpace class, representing the global wireless airspace.
|
||||
|
||||
This instance acts as the central management point for all wireless communications within the simulated network
|
||||
environment. By default, there is only one airspace in the simulation, making this variable a singleton that
|
||||
manages the registration, removal, and transmission of wireless frames across all wireless network interfaces configured
|
||||
in the simulation. It ensures that wireless frames are appropriately transmitted to and received by wireless
|
||||
interfaces based on their operational status and frequency band.
|
||||
"""
|
||||
|
||||
|
||||
class AirSpaceFrequency(Enum):
|
||||
"""Enumeration representing the operating frequencies for wireless communications."""
|
||||
|
||||
WIFI_2_4 = 2.4e9
|
||||
"""WiFi 2.4 GHz. Known for its extensive range and ability to penetrate solid objects effectively."""
|
||||
WIFI_5 = 5e9
|
||||
"""WiFi 5 GHz. Known for its higher data transmission speeds and reduced interference from other devices."""
|
||||
|
||||
def __str__(self) -> str:
|
||||
if self == AirSpaceFrequency.WIFI_2_4:
|
||||
return "WiFi 2.4 GHz"
|
||||
elif self == AirSpaceFrequency.WIFI_5:
|
||||
return "WiFi 5 GHz"
|
||||
else:
|
||||
return "Unknown Frequency"
|
||||
|
||||
|
||||
class WirelessNetworkInterface(NetworkInterface, ABC):
|
||||
"""
|
||||
Represents a wireless network interface in a network device.
|
||||
|
||||
This abstract base class models wireless network interfaces, encapsulating properties and behaviors specific to
|
||||
wireless connectivity. It provides a framework for managing wireless connections, including signal strength,
|
||||
security protocols, and other wireless-specific attributes and methods.
|
||||
|
||||
Wireless network interfaces differ from wired ones in their medium of communication, relying on radio frequencies
|
||||
for data transmission and reception. This class serves as a base for more specific types of wireless network
|
||||
interfaces, such as Wi-Fi adapters or radio network interfaces, ensuring that essential wireless functionality is
|
||||
defined and standardised.
|
||||
|
||||
Inherits from:
|
||||
- NetworkInterface: Provides basic network interface properties and methods.
|
||||
|
||||
As an abstract base class, it requires subclasses to implement specific methods related to wireless communication
|
||||
and may define additional properties and methods specific to wireless technology.
|
||||
"""
|
||||
|
||||
frequency: AirSpaceFrequency = AirSpaceFrequency.WIFI_2_4
|
||||
|
||||
def enable(self):
|
||||
"""Attempt to enable the network interface."""
|
||||
if self.enabled:
|
||||
return
|
||||
|
||||
if not self._connected_node:
|
||||
_LOGGER.error(f"Interface {self} cannot be enabled as it is not connected to a Node")
|
||||
return
|
||||
|
||||
if self._connected_node.operating_state != NodeOperatingState.ON:
|
||||
self._connected_node.sys_log.info(
|
||||
f"Interface {self} cannot be enabled as the connected Node is not powered on"
|
||||
)
|
||||
return
|
||||
|
||||
self.enabled = True
|
||||
self._connected_node.sys_log.info(f"Network Interface {self} enabled")
|
||||
self.pcap = PacketCapture(hostname=self._connected_node.hostname, interface_num=self.port_num)
|
||||
AIR_SPACE.add_wireless_interface(self)
|
||||
|
||||
def disable(self):
|
||||
"""Disable the network interface."""
|
||||
if not self.enabled:
|
||||
return
|
||||
self.enabled = False
|
||||
if self._connected_node:
|
||||
self._connected_node.sys_log.info(f"Network Interface {self} disabled")
|
||||
else:
|
||||
_LOGGER.debug(f"Interface {self} disabled")
|
||||
AIR_SPACE.remove_wireless_interface(self)
|
||||
|
||||
def send_frame(self, frame: Frame) -> bool:
|
||||
"""
|
||||
Attempts to send a network frame over the airspace.
|
||||
|
||||
This method sends a frame if the network interface is enabled and connected to a wireless airspace. It captures
|
||||
the frame using PCAP (if available) and transmits it through the airspace. Returns True if the frame is
|
||||
successfully sent, False otherwise (e.g., if the network interface is disabled).
|
||||
|
||||
:param frame: The network frame to be sent.
|
||||
:return: True if the frame is sent successfully, False if the network interface is disabled.
|
||||
"""
|
||||
if self.enabled:
|
||||
frame.set_sent_timestamp()
|
||||
self.pcap.capture_outbound(frame)
|
||||
AIR_SPACE.transmit(frame, self)
|
||||
return True
|
||||
# Cannot send Frame as the network interface is not enabled
|
||||
return False
|
||||
|
||||
@abstractmethod
|
||||
def receive_frame(self, frame: Frame) -> bool:
|
||||
"""
|
||||
Receives a network frame on the network interface.
|
||||
|
||||
:param frame: The network frame being received.
|
||||
:return: A boolean indicating whether the frame was successfully received.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class IPWirelessNetworkInterface(WirelessNetworkInterface, Layer3Interface, ABC):
|
||||
"""
|
||||
Represents an IP wireless network interface.
|
||||
|
||||
This interface operates at both the data link layer (Layer 2) and the network layer (Layer 3) of the OSI model,
|
||||
specifically tailored for IP-based communication over wireless connections. This abstract class provides a
|
||||
template for creating specific wireless network interfaces that support Internet Protocol (IP) functionalities.
|
||||
|
||||
As this class is a combination of its parent classes without additional attributes or methods, please refer to
|
||||
the documentation of `WirelessNetworkInterface` and `Layer3Interface` for more details on the supported operations
|
||||
and functionalities.
|
||||
|
||||
The class inherits from:
|
||||
- `WirelessNetworkInterface`: Providing the functionalities and characteristics of a wireless connection, such as
|
||||
managing wireless signal transmission, reception, and associated wireless protocols.
|
||||
- `Layer3Interface`: Enabling network layer capabilities, including IP address assignment, routing, and
|
||||
potentially, Layer 3 protocols like IPsec.
|
||||
|
||||
As an abstract class, `IPWirelessNetworkInterface` does not implement specific methods but ensures that any derived
|
||||
class provides implementations for the functionalities of both `WirelessNetworkInterface` and `Layer3Interface`.
|
||||
This setup is ideal for representing network interfaces in devices that require wireless connections and are capable
|
||||
of IP routing and addressing, such as wireless routers, access points, and wireless end-host devices like
|
||||
smartphones and laptops.
|
||||
|
||||
This class should be extended by concrete classes that define specific behaviors and properties of an IP-capable
|
||||
wireless network interface.
|
||||
"""
|
||||
|
||||
def model_post_init(self, __context: Any) -> None:
|
||||
"""
|
||||
Performs post-initialisation checks to ensure the model's IP configuration is valid.
|
||||
|
||||
This method is invoked after the initialisation of a network model object to validate its network settings,
|
||||
particularly to ensure that the assigned IP address is not a network address. This validation is crucial for
|
||||
maintaining the integrity of network simulations and avoiding configuration errors that could lead to
|
||||
unrealistic or incorrect behavior.
|
||||
|
||||
:param __context: Contextual information or parameters passed to the method, used for further initializing or
|
||||
validating the model post-creation.
|
||||
:raises ValueError: If the IP address is the same as the network address, indicating an incorrect configuration.
|
||||
"""
|
||||
if self.ip_network.network_address == self.ip_address:
|
||||
raise ValueError(f"{self.ip_address}/{self.subnet_mask} must not be a network address")
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Produce a dictionary describing the current state of this object.
|
||||
|
||||
:return: Current state of this object and child objects.
|
||||
:rtype: Dict
|
||||
"""
|
||||
# Get the state from the WiredNetworkInterface
|
||||
state = WiredNetworkInterface.describe_state(self)
|
||||
|
||||
# Update the state with information from Layer3Interface
|
||||
state.update(Layer3Interface.describe_state(self))
|
||||
|
||||
state["frequency"] = self.frequency
|
||||
|
||||
return state
|
||||
|
||||
def set_original_state(self):
|
||||
"""Sets the original state."""
|
||||
vals_to_include = {"ip_address", "subnet_mask", "mac_address", "speed", "mtu", "wake_on_lan", "enabled"}
|
||||
self._original_state = self.model_dump(include=vals_to_include)
|
||||
|
||||
def enable(self):
|
||||
"""
|
||||
Enables this wired network interface and attempts to send a "hello" message to the default gateway.
|
||||
|
||||
This method activates the network interface, making it operational for network communications. After enabling,
|
||||
it tries to initiate a default gateway "hello" process, typically to establish initial connectivity and resolve
|
||||
the default gateway's MAC address. This step is crucial for ensuring the interface can successfully send data
|
||||
to and receive data from the network.
|
||||
|
||||
The method safely handles cases where the connected node might not have a default gateway set or the
|
||||
`default_gateway_hello` method is not defined, ignoring such errors to proceed without interruption.
|
||||
"""
|
||||
super().enable()
|
||||
try:
|
||||
self._connected_node.default_gateway_hello()
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def receive_frame(self, frame: Frame) -> bool:
|
||||
"""
|
||||
Receives a network frame on the interface.
|
||||
|
||||
:param frame: The network frame being received.
|
||||
:return: A boolean indicating whether the frame was successfully received.
|
||||
"""
|
||||
pass
|
||||
@@ -420,86 +420,6 @@ class IPWiredNetworkInterface(WiredNetworkInterface, Layer3Interface, ABC):
|
||||
pass
|
||||
|
||||
|
||||
class WirelessNetworkInterface(NetworkInterface, ABC):
|
||||
"""
|
||||
Represents a wireless network interface in a network device.
|
||||
|
||||
This abstract base class models wireless network interfaces, encapsulating properties and behaviors specific to
|
||||
wireless connectivity. It provides a framework for managing wireless connections, including signal strength,
|
||||
security protocols, and other wireless-specific attributes and methods.
|
||||
|
||||
Wireless network interfaces differ from wired ones in their medium of communication, relying on radio frequencies
|
||||
for data transmission and reception. This class serves as a base for more specific types of wireless interfaces,
|
||||
such as Wi-Fi adapters or radio network interfaces, ensuring that essential wireless functionality is defined
|
||||
and standardised.
|
||||
|
||||
Inherits from:
|
||||
- NetworkInterface: Provides basic network interface properties and methods.
|
||||
|
||||
As an abstract base class, it requires subclasses to implement specific methods related to wireless communication
|
||||
and may define additional properties and methods specific to wireless technology.
|
||||
"""
|
||||
|
||||
|
||||
class IPWirelessNetworkInterface(WiredNetworkInterface, Layer3Interface, ABC):
|
||||
"""
|
||||
Represents an IP wireless network interface.
|
||||
|
||||
This interface operates at both the data link layer (Layer 2) and the network layer (Layer 3) of the OSI model,
|
||||
specifically tailored for IP-based communication over wireless connections. This abstract class provides a
|
||||
template for creating specific wireless network interfaces that support Internet Protocol (IP) functionalities.
|
||||
|
||||
As this class is a combination of its parent classes without additional attributes or methods, please refer to
|
||||
the documentation of `WirelessNetworkInterface` and `Layer3Interface` for more details on the supported operations
|
||||
and functionalities.
|
||||
|
||||
The class inherits from:
|
||||
- `WirelessNetworkInterface`: Providing the functionalities and characteristics of a wireless connection, such as
|
||||
managing wireless signal transmission, reception, and associated wireless protocols.
|
||||
- `Layer3Interface`: Enabling network layer capabilities, including IP address assignment, routing, and
|
||||
potentially, Layer 3 protocols like IPsec.
|
||||
|
||||
As an abstract class, `IPWirelessNetworkInterface` does not implement specific methods but ensures that any derived
|
||||
class provides implementations for the functionalities of both `WirelessNetworkInterface` and `Layer3Interface`.
|
||||
This setup is ideal for representing network interfaces in devices that require wireless connections and are capable
|
||||
of IP routing and addressing, such as wireless routers, access points, and wireless end-host devices like
|
||||
smartphones and laptops.
|
||||
|
||||
This class should be extended by concrete classes that define specific behaviors and properties of an IP-capable
|
||||
wireless network interface.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def enable(self):
|
||||
"""Enable the interface."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def disable(self):
|
||||
"""Disable the interface."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def send_frame(self, frame: Frame) -> bool:
|
||||
"""
|
||||
Attempts to send a network frame through the interface.
|
||||
|
||||
:param frame: The network frame to be sent.
|
||||
:return: A boolean indicating whether the frame was successfully sent.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def receive_frame(self, frame: Frame) -> bool:
|
||||
"""
|
||||
Receives a network frame on the interface.
|
||||
|
||||
:param frame: The network frame being received.
|
||||
:return: A boolean indicating whether the frame was successfully received.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class Link(SimComponent):
|
||||
"""
|
||||
Represents a network link between NIC<-->NIC, NIC<-->SwitchPort, or SwitchPort<-->SwitchPort.
|
||||
|
||||
@@ -0,0 +1,493 @@
|
||||
from typing import Dict, Final, Optional, Union
|
||||
|
||||
from prettytable import MARKDOWN, PrettyTable
|
||||
from pydantic import validate_call
|
||||
|
||||
from primaite.simulator.network.hardware.nodes.network.router import (
|
||||
AccessControlList,
|
||||
ACLAction,
|
||||
Router,
|
||||
RouterInterface,
|
||||
)
|
||||
from primaite.simulator.network.transmission.data_link_layer import Frame
|
||||
from primaite.simulator.system.core.sys_log import SysLog
|
||||
from primaite.utils.validators import IPV4Address
|
||||
|
||||
EXTERNAL_PORT_ID: Final[int] = 1
|
||||
"""The Firewall port ID of the external port."""
|
||||
INTERNAL_PORT_ID: Final[int] = 2
|
||||
"""The Firewall port ID of the internal port."""
|
||||
DMZ_PORT_ID: Final[int] = 3
|
||||
"""The Firewall port ID of the DMZ port."""
|
||||
|
||||
|
||||
class Firewall(Router):
|
||||
"""
|
||||
A Firewall class that extends the functionality of a Router.
|
||||
|
||||
The Firewall class acts as a network security system that monitors and controls incoming and outgoing
|
||||
network traffic based on predetermined security rules. It is an intermediary between internal and external
|
||||
networks (including DMZ - De-Militarized Zone), ensuring that all inbound and outbound traffic complies with
|
||||
the security policies.
|
||||
|
||||
The Firewall employs Access Control Lists (ACLs) to filter traffic. Both the internal and DMZ ports have both
|
||||
inbound and outbound ACLs that determine what traffic is allowed to pass.
|
||||
|
||||
In addition to the security functions, the Firewall can also perform some routing functions similar to a Router,
|
||||
forwarding packets between its interfaces based on the destination IP address.
|
||||
|
||||
Usage:
|
||||
To utilise the Firewall class, instantiate it with a hostname and optionally specify sys_log for logging.
|
||||
Configure the internal, external, and DMZ ports with IP addresses and subnet masks. Define ACL rules to
|
||||
permit or deny traffic based on your security policies. The Firewall will process frames based on these
|
||||
rules, determining whether to allow or block traffic at each network interface.
|
||||
|
||||
Example:
|
||||
>>> from primaite.simulator.network.transmission.network_layer import IPProtocol
|
||||
>>> from primaite.simulator.network.transmission.transport_layer import Port
|
||||
>>> firewall = Firewall(hostname="Firewall1")
|
||||
>>> firewall.configure_internal_port(ip_address="192.168.1.1", subnet_mask="255.255.255.0")
|
||||
>>> firewall.configure_external_port(ip_address="10.0.0.1", subnet_mask="255.255.255.0")
|
||||
>>> firewall.configure_dmz_port(ip_address="172.16.0.1", subnet_mask="255.255.255.0")
|
||||
>>> # Permit HTTP traffic to the DMZ
|
||||
>>> firewall.dmz_inbound_acl.add_rule(
|
||||
... action=ACLAction.PERMIT,
|
||||
... protocol=IPProtocol.TCP,
|
||||
... dst_port=Port.HTTP,
|
||||
... src_ip_address="0.0.0.0",
|
||||
... src_wildcard_mask="0.0.0.0",
|
||||
... dst_ip_address="172.16.0.0",
|
||||
... dst_wildcard_mask="0.0.0.255"
|
||||
... )
|
||||
|
||||
:ivar str hostname: The Firewall hostname.
|
||||
"""
|
||||
|
||||
internal_inbound_acl: Optional[AccessControlList] = None
|
||||
"""Access Control List for managing entering the internal network."""
|
||||
|
||||
internal_outbound_acl: Optional[AccessControlList] = None
|
||||
"""Access Control List for managing traffic leaving the internal network."""
|
||||
|
||||
dmz_inbound_acl: Optional[AccessControlList] = None
|
||||
"""Access Control List for managing traffic entering the DMZ."""
|
||||
|
||||
dmz_outbound_acl: Optional[AccessControlList] = None
|
||||
"""Access Control List for managing traffic leaving the DMZ."""
|
||||
|
||||
external_inbound_acl: Optional[AccessControlList] = None
|
||||
"""Access Control List for managing traffic entering from an external network."""
|
||||
|
||||
external_outbound_acl: Optional[AccessControlList] = None
|
||||
"""Access Control List for managing traffic leaving towards an external network."""
|
||||
|
||||
def __init__(self, hostname: str, **kwargs):
|
||||
if not kwargs.get("sys_log"):
|
||||
kwargs["sys_log"] = SysLog(hostname)
|
||||
|
||||
super().__init__(hostname=hostname, num_ports=3, **kwargs)
|
||||
|
||||
# Initialise ACLs for internal and dmz interfaces with a default DENY policy
|
||||
self.internal_inbound_acl = AccessControlList(
|
||||
sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=f"{hostname} - Internal Inbound"
|
||||
)
|
||||
self.internal_outbound_acl = AccessControlList(
|
||||
sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=f"{hostname} - Internal Outbound"
|
||||
)
|
||||
self.dmz_inbound_acl = AccessControlList(
|
||||
sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=f"{hostname} - DMZ Inbound"
|
||||
)
|
||||
self.dmz_outbound_acl = AccessControlList(
|
||||
sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=f"{hostname} - DMZ Outbound"
|
||||
)
|
||||
|
||||
# external ACLs should have a default PERMIT policy
|
||||
self.external_inbound_acl = AccessControlList(
|
||||
sys_log=kwargs["sys_log"], implicit_action=ACLAction.PERMIT, name=f"{hostname} - External Inbound"
|
||||
)
|
||||
self.external_outbound_acl = AccessControlList(
|
||||
sys_log=kwargs["sys_log"], implicit_action=ACLAction.PERMIT, name=f"{hostname} - External Outbound"
|
||||
)
|
||||
|
||||
self.set_original_state()
|
||||
|
||||
def set_original_state(self):
|
||||
"""Set the original state for the Firewall."""
|
||||
super().set_original_state()
|
||||
vals_to_include = {
|
||||
"internal_port",
|
||||
"external_port",
|
||||
"dmz_port",
|
||||
"internal_inbound_acl",
|
||||
"internal_outbound_acl",
|
||||
"dmz_inbound_acl",
|
||||
"dmz_outbound_acl",
|
||||
"external_inbound_acl",
|
||||
"external_outbound_acl",
|
||||
}
|
||||
self._original_state.update(self.model_dump(include=vals_to_include))
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Describes the current state of the Firewall.
|
||||
|
||||
:return: A dictionary representing the current state.
|
||||
"""
|
||||
state = super().describe_state()
|
||||
|
||||
state.update(
|
||||
{
|
||||
"internal_port": self.internal_port.describe_state(),
|
||||
"external_port": self.external_port.describe_state(),
|
||||
"dmz_port": self.dmz_port.describe_state(),
|
||||
"internal_inbound_acl": self.internal_inbound_acl.describe_state(),
|
||||
"internal_outbound_acl": self.internal_outbound_acl.describe_state(),
|
||||
"dmz_inbound_acl": self.dmz_inbound_acl.describe_state(),
|
||||
"dmz_outbound_acl": self.dmz_outbound_acl.describe_state(),
|
||||
"external_inbound_acl": self.external_inbound_acl.describe_state(),
|
||||
"external_outbound_acl": self.external_outbound_acl.describe_state(),
|
||||
}
|
||||
)
|
||||
|
||||
return state
|
||||
|
||||
def show(self, markdown: bool = False):
|
||||
"""
|
||||
Displays the current configuration of the firewall's network interfaces in a table format.
|
||||
|
||||
The table includes information about each port (External, Internal, DMZ), their MAC addresses, IP
|
||||
configurations, link speeds, and operational status. The output can be formatted as Markdown if specified.
|
||||
|
||||
:param markdown: If True, formats the output table in Markdown style. Useful for documentation or reporting
|
||||
purposes within Markdown-compatible platforms.
|
||||
"""
|
||||
table = PrettyTable(["Port", "MAC Address", "Address", "Speed", "Status"])
|
||||
if markdown:
|
||||
table.set_style(MARKDOWN)
|
||||
table.align = "l"
|
||||
table.title = f"{self.hostname} Network Interfaces"
|
||||
ports = {"External": self.external_port, "Internal": self.internal_port, "DMZ": self.dmz_port}
|
||||
for port, network_interface in ports.items():
|
||||
table.add_row(
|
||||
[
|
||||
port,
|
||||
network_interface.mac_address,
|
||||
f"{network_interface.ip_address}/{network_interface.ip_network.prefixlen}",
|
||||
network_interface.speed,
|
||||
"Enabled" if network_interface.enabled else "Disabled",
|
||||
]
|
||||
)
|
||||
print(table)
|
||||
|
||||
def show_rules(self, external: bool = True, internal: bool = True, dmz: bool = True, markdown: bool = False):
|
||||
"""
|
||||
Prints the configured ACL rules for each specified network zone of the firewall.
|
||||
|
||||
This method allows selective viewing of ACL rules applied to external, internal, and DMZ interfaces, providing
|
||||
a clear overview of the firewall's current traffic filtering policies. Each section can be independently
|
||||
toggled.
|
||||
|
||||
:param external: If True, shows ACL rules for external interfaces.
|
||||
:param internal: If True, shows ACL rules for internal interfaces.
|
||||
:param dmz: If True, shows ACL rules for DMZ interfaces.
|
||||
:param markdown: If True, formats the output in Markdown, enhancing readability in Markdown-compatible viewers.
|
||||
"""
|
||||
print(f"{self.hostname} Firewall Rules")
|
||||
print()
|
||||
if external:
|
||||
self.external_inbound_acl.show(markdown)
|
||||
print()
|
||||
self.external_outbound_acl.show(markdown)
|
||||
print()
|
||||
if internal:
|
||||
self.internal_inbound_acl.show(markdown)
|
||||
print()
|
||||
self.internal_outbound_acl.show(markdown)
|
||||
print()
|
||||
if dmz:
|
||||
self.dmz_inbound_acl.show(markdown)
|
||||
print()
|
||||
self.dmz_outbound_acl.show(markdown)
|
||||
print()
|
||||
|
||||
def receive_frame(self, frame: Frame, from_network_interface: RouterInterface):
|
||||
"""
|
||||
Receive a frame and process it.
|
||||
|
||||
Acts as the primary entry point for all network frames arriving at the Firewall, determining the flow of
|
||||
traffic based on the source network interface controller (NIC) and applying the appropriate Access Control
|
||||
List (ACL) rules.
|
||||
|
||||
This method categorizes the incoming traffic into three main pathways based on the source NIC: external inbound,
|
||||
internal outbound, and DMZ (De-Militarized Zone) outbound. It plays a crucial role in enforcing the firewall's
|
||||
security policies by directing each frame to the corresponding processing method that evaluates it against
|
||||
specific ACL rules.
|
||||
|
||||
Based on the originating NIC:
|
||||
- Frames from the external port are processed as external inbound traffic, potentially destined for either the
|
||||
DMZ or the internal network.
|
||||
- Frames from the internal port are treated as internal outbound traffic, aimed at reaching the external
|
||||
network or a service within the DMZ.
|
||||
- Frames from the DMZ port are handled as DMZ outbound traffic, with potential destinations including the
|
||||
internal network or the external network.
|
||||
|
||||
:param frame: The network frame to be processed.
|
||||
:param from_network_interface: The network interface controller from which the frame is coming. Used to
|
||||
determine the direction of the traffic (inbound or outbound) and the zone (external, internal,
|
||||
DMZ) it belongs to.
|
||||
"""
|
||||
# If the frame comes from the external port, it's considered as external inbound traffic
|
||||
if from_network_interface == self.external_port:
|
||||
self._process_external_inbound_frame(frame, from_network_interface)
|
||||
return
|
||||
# If the frame comes from the internal port, it's considered as internal outbound traffic
|
||||
elif from_network_interface == self.internal_port:
|
||||
self._process_internal_outbound_frame(frame, from_network_interface)
|
||||
return
|
||||
# If the frame comes from the DMZ port, it's considered as DMZ outbound traffic
|
||||
elif from_network_interface == self.dmz_port:
|
||||
self._process_dmz_outbound_frame(frame, from_network_interface)
|
||||
return
|
||||
|
||||
def _process_external_inbound_frame(self, frame: Frame, from_network_interface: RouterInterface) -> None:
|
||||
"""
|
||||
Process frames arriving from the external network.
|
||||
|
||||
Determines the path for frames based on their destination IP addresses and ACL rules for the external inbound
|
||||
interface. Frames destined for the DMZ or internal network are forwarded accordingly, if allowed by the ACL.
|
||||
|
||||
If a frame is permitted by the ACL, it is either passed to the session manager (if applicable) or forwarded to
|
||||
the appropriate network zone (DMZ/internal). Denied frames are logged and dropped.
|
||||
|
||||
:param frame: The frame to be processed, containing network layer and transport layer information.
|
||||
:param from_network_interface: The interface on the firewall through which the frame was received.
|
||||
"""
|
||||
# check if External Inbound ACL Rules permit frame
|
||||
permitted, rule = self.external_inbound_acl.is_permitted(frame)
|
||||
if not permitted:
|
||||
self.sys_log.info(f"Frame blocked at interface {from_network_interface} by rule {rule}")
|
||||
return
|
||||
self.software_manager.arp.add_arp_cache_entry(
|
||||
ip_address=frame.ip.src_ip_address,
|
||||
mac_address=frame.ethernet.src_mac_addr,
|
||||
network_interface=from_network_interface,
|
||||
)
|
||||
|
||||
if self.check_send_frame_to_session_manager(frame):
|
||||
# Port is open on this Router so pass Frame up to session manager first
|
||||
self.session_manager.receive_frame(frame, from_network_interface)
|
||||
else:
|
||||
# If the destination IP is within the DMZ network, process the frame as DMZ inbound
|
||||
if frame.ip.dst_ip_address in self.dmz_port.ip_network:
|
||||
self._process_dmz_inbound_frame(frame, from_network_interface)
|
||||
else:
|
||||
# Otherwise, process the frame as internal inbound
|
||||
self._process_internal_inbound_frame(frame, from_network_interface)
|
||||
|
||||
def _process_external_outbound_frame(self, frame: Frame, from_network_interface: RouterInterface) -> None:
|
||||
"""
|
||||
Process frames that are outbound towards the external network.
|
||||
|
||||
:param frame: The frame to be processed.
|
||||
:param from_network_interface: The network interface controller from which the frame is coming.
|
||||
:param re_attempt: Indicates if the processing is a re-attempt, defaults to False.
|
||||
"""
|
||||
# check if External Outbound ACL Rules permit frame
|
||||
permitted, rule = self.external_outbound_acl.is_permitted(frame=frame)
|
||||
if not permitted:
|
||||
self.sys_log.info(f"Frame blocked at interface {from_network_interface} by rule {rule}")
|
||||
return
|
||||
|
||||
self.process_frame(frame=frame, from_network_interface=from_network_interface)
|
||||
|
||||
def _process_internal_inbound_frame(self, frame: Frame, from_network_interface: RouterInterface) -> None:
|
||||
"""
|
||||
Process frames that are inbound towards the internal LAN.
|
||||
|
||||
This method is responsible for handling frames coming from either the external network or the DMZ towards
|
||||
the internal LAN. It checks the frames against the internal inbound ACL to decide whether to allow or deny
|
||||
the traffic, and take appropriate actions.
|
||||
|
||||
:param frame: The frame to be processed.
|
||||
:param from_network_interface: The network interface controller from which the frame is coming.
|
||||
:param re_attempt: Indicates if the processing is a re-attempt, defaults to False.
|
||||
"""
|
||||
# check if Internal Inbound ACL Rules permit frame
|
||||
permitted, rule = self.internal_inbound_acl.is_permitted(frame=frame)
|
||||
if not permitted:
|
||||
self.sys_log.info(f"Frame blocked at interface {from_network_interface} by rule {rule}")
|
||||
return
|
||||
|
||||
self.process_frame(frame=frame, from_network_interface=from_network_interface)
|
||||
|
||||
def _process_internal_outbound_frame(self, frame: Frame, from_network_interface: RouterInterface) -> None:
|
||||
"""
|
||||
Process frames that are outbound from the internal network.
|
||||
|
||||
This method handles frames that are leaving the internal network. Depending on the destination IP address,
|
||||
the frame may be forwarded to the DMZ or to the external network.
|
||||
|
||||
:param frame: The frame to be processed.
|
||||
:param from_network_interface: The network interface controller from which the frame is coming.
|
||||
:param re_attempt: Indicates if the processing is a re-attempt, defaults to False.
|
||||
"""
|
||||
permitted, rule = self.internal_outbound_acl.is_permitted(frame)
|
||||
if not permitted:
|
||||
self.sys_log.info(f"Frame blocked at interface {from_network_interface} by rule {rule}")
|
||||
return
|
||||
self.software_manager.arp.add_arp_cache_entry(
|
||||
ip_address=frame.ip.src_ip_address,
|
||||
mac_address=frame.ethernet.src_mac_addr,
|
||||
network_interface=from_network_interface,
|
||||
)
|
||||
|
||||
if self.check_send_frame_to_session_manager(frame):
|
||||
# Port is open on this Router so pass Frame up to session manager first
|
||||
self.session_manager.receive_frame(frame, from_network_interface)
|
||||
else:
|
||||
# If the destination IP is within the DMZ network, process the frame as DMZ inbound
|
||||
if frame.ip.dst_ip_address in self.dmz_port.ip_network:
|
||||
self._process_dmz_inbound_frame(frame, from_network_interface)
|
||||
else:
|
||||
# If the destination IP is not within the DMZ network, process the frame as external outbound
|
||||
self._process_external_outbound_frame(frame, from_network_interface)
|
||||
|
||||
def _process_dmz_inbound_frame(self, frame: Frame, from_network_interface: RouterInterface) -> None:
|
||||
"""
|
||||
Process frames that are inbound from the DMZ.
|
||||
|
||||
This method is responsible for handling frames coming from either the external network or the internal LAN
|
||||
towards the DMZ. It checks the frames against the DMZ inbound ACL to decide whether to allow or deny the
|
||||
traffic, and take appropriate actions.
|
||||
|
||||
:param frame: The frame to be processed.
|
||||
:param from_network_interface: The network interface controller from which the frame is coming.
|
||||
:param re_attempt: Indicates if the processing is a re-attempt, defaults to False.
|
||||
"""
|
||||
# check if DMZ Inbound ACL Rules permit frame
|
||||
permitted, rule = self.dmz_inbound_acl.is_permitted(frame=frame)
|
||||
if not permitted:
|
||||
self.sys_log.info(f"Frame blocked at interface {from_network_interface} by rule {rule}")
|
||||
return
|
||||
|
||||
self.process_frame(frame=frame, from_network_interface=from_network_interface)
|
||||
|
||||
def _process_dmz_outbound_frame(self, frame: Frame, from_network_interface: RouterInterface) -> None:
|
||||
"""
|
||||
Process frames that are outbound from the DMZ.
|
||||
|
||||
This method handles frames originating from the DMZ and determines their appropriate path based on the
|
||||
destination IP address. It involves checking the DMZ outbound ACL, consulting the ARP cache and the routing
|
||||
table to find the correct outbound NIC, and then forwarding the frame to either the internal network or the
|
||||
external network.
|
||||
|
||||
:param frame: The frame to be processed.
|
||||
:param from_network_interface: The network interface controller from which the frame is coming.
|
||||
:param re_attempt: Indicates if the processing is a re-attempt, defaults to False.
|
||||
"""
|
||||
permitted, rule = self.dmz_outbound_acl.is_permitted(frame)
|
||||
if not permitted:
|
||||
self.sys_log.info(f"Frame blocked at interface {from_network_interface} by rule {rule}")
|
||||
return
|
||||
self.software_manager.arp.add_arp_cache_entry(
|
||||
ip_address=frame.ip.src_ip_address,
|
||||
mac_address=frame.ethernet.src_mac_addr,
|
||||
network_interface=from_network_interface,
|
||||
)
|
||||
|
||||
if self.check_send_frame_to_session_manager(frame):
|
||||
# Port is open on this Router so pass Frame up to session manager first
|
||||
self.session_manager.receive_frame(frame, from_network_interface)
|
||||
else:
|
||||
# Attempt to get the outbound NIC from the ARP cache using the destination IP address
|
||||
outbound_nic = self.software_manager.arp.get_arp_cache_network_interface(frame.ip.dst_ip_address)
|
||||
|
||||
# If outbound NIC is not found in the ARP cache, consult the routing table to find the best route
|
||||
if not outbound_nic:
|
||||
route = self.route_table.find_best_route(frame.ip.dst_ip_address)
|
||||
if route:
|
||||
# If a route is found, get the corresponding outbound NIC from the ARP cache using the next-hop IP
|
||||
# address
|
||||
outbound_nic = self.software_manager.arp.get_arp_cache_network_interface(route.next_hop_ip_address)
|
||||
|
||||
# If an outbound NIC is determined
|
||||
if outbound_nic:
|
||||
if outbound_nic == self.external_port:
|
||||
# If the outbound NIC is the external port, check the frame against the DMZ outbound ACL and
|
||||
# process it as an external outbound frame
|
||||
self._process_external_outbound_frame(frame, from_network_interface)
|
||||
return
|
||||
elif outbound_nic == self.internal_port:
|
||||
# If the outbound NIC is the internal port, check the frame against the DMZ outbound ACL and
|
||||
# process it as an internal inbound frame
|
||||
self._process_internal_inbound_frame(frame, from_network_interface)
|
||||
return
|
||||
# TODO: What to do here? Destination unreachable? Send ICMP back?
|
||||
return
|
||||
|
||||
@property
|
||||
def external_port(self) -> RouterInterface:
|
||||
"""
|
||||
The external port of the firewall.
|
||||
|
||||
:return: The external port connecting the firewall to the external network.
|
||||
"""
|
||||
return self.network_interface[EXTERNAL_PORT_ID]
|
||||
|
||||
@validate_call()
|
||||
def configure_external_port(self, ip_address: Union[IPV4Address, str], subnet_mask: Union[IPV4Address, str]):
|
||||
"""
|
||||
Configure the external port with an IP address and a subnet mask.
|
||||
|
||||
Enables the port once configured.
|
||||
|
||||
:param ip_address: The IP address to assign to the external port.
|
||||
:param subnet_mask: The subnet mask to assign to the external port.
|
||||
"""
|
||||
# Configure the external port with the specified IP address and subnet mask
|
||||
self.configure_port(EXTERNAL_PORT_ID, ip_address, subnet_mask)
|
||||
self.external_port.enable()
|
||||
|
||||
@property
|
||||
def internal_port(self) -> RouterInterface:
|
||||
"""
|
||||
The internal port of the firewall.
|
||||
|
||||
:return: The external port connecting the firewall to the internal LAN.
|
||||
"""
|
||||
return self.network_interface[INTERNAL_PORT_ID]
|
||||
|
||||
@validate_call()
|
||||
def configure_internal_port(self, ip_address: Union[IPV4Address, str], subnet_mask: Union[IPV4Address, str]):
|
||||
"""
|
||||
Configure the internal port with an IP address and a subnet mask.
|
||||
|
||||
Enables the port once configured.
|
||||
|
||||
:param ip_address: The IP address to assign to the internal port.
|
||||
:param subnet_mask: The subnet mask to assign to the internal port.
|
||||
"""
|
||||
self.configure_port(INTERNAL_PORT_ID, ip_address, subnet_mask)
|
||||
self.internal_port.enable()
|
||||
|
||||
@property
|
||||
def dmz_port(self) -> RouterInterface:
|
||||
"""
|
||||
The DMZ port of the firewall.
|
||||
|
||||
:return: The external port connecting the firewall to the DMZ.
|
||||
"""
|
||||
return self.network_interface[DMZ_PORT_ID]
|
||||
|
||||
@validate_call()
|
||||
def configure_dmz_port(self, ip_address: Union[IPV4Address, str], subnet_mask: Union[IPV4Address, str]):
|
||||
"""
|
||||
Configure the DMZ port with an IP address and a subnet mask.
|
||||
|
||||
Enables the port once configured.
|
||||
|
||||
:param ip_address: The IP address to assign to the DMZ port.
|
||||
:param subnet_mask: The subnet mask to assign to the DMZ port.
|
||||
"""
|
||||
self.configure_port(DMZ_PORT_ID, ip_address, subnet_mask)
|
||||
self.dmz_port.enable()
|
||||
@@ -6,6 +6,7 @@ from ipaddress import IPv4Address, IPv4Network
|
||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
||||
|
||||
from prettytable import MARKDOWN, PrettyTable
|
||||
from pydantic import validate_call
|
||||
|
||||
from primaite.simulator.core import RequestManager, RequestType, SimComponent
|
||||
from primaite.simulator.network.hardware.base import IPWiredNetworkInterface
|
||||
@@ -19,6 +20,43 @@ from primaite.simulator.network.transmission.transport_layer import Port
|
||||
from primaite.simulator.system.core.sys_log import SysLog
|
||||
from primaite.simulator.system.services.arp.arp import ARP
|
||||
from primaite.simulator.system.services.icmp.icmp import ICMP
|
||||
from primaite.utils.validators import IPV4Address
|
||||
|
||||
|
||||
@validate_call()
|
||||
def ip_matches_masked_range(ip_to_check: IPV4Address, base_ip: IPV4Address, wildcard_mask: IPV4Address) -> bool:
|
||||
"""
|
||||
Determine if a given IP address matches a range defined by a base IP address and a wildcard mask.
|
||||
|
||||
The wildcard mask specifies which bits in the IP address should be ignored (1) and which bits must match (0).
|
||||
|
||||
The function applies the wildcard mask to both the base IP and the IP address to check by first negating the
|
||||
wildcard mask and then performing a bitwise AND operation. This process effectively masks out the bits indicated
|
||||
by the wildcard mask. If the resulting masked IP addresses are equal, it means the IP address to check falls within
|
||||
the range defined by the base IP and wildcard mask.
|
||||
|
||||
:param IPV4Address ip_to_check: The IP address to be checked.
|
||||
:param IPV4Address base_ip: The base IP address defining the start of the range.
|
||||
:param IPV4Address wildcard_mask: The wildcard mask specifying which bits to ignore.
|
||||
:return: A boolean value indicating whether the IP address matches the masked range.
|
||||
:rtype: bool
|
||||
|
||||
Example usage:
|
||||
>>> ip_matches_masked_range(ip_to_check="192.168.10.10", base_ip="192.168.1.1", wildcard_mask="0.0.255.255")
|
||||
False
|
||||
"""
|
||||
# Convert the IP addresses from IPv4Address objects to integer representations for bitwise operations
|
||||
base_ip_int = int(base_ip)
|
||||
ip_to_check_int = int(ip_to_check)
|
||||
wildcard_int = int(wildcard_mask)
|
||||
|
||||
# Negate the wildcard mask and apply it to both the base IP and the IP to check using bitwise AND
|
||||
# This step masks out the bits to be ignored according to the wildcard mask
|
||||
masked_base_ip = base_ip_int & ~wildcard_int
|
||||
masked_ip_to_check = ip_to_check_int & ~wildcard_int
|
||||
|
||||
# Compare the masked IP addresses to determine if they match within the masked range
|
||||
return masked_base_ip == masked_ip_to_check
|
||||
|
||||
|
||||
class ACLAction(Enum):
|
||||
@@ -30,22 +68,62 @@ class ACLAction(Enum):
|
||||
|
||||
class ACLRule(SimComponent):
|
||||
"""
|
||||
Represents an Access Control List (ACL) rule.
|
||||
Represents an Access Control List (ACL) rule within a network device.
|
||||
|
||||
:ivar ACLAction action: Action to be performed (Permit/Deny). Default is DENY.
|
||||
:ivar Optional[IPProtocol] protocol: Network protocol. Default is None.
|
||||
:ivar Optional[IPv4Address] src_ip_address: Source IP address. Default is None.
|
||||
:ivar Optional[Port] src_port: Source port number. Default is None.
|
||||
:ivar Optional[IPv4Address] dst_ip_address: Destination IP address. Default is None.
|
||||
:ivar Optional[Port] dst_port: Destination port number. Default is None.
|
||||
Enables fine-grained control over network traffic based on specified criteria such as IP addresses, protocols,
|
||||
and ports. ACL rules can be configured to permit or deny traffic, providing a powerful mechanism for enforcing
|
||||
security policies and traffic flow.
|
||||
|
||||
ACL rules support specifying exact match conditions, ranges of IP addresses using wildcard masks, and
|
||||
protocol types. This flexibility allows for complex traffic filtering scenarios, from blocking or allowing
|
||||
specific types of traffic to entire subnets.
|
||||
|
||||
**Usage:**
|
||||
|
||||
- **Dedicated IP Addresses**: To match traffic from or to a specific IP address, set the `src_ip_address`
|
||||
and/or `dst_ip_address` without a wildcard mask. This is useful for rules that apply to individual hosts.
|
||||
|
||||
- **IP Ranges with Wildcard Masks**: For rules that apply to a range of IP addresses, use the `src_wildcard_mask`
|
||||
and/or `dst_wildcard_mask` in conjunction with the base IP address. Wildcard masks are a way to specify which
|
||||
bits of the IP address should be matched exactly and which bits can vary. For example, a wildcard mask of
|
||||
`0.0.0.255` applied to a base address of `192.168.1.0` allows for any address from `192.168.1.0` to
|
||||
`192.168.1.255`.
|
||||
|
||||
- **Allowing All IP Traffic**: To mimic the Cisco ACL rule that permits all IP traffic from a specific range,
|
||||
you may use wildcard masks with the rule action set to `PERMIT`. If your implementation includes an `ALL`
|
||||
option in the `IPProtocol` enum, use it to allow all protocols; otherwise, consider the rule without a
|
||||
specified protocol to apply to all IP traffic.
|
||||
|
||||
|
||||
The combination of these attributes allows for the creation of granular rules to control traffic flow
|
||||
effectively, enhancing network security and management.
|
||||
|
||||
|
||||
:ivar ACLAction action: Specifies whether to `PERMIT` or `DENY` the traffic that matches the rule conditions.
|
||||
The default action is `DENY`.
|
||||
:ivar Optional[IPProtocol] protocol: The network protocol (e.g., TCP, UDP, ICMP) to match. If `None`, the rule
|
||||
applies to all protocols.
|
||||
:ivar Optional[IPV4Address] src_ip_address: The source IP address to match. If combined with `src_wildcard_mask`,
|
||||
it specifies the start of an IP range.
|
||||
:ivar Optional[IPV4Address] src_wildcard_mask: The wildcard mask for the source IP address, defining the range
|
||||
of addresses to match.
|
||||
:ivar Optional[IPV4Address] dst_ip_address: The destination IP address to match. If combined with
|
||||
`dst_wildcard_mask`, it specifies the start of an IP range.
|
||||
:ivar Optional[IPv4Address] dst_wildcard_mask: The wildcard mask for the destination IP address, defining the
|
||||
range of addresses to match.
|
||||
:ivar Optional[Port] src_port: The source port number to match. Relevant for TCP/UDP protocols.
|
||||
:ivar Optional[Port] dst_port: The destination port number to match. Relevant for TCP/UDP protocols.
|
||||
"""
|
||||
|
||||
action: ACLAction = ACLAction.DENY
|
||||
protocol: Optional[IPProtocol] = None
|
||||
src_ip_address: Optional[IPv4Address] = None
|
||||
src_ip_address: Optional[IPV4Address] = None
|
||||
src_wildcard_mask: Optional[IPV4Address] = None
|
||||
dst_ip_address: Optional[IPV4Address] = None
|
||||
dst_wildcard_mask: Optional[IPV4Address] = None
|
||||
src_port: Optional[Port] = None
|
||||
dst_ip_address: Optional[IPv4Address] = None
|
||||
dst_port: Optional[Port] = None
|
||||
match_count: int = 0
|
||||
|
||||
def __str__(self) -> str:
|
||||
rule_strings = []
|
||||
@@ -76,24 +154,136 @@ class ACLRule(SimComponent):
|
||||
state["src_port"] = self.src_port.name if self.src_port else None
|
||||
state["dst_ip_address"] = str(self.dst_ip_address) if self.dst_ip_address else None
|
||||
state["dst_port"] = self.dst_port.name if self.dst_port else None
|
||||
state["match_count"] = self.match_count
|
||||
return state
|
||||
|
||||
def permit_frame_check(self, frame: Frame) -> Tuple[bool, bool]:
|
||||
"""
|
||||
Evaluates whether a given network frame should be permitted or denied based on this ACL rule.
|
||||
|
||||
This method checks the frame against the ACL rule's criteria, including protocol, source and destination IP
|
||||
addresses (with support for wildcard masking), and source and destination ports. The method assumes that an
|
||||
unspecified (None) criterion implies a match for any value in that category. For IP addresses, wildcard masking
|
||||
can be used to specify ranges of addresses that match the rule.
|
||||
|
||||
The method follows these steps to determine if a frame is permitted:
|
||||
|
||||
1. Check if the frame's protocol matches the ACL rule's protocol.
|
||||
2. For source and destination IP addresses:
|
||||
1. If a wildcard mask is defined, check if the frame's IP address is within the range specified by the base
|
||||
IP address and the wildcard mask.
|
||||
2. If no wildcard mask is defined, directly compare the frame's IP address to the one specified in the rule.
|
||||
3. Check if the frame's source and destination ports match those specified in the rule.
|
||||
4. The frame is permitted if it matches all specified criteria and the rule's action is PERMIT. Conversely, it
|
||||
is not permitted if any criterion does not match or if the rule's action is DENY.
|
||||
|
||||
:param frame: The network frame to be evaluated.
|
||||
:return: A tuple containing two boolean values: The first indicates if the frame is permitted by this rule (
|
||||
True if permitted, otherwise False). The second indicates if the frame matches the rule's criteria (True
|
||||
if it matches, otherwise False).
|
||||
"""
|
||||
permitted = False
|
||||
frame_matches_rule = False
|
||||
protocol_matches = self.protocol == frame.ip.protocol if self.protocol else True
|
||||
|
||||
src_ip_matches = self.src_ip_address is None # Assume match if no specific src IP is defined
|
||||
if self.src_ip_address:
|
||||
if self.src_wildcard_mask:
|
||||
# If a src wildcard mask is provided, use it to check the range
|
||||
src_ip_matches = ip_matches_masked_range(
|
||||
ip_to_check=frame.ip.src_ip_address,
|
||||
base_ip=self.src_ip_address,
|
||||
wildcard_mask=self.src_wildcard_mask,
|
||||
)
|
||||
else:
|
||||
# Direct comparison if no wildcard mask is defined
|
||||
src_ip_matches = frame.ip.src_ip_address == self.src_ip_address
|
||||
|
||||
dst_ip_matches = self.dst_ip_address is None # Assume match if no specific dst IP is defined
|
||||
if self.dst_ip_address:
|
||||
if self.dst_wildcard_mask:
|
||||
# If a dst wildcard mask is provided, use it to check the range
|
||||
dst_ip_matches = ip_matches_masked_range(
|
||||
ip_to_check=frame.ip.dst_ip_address,
|
||||
base_ip=self.dst_ip_address,
|
||||
wildcard_mask=self.dst_wildcard_mask,
|
||||
)
|
||||
else:
|
||||
# Direct comparison if no wildcard mask is defined
|
||||
dst_ip_matches = frame.ip.dst_ip_address == self.dst_ip_address
|
||||
|
||||
src_port = None
|
||||
dst_port = None
|
||||
if frame.tcp:
|
||||
src_port = frame.tcp.src_port
|
||||
dst_port = frame.tcp.dst_port
|
||||
elif frame.udp:
|
||||
src_port = frame.udp.src_port
|
||||
dst_port = frame.udp.dst_port
|
||||
|
||||
src_port_matches = self.src_port == src_port if self.src_port else True
|
||||
dst_port_matches = self.dst_port == dst_port if self.dst_port else True
|
||||
|
||||
# The frame is permitted if all conditions are met
|
||||
if protocol_matches and src_ip_matches and dst_ip_matches and src_port_matches and dst_port_matches:
|
||||
frame_matches_rule = True
|
||||
permitted = self.action == ACLAction.PERMIT
|
||||
|
||||
return permitted, frame_matches_rule
|
||||
|
||||
|
||||
class AccessControlList(SimComponent):
|
||||
"""
|
||||
Manages a list of ACLRules to filter network traffic.
|
||||
|
||||
:ivar SysLog sys_log: System logging instance.
|
||||
:ivar ACLAction implicit_action: Default action for rules.
|
||||
:ivar ACLRule implicit_rule: Implicit ACL rule, created based on implicit_action.
|
||||
:ivar int max_acl_rules: Maximum number of ACL rules that can be added. Default is 25.
|
||||
:ivar List[Optional[ACLRule]] _acl: A list containing the ACL rules.
|
||||
Manages a list of ACLRule instances to filter network traffic based on predefined criteria. This class
|
||||
provides functionalities to add, remove, and evaluate ACL rules, thereby controlling the flow of traffic
|
||||
through a network device.
|
||||
|
||||
ACL rules can specify conditions based on source and destination IP addresses, IP protocols (TCP, UDP, ICMP),
|
||||
and port numbers. Rules can be configured to permit or deny traffic that matches these conditions, offering
|
||||
granular control over network security policies.
|
||||
|
||||
Usage:
|
||||
- **Dedicated IP Addresses**: Directly specify the source and/or destination IP addresses in an ACL rule to
|
||||
match traffic to or from specific hosts.
|
||||
- **IP Ranges with Wildcard Masks**: Use wildcard masks along with base IP addresses to define ranges of IP
|
||||
addresses that an ACL rule applies to. This is useful for specifying subnets or ranges of IP addresses.
|
||||
- **Allowing All IP Traffic**: To mimic a Cisco-style ACL rule that allows all IP traffic from a specified
|
||||
range, use the wildcard mask in conjunction with a permit action. If your system supports an `ALL` option
|
||||
for the IP protocol, this can be used to allow all types of IP traffic; otherwise, the absence of a
|
||||
specified protocol can be interpreted to mean all protocols.
|
||||
|
||||
Methods include functionalities to add and remove rules, reset to default configurations, and evaluate
|
||||
whether specific frames are permitted or denied based on the current set of rules. The class also provides
|
||||
utility functions to describe the current state and display the rules in a human-readable format.
|
||||
|
||||
Example:
|
||||
>>> # To add a rule that permits all TCP traffic from the subnet 192.168.1.0/24 to 192.168.2.0/24:
|
||||
>>> acl = AccessControlList()
|
||||
>>> acl.add_rule(
|
||||
... action=ACLAction.PERMIT,
|
||||
... protocol=IPProtocol.TCP,
|
||||
... src_ip_address="192.168.1.0",
|
||||
... src_wildcard_mask="0.0.0.255",
|
||||
... dst_ip_address="192.168.2.0",
|
||||
... dst_wildcard_mask="0.0.0.255"
|
||||
...)
|
||||
|
||||
This example demonstrates adding a rule with specific source and destination IP ranges, using wildcard masks
|
||||
to allow a broad range of traffic while maintaining control over the flow of data for security and
|
||||
management purposes.
|
||||
|
||||
:ivar ACLAction implicit_action: The default action (permit or deny) applied when no other rule matches.
|
||||
Typically set to deny to follow the principle of least privilege.
|
||||
:ivar int max_acl_rules: The maximum number of ACL rules that can be added to the list. Defaults to 25.
|
||||
"""
|
||||
|
||||
sys_log: SysLog
|
||||
implicit_action: ACLAction
|
||||
implicit_rule: ACLRule
|
||||
max_acl_rules: int = 25
|
||||
name: str
|
||||
_acl: List[Optional[ACLRule]] = [None] * 24
|
||||
_default_config: Dict[int, dict] = {}
|
||||
"""Config dict describing how the ACL list should look at episode start"""
|
||||
@@ -150,6 +340,7 @@ class AccessControlList(SimComponent):
|
||||
)
|
||||
|
||||
def _init_request_manager(self) -> RequestManager:
|
||||
# TODO: Add src and dst wildcard masks as positional args in this request.
|
||||
rm = super()._init_request_manager()
|
||||
|
||||
# When the request reaches this action, it should now contain solely positional args for the 'add_rule' action.
|
||||
@@ -165,13 +356,13 @@ class AccessControlList(SimComponent):
|
||||
"add_rule",
|
||||
RequestType(
|
||||
func=lambda request, context: self.add_rule(
|
||||
ACLAction[request[0]],
|
||||
None if request[1] == "ALL" else IPProtocol[request[1]],
|
||||
None if request[2] == "ALL" else IPv4Address(request[2]),
|
||||
None if request[3] == "ALL" else Port[request[3]],
|
||||
None if request[4] == "ALL" else IPv4Address(request[4]),
|
||||
None if request[5] == "ALL" else Port[request[5]],
|
||||
int(request[6]),
|
||||
action=ACLAction[request[0]],
|
||||
protocol=None if request[1] == "ALL" else IPProtocol[request[1]],
|
||||
src_ip_address=None if request[2] == "ALL" else IPv4Address(request[2]),
|
||||
src_port=None if request[3] == "ALL" else Port[request[3]],
|
||||
dst_ip_address=None if request[4] == "ALL" else IPv4Address(request[4]),
|
||||
dst_port=None if request[5] == "ALL" else Port[request[5]],
|
||||
position=int(request[6]),
|
||||
)
|
||||
),
|
||||
)
|
||||
@@ -210,39 +401,71 @@ class AccessControlList(SimComponent):
|
||||
"""
|
||||
return len([rule for rule in self._acl if rule is not None])
|
||||
|
||||
@validate_call()
|
||||
def add_rule(
|
||||
self,
|
||||
action: ACLAction,
|
||||
action: ACLAction = ACLAction.DENY,
|
||||
protocol: Optional[IPProtocol] = None,
|
||||
src_ip_address: Optional[Union[str, IPv4Address]] = None,
|
||||
src_ip_address: Optional[IPV4Address] = None,
|
||||
src_wildcard_mask: Optional[IPV4Address] = None,
|
||||
dst_ip_address: Optional[IPV4Address] = None,
|
||||
dst_wildcard_mask: Optional[IPV4Address] = None,
|
||||
src_port: Optional[Port] = None,
|
||||
dst_ip_address: Optional[Union[str, IPv4Address]] = None,
|
||||
dst_port: Optional[Port] = None,
|
||||
position: int = 0,
|
||||
) -> None:
|
||||
"""
|
||||
Add a new ACL rule.
|
||||
Adds a new ACL rule to control network traffic based on specified criteria.
|
||||
|
||||
:param ACLAction action: Action to be performed (Permit/Deny).
|
||||
:param Optional[IPProtocol] protocol: Network protocol.
|
||||
:param Optional[Union[str, IPv4Address]] src_ip_address: Source IP address.
|
||||
:param Optional[Port] src_port: Source port number.
|
||||
:param Optional[Union[str, IPv4Address]] dst_ip_address: Destination IP address.
|
||||
:param Optional[Port] dst_port: Destination port number.
|
||||
:param int position: Position in the ACL list to insert the rule.
|
||||
:raises ValueError: When the position is out of bounds.
|
||||
This method allows defining rules that specify whether to permit or deny traffic with particular
|
||||
characteristics, including source and destination IP addresses, ports, and protocols. Wildcard masks can be
|
||||
used to specify a range of IP addresses, allowing for broader rule application. If specifying a dedicated IP
|
||||
address without needing a range, the wildcard mask can be omitted.
|
||||
|
||||
Example:
|
||||
>>> # To block all traffic except SSH from a specific IP range to a server:
|
||||
>>> router = Router("router")
|
||||
>>> router.add_rule(
|
||||
... action=ACLAction.DENY,
|
||||
... protocol=IPProtocol.TCP,
|
||||
... src_ip_address="192.168.1.0",
|
||||
... src_wildcard_mask="0.0.0.255",
|
||||
... dst_ip_address="10.10.10.5",
|
||||
... dst_port=Port.SSH,
|
||||
... position=5
|
||||
... )
|
||||
>>> # This permits SSH traffic from the 192.168.1.0/24 subnet to the 10.10.10.5 server.
|
||||
>>>
|
||||
>>> # Then if we want to allow a specific IP address from this subnet to SSH into the server
|
||||
>>> router.add_rule(
|
||||
... action=ACLAction.PERMIT,
|
||||
... protocol=IPProtocol.TCP,
|
||||
... src_ip_address="192.168.1.25",
|
||||
... dst_ip_address="10.10.10.5",
|
||||
... dst_port=Port.SSH,
|
||||
... position=4
|
||||
... )
|
||||
|
||||
:param action: The action to take (Permit/Deny) when the rule matches traffic.
|
||||
:param protocol: The network protocol (TCP/UDP/ICMP) to match. If None, matches any protocol.
|
||||
:param src_ip_address: The source IP address to match. If None, matches any source IP.
|
||||
:param src_wildcard_mask: Specifies a wildcard mask for the source IP. Use for IP ranges.
|
||||
:param dst_ip_address: The destination IP address to match. If None, matches any destination IP.
|
||||
:param dst_wildcard_mask: Specifies a wildcard mask for the destination IP. Use for IP ranges.
|
||||
:param src_port: The source port to match. If None, matches any source port.
|
||||
:param dst_port: The destination port to match. If None, matches any destination port.
|
||||
:param int position: The position in the ACL list to insert this rule. Defaults is position 0 right at the top.
|
||||
:raises ValueError: If the position is out of bounds.
|
||||
"""
|
||||
if isinstance(src_ip_address, str):
|
||||
src_ip_address = IPv4Address(src_ip_address)
|
||||
if isinstance(dst_ip_address, str):
|
||||
dst_ip_address = IPv4Address(dst_ip_address)
|
||||
if 0 <= position < self.max_acl_rules:
|
||||
if self._acl[position]:
|
||||
self.sys_log.info(f"Overwriting ACL rule at position {position}")
|
||||
self._acl[position] = ACLRule(
|
||||
action=action,
|
||||
src_ip_address=src_ip_address,
|
||||
src_wildcard_mask=src_wildcard_mask,
|
||||
dst_ip_address=dst_ip_address,
|
||||
dst_wildcard_mask=dst_wildcard_mask,
|
||||
protocol=protocol,
|
||||
src_port=src_port,
|
||||
dst_port=dst_port,
|
||||
@@ -264,43 +487,25 @@ class AccessControlList(SimComponent):
|
||||
else:
|
||||
raise ValueError(f"Cannot remove ACL rule, position {position} is out of bounds.")
|
||||
|
||||
def is_permitted(
|
||||
self,
|
||||
protocol: IPProtocol,
|
||||
src_ip_address: Union[str, IPv4Address],
|
||||
src_port: Optional[Port],
|
||||
dst_ip_address: Union[str, IPv4Address],
|
||||
dst_port: Optional[Port],
|
||||
) -> Tuple[bool, Optional[Union[str, ACLRule]]]:
|
||||
"""
|
||||
Check if a packet with the given properties is permitted through the ACL.
|
||||
|
||||
:param protocol: The protocol of the packet.
|
||||
:param src_ip_address: Source IP address of the packet. Accepts string and IPv4Address.
|
||||
:param src_port: Source port of the packet. Optional.
|
||||
:param dst_ip_address: Destination IP address of the packet. Accepts string and IPv4Address.
|
||||
:param dst_port: Destination port of the packet. Optional.
|
||||
:return: A tuple with a boolean indicating if the packet is permitted and an optional rule or implicit action
|
||||
string.
|
||||
"""
|
||||
if not isinstance(src_ip_address, IPv4Address):
|
||||
src_ip_address = IPv4Address(src_ip_address)
|
||||
if not isinstance(dst_ip_address, IPv4Address):
|
||||
dst_ip_address = IPv4Address(dst_ip_address)
|
||||
for rule in self._acl:
|
||||
if not rule:
|
||||
def is_permitted(self, frame: Frame) -> Tuple[bool, ACLRule]:
|
||||
"""Check if a packet with the given properties is permitted through the ACL."""
|
||||
permitted = False
|
||||
rule: ACLRule = None
|
||||
for _rule in self._acl:
|
||||
if not _rule:
|
||||
continue
|
||||
|
||||
if (
|
||||
(rule.src_ip_address == src_ip_address or rule.src_ip_address is None)
|
||||
and (rule.dst_ip_address == dst_ip_address or rule.dst_ip_address is None)
|
||||
and (rule.protocol == protocol or rule.protocol is None)
|
||||
and (rule.src_port == src_port or rule.src_port is None)
|
||||
and (rule.dst_port == dst_port or rule.dst_port is None)
|
||||
):
|
||||
return rule.action == ACLAction.PERMIT, rule
|
||||
permitted, rule_match = _rule.permit_frame_check(frame)
|
||||
if rule_match:
|
||||
rule = _rule
|
||||
break
|
||||
if not rule:
|
||||
permitted = self.implicit_action == ACLAction.PERMIT
|
||||
rule = self.implicit_rule
|
||||
|
||||
return self.implicit_action == ACLAction.PERMIT, f"Implicit {self.implicit_action.name}"
|
||||
rule.match_count += 1
|
||||
|
||||
return permitted, rule
|
||||
|
||||
def get_relevant_rules(
|
||||
self,
|
||||
@@ -346,11 +551,25 @@ class AccessControlList(SimComponent):
|
||||
|
||||
:param markdown: Whether to display the table in Markdown format. Defaults to False.
|
||||
"""
|
||||
table = PrettyTable(["Index", "Action", "Protocol", "Src IP", "Src Port", "Dst IP", "Dst Port"])
|
||||
table = PrettyTable(
|
||||
[
|
||||
"Index",
|
||||
"Action",
|
||||
"Protocol",
|
||||
"Src IP",
|
||||
"Src Wildcard",
|
||||
"Src Port",
|
||||
"Dst IP",
|
||||
"Dst Wildcard",
|
||||
"Dst Port",
|
||||
"Matched",
|
||||
]
|
||||
)
|
||||
if markdown:
|
||||
table.set_style(MARKDOWN)
|
||||
table.align = "l"
|
||||
table.title = f"{self.sys_log.hostname} Access Control List"
|
||||
|
||||
table.title = f"{self.name} Access Control List"
|
||||
for index, rule in enumerate(self.acl + [self.implicit_rule]):
|
||||
if rule:
|
||||
table.add_row(
|
||||
@@ -359,22 +578,16 @@ class AccessControlList(SimComponent):
|
||||
rule.action.name if rule.action else "ANY",
|
||||
rule.protocol.name if rule.protocol else "ANY",
|
||||
rule.src_ip_address if rule.src_ip_address else "ANY",
|
||||
rule.src_wildcard_mask if rule.src_wildcard_mask else "ANY",
|
||||
f"{rule.src_port.value} ({rule.src_port.name})" if rule.src_port else "ANY",
|
||||
rule.dst_ip_address if rule.dst_ip_address else "ANY",
|
||||
rule.dst_wildcard_mask if rule.dst_wildcard_mask else "ANY",
|
||||
f"{rule.dst_port.value} ({rule.dst_port.name})" if rule.dst_port else "ANY",
|
||||
rule.match_count,
|
||||
]
|
||||
)
|
||||
print(table)
|
||||
|
||||
@property
|
||||
def num_rules(self) -> int:
|
||||
"""
|
||||
Get the number of rules in the ACL.
|
||||
|
||||
:return: The number of rules in the ACL.
|
||||
"""
|
||||
return len([rule for rule in self._acl if rule is not None])
|
||||
|
||||
|
||||
class RouteEntry(SimComponent):
|
||||
"""
|
||||
@@ -880,7 +1093,7 @@ class Router(NetworkNode):
|
||||
if not kwargs.get("sys_log"):
|
||||
kwargs["sys_log"] = SysLog(hostname)
|
||||
if not kwargs.get("acl"):
|
||||
kwargs["acl"] = AccessControlList(sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY)
|
||||
kwargs["acl"] = AccessControlList(sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=hostname)
|
||||
if not kwargs.get("route_table"):
|
||||
kwargs["route_table"] = RouteTable(sys_log=kwargs["sys_log"])
|
||||
super().__init__(hostname=hostname, num_ports=num_ports, **kwargs)
|
||||
@@ -1008,6 +1221,36 @@ class Router(NetworkNode):
|
||||
state["acl"] = self.acl.describe_state()
|
||||
return state
|
||||
|
||||
def check_send_frame_to_session_manager(self, frame: Frame) -> bool:
|
||||
"""
|
||||
Determines whether a given network frame should be forwarded to the session manager.
|
||||
|
||||
This function evaluates whether the destination IP address of the frame corresponds to one of the router's
|
||||
interface IP addresses. If so, it then checks if the frame is an ICMP packet or if the destination port matches
|
||||
any of the ports that the router's software manager identifies as open. If either condition is met, the frame
|
||||
is considered for further processing by the session manager, implying potential application-level handling or
|
||||
response generation.
|
||||
|
||||
:param frame: The network frame to be evaluated.
|
||||
|
||||
:return: A boolean value indicating whether the frame should be sent to the session manager. ``True`` if the
|
||||
frame's destination IP matches the router's interface and is directed to an open port or is an ICMP packet,
|
||||
otherwise, ``False``.
|
||||
"""
|
||||
dst_ip_address = frame.ip.dst_ip_address
|
||||
dst_port = None
|
||||
if frame.ip.protocol == IPProtocol.TCP:
|
||||
dst_port = frame.tcp.dst_port
|
||||
elif frame.ip.protocol == IPProtocol.UDP:
|
||||
dst_port = frame.udp.dst_port
|
||||
|
||||
if self.ip_is_router_interface(dst_ip_address) and (
|
||||
frame.icmp or dst_port in self.software_manager.get_open_ports()
|
||||
):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def receive_frame(self, frame: Frame, from_network_interface: RouterInterface):
|
||||
"""
|
||||
Processes an incoming frame received on one of the router's interfaces.
|
||||
@@ -1021,26 +1264,8 @@ class Router(NetworkNode):
|
||||
if self.operating_state != NodeOperatingState.ON:
|
||||
return
|
||||
|
||||
protocol = frame.ip.protocol
|
||||
src_ip_address = frame.ip.src_ip_address
|
||||
dst_ip_address = frame.ip.dst_ip_address
|
||||
src_port = None
|
||||
dst_port = None
|
||||
if frame.ip.protocol == IPProtocol.TCP:
|
||||
src_port = frame.tcp.src_port
|
||||
dst_port = frame.tcp.dst_port
|
||||
elif frame.ip.protocol == IPProtocol.UDP:
|
||||
src_port = frame.udp.src_port
|
||||
dst_port = frame.udp.dst_port
|
||||
|
||||
# Check if it's permitted
|
||||
permitted, rule = self.acl.is_permitted(
|
||||
protocol=protocol,
|
||||
src_ip_address=src_ip_address,
|
||||
src_port=src_port,
|
||||
dst_ip_address=dst_ip_address,
|
||||
dst_port=dst_port,
|
||||
)
|
||||
permitted, rule = self.acl.is_permitted(frame)
|
||||
|
||||
if not permitted:
|
||||
at_port = self._get_port_of_nic(from_network_interface)
|
||||
@@ -1054,13 +1279,7 @@ class Router(NetworkNode):
|
||||
network_interface=from_network_interface,
|
||||
)
|
||||
|
||||
send_to_session_manager = False
|
||||
if (frame.icmp and self.ip_is_router_interface(dst_ip_address)) or (
|
||||
dst_port in self.software_manager.get_open_ports()
|
||||
):
|
||||
send_to_session_manager = True
|
||||
|
||||
if send_to_session_manager:
|
||||
if self.check_send_frame_to_session_manager(frame):
|
||||
# Port is open on this Router so pass Frame up to session manager first
|
||||
self.session_manager.receive_frame(frame, from_network_interface)
|
||||
else:
|
||||
@@ -1196,7 +1415,7 @@ class Router(NetworkNode):
|
||||
|
||||
def show(self, markdown: bool = False):
|
||||
"""
|
||||
Prints the state of the Ethernet interfaces on the Router.
|
||||
Prints the state of the network interfaces on the Router.
|
||||
|
||||
:param markdown: Flag to indicate if the output should be in markdown format.
|
||||
"""
|
||||
@@ -1205,7 +1424,7 @@ class Router(NetworkNode):
|
||||
if markdown:
|
||||
table.set_style(MARKDOWN)
|
||||
table.align = "l"
|
||||
table.title = f"{self.hostname} Ethernet Interfaces"
|
||||
table.title = f"{self.hostname} Network Interfaces"
|
||||
for port, network_interface in self.network_interface.items():
|
||||
table.add_row(
|
||||
[
|
||||
|
||||
@@ -0,0 +1,211 @@
|
||||
from typing import Any, Dict, Union
|
||||
|
||||
from pydantic import validate_call
|
||||
|
||||
from primaite.simulator.network.airspace import AirSpaceFrequency, IPWirelessNetworkInterface
|
||||
from primaite.simulator.network.hardware.nodes.network.router import Router, RouterInterface
|
||||
from primaite.simulator.network.transmission.data_link_layer import Frame
|
||||
from primaite.utils.validators import IPV4Address
|
||||
|
||||
|
||||
class WirelessAccessPoint(IPWirelessNetworkInterface):
|
||||
"""
|
||||
Represents a Wireless Access Point (AP) in a network.
|
||||
|
||||
This class models a Wireless Access Point, a device that allows wireless devices to connect to a wired network
|
||||
using Wi-Fi or other wireless standards. The Wireless Access Point bridges the wireless and wired segments of
|
||||
the network, allowing wireless devices to communicate with other devices on the network.
|
||||
|
||||
As an integral component of wireless networking, a Wireless Access Point provides functionalities for network
|
||||
management, signal broadcasting, security enforcement, and connection handling. It also possesses Layer 3
|
||||
capabilities such as IP addressing and subnetting, allowing for network segmentation and routing.
|
||||
|
||||
Inherits from:
|
||||
- WirelessNetworkInterface: Provides basic properties and methods specific to wireless interfaces.
|
||||
- Layer3Interface: Provides Layer 3 properties like ip_address and subnet_mask, enabling the device to manage
|
||||
network traffic and routing.
|
||||
|
||||
This class can be further specialised or extended to support specific features or standards related to wireless
|
||||
networking, such as different Wi-Fi versions, frequency bands, or advanced security protocols.
|
||||
"""
|
||||
|
||||
def model_post_init(self, __context: Any) -> None:
|
||||
"""
|
||||
Performs post-initialisation checks to ensure the model's IP configuration is valid.
|
||||
|
||||
This method is invoked after the initialisation of a network model object to validate its network settings,
|
||||
particularly to ensure that the assigned IP address is not a network address. This validation is crucial for
|
||||
maintaining the integrity of network simulations and avoiding configuration errors that could lead to
|
||||
unrealistic or incorrect behavior.
|
||||
|
||||
:param __context: Contextual information or parameters passed to the method, used for further initializing or
|
||||
validating the model post-creation.
|
||||
:raises ValueError: If the IP address is the same as the network address, indicating an incorrect configuration.
|
||||
"""
|
||||
if self.ip_network.network_address == self.ip_address:
|
||||
raise ValueError(f"{self.ip_address}/{self.subnet_mask} must not be a network address")
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Produce a dictionary describing the current state of this object.
|
||||
|
||||
:return: Current state of this object and child objects.
|
||||
:rtype: Dict
|
||||
"""
|
||||
return super().describe_state()
|
||||
|
||||
def receive_frame(self, frame: Frame) -> bool:
|
||||
"""
|
||||
Receives a network frame on the interface.
|
||||
|
||||
:param frame: The network frame being received.
|
||||
:return: A boolean indicating whether the frame was successfully received.
|
||||
"""
|
||||
if self.enabled:
|
||||
frame.decrement_ttl()
|
||||
if frame.ip and frame.ip.ttl < 1:
|
||||
self._connected_node.sys_log.info("Frame discarded as TTL limit reached")
|
||||
return False
|
||||
frame.set_received_timestamp()
|
||||
self.pcap.capture_inbound(frame)
|
||||
# If this destination or is broadcast
|
||||
if frame.ethernet.dst_mac_addr == self.mac_address or frame.ethernet.dst_mac_addr == "ff:ff:ff:ff:ff:ff":
|
||||
self._connected_node.receive_frame(frame=frame, from_network_interface=self)
|
||||
return True
|
||||
return False
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""
|
||||
String representation of the NIC.
|
||||
|
||||
:return: A string combining the port number, MAC address and IP address of the NIC.
|
||||
"""
|
||||
return f"Port {self.port_num}: {self.mac_address}/{self.ip_address} ({self.frequency})"
|
||||
|
||||
|
||||
class WirelessRouter(Router):
|
||||
"""
|
||||
A WirelessRouter class that extends the functionality of a standard Router to include wireless capabilities.
|
||||
|
||||
This class represents a network device that performs routing functions similar to a traditional router but also
|
||||
includes the functionality of a wireless access point. This allows the WirelessRouter to not only direct traffic
|
||||
between wired networks but also to manage and facilitate wireless network connections.
|
||||
|
||||
A WirelessRouter is instantiated and configured with both wired and wireless interfaces. The wired interfaces are
|
||||
managed similarly to those in a standard Router, while the wireless interfaces require additional configuration
|
||||
specific to wireless settings, such as setting the frequency band (e.g., 2.4 GHz or 5 GHz for Wi-Fi).
|
||||
|
||||
The WirelessRouter facilitates creating a network environment where devices can be interconnected via both
|
||||
Ethernet (wired) and Wi-Fi (wireless), making it an essential component for simulating more complex and realistic
|
||||
network topologies within PrimAITE.
|
||||
|
||||
Example:
|
||||
>>> wireless_router = WirelessRouter(hostname="wireless_router_1")
|
||||
>>> wireless_router.configure_router_interface(
|
||||
... ip_address="192.168.1.1",
|
||||
... subnet_mask="255.255.255.0"
|
||||
... )
|
||||
>>> wireless_router.configure_wireless_access_point(
|
||||
... ip_address="10.10.10.1",
|
||||
... subnet_mask="255.255.255.0"
|
||||
... frequency=AirSpaceFrequency.WIFI_2_4
|
||||
... )
|
||||
"""
|
||||
|
||||
network_interfaces: Dict[str, Union[RouterInterface, WirelessAccessPoint]] = {}
|
||||
network_interface: Dict[int, Union[RouterInterface, WirelessAccessPoint]] = {}
|
||||
|
||||
def __init__(self, hostname: str, **kwargs):
|
||||
super().__init__(hostname=hostname, num_ports=0, **kwargs)
|
||||
|
||||
self.connect_nic(WirelessAccessPoint(ip_address="127.0.0.1", subnet_mask="255.0.0.0", gateway="0.0.0.0"))
|
||||
|
||||
self.connect_nic(RouterInterface(ip_address="127.0.0.1", subnet_mask="255.0.0.0", gateway="0.0.0.0"))
|
||||
|
||||
self.set_original_state()
|
||||
|
||||
@property
|
||||
def wireless_access_point(self) -> WirelessAccessPoint:
|
||||
"""
|
||||
Retrieves the wireless access point interface associated with this wireless router.
|
||||
|
||||
This property provides direct access to the WirelessAccessPoint interface of the router, facilitating wireless
|
||||
communications. Specifically, it returns the interface configured on port 1, dedicated to establishing and
|
||||
managing wireless network connections. This interface is essential for enabling wireless connectivity,
|
||||
allowing devices within connect to the network wirelessly.
|
||||
|
||||
:return: The WirelessAccessPoint instance representing the wireless connection interface on port 1 of the
|
||||
wireless router.
|
||||
"""
|
||||
return self.network_interface[1]
|
||||
|
||||
@validate_call()
|
||||
def configure_wireless_access_point(
|
||||
self,
|
||||
ip_address: IPV4Address,
|
||||
subnet_mask: IPV4Address,
|
||||
frequency: AirSpaceFrequency = AirSpaceFrequency.WIFI_2_4,
|
||||
):
|
||||
"""
|
||||
Configures a wireless access point (WAP).
|
||||
|
||||
Sets its IP address, subnet mask, and operating frequency. This method ensures the wireless access point is
|
||||
properly set up to manage wireless communication over the specified frequency band.
|
||||
|
||||
The method first disables the WAP to safely apply configuration changes. After configuring the IP settings,
|
||||
it sets the WAP to operate on the specified frequency band and then re-enables the WAP for operation.
|
||||
|
||||
:param ip_address: The IP address to be assigned to the wireless access point.
|
||||
:param subnet_mask: The subnet mask associated with the IP address
|
||||
:param frequency: The operating frequency of the wireless access point, defined by the AirSpaceFrequency
|
||||
enum. This determines the frequency band (e.g., 2.4 GHz or 5 GHz) the access point will use for wireless
|
||||
communication. Default is AirSpaceFrequency.WIFI_2_4.
|
||||
"""
|
||||
self.wireless_access_point.disable() # Temporarily disable the WAP for reconfiguration
|
||||
network_interface = self.network_interface[1]
|
||||
network_interface.ip_address = ip_address
|
||||
network_interface.subnet_mask = subnet_mask
|
||||
self.sys_log.info(f"Configured WAP {network_interface}")
|
||||
self.set_original_state()
|
||||
self.wireless_access_point.frequency = frequency # Set operating frequency
|
||||
self.wireless_access_point.enable() # Re-enable the WAP with new settings
|
||||
|
||||
@property
|
||||
def router_interface(self) -> RouterInterface:
|
||||
"""
|
||||
Retrieves the router interface associated with this wireless router.
|
||||
|
||||
This property provides access to the router interface configured for wired connections. It specifically
|
||||
returns the interface configured on port 2, which is reserved for wired LAN/WAN connections.
|
||||
|
||||
:return: The RouterInterface instance representing the wired LAN/WAN connection on port 2 of the wireless
|
||||
router.
|
||||
"""
|
||||
return self.network_interface[2]
|
||||
|
||||
@validate_call()
|
||||
def configure_router_interface(
|
||||
self,
|
||||
ip_address: IPV4Address,
|
||||
subnet_mask: IPV4Address,
|
||||
):
|
||||
"""
|
||||
Configures a router interface.
|
||||
|
||||
Sets its IP address and subnet mask.
|
||||
|
||||
The method first disables the router interface to safely apply configuration changes. After configuring the IP
|
||||
settings, it re-enables the router interface for operation.
|
||||
|
||||
:param ip_address: The IP address to be assigned to the router interface.
|
||||
:param subnet_mask: The subnet mask associated with the IP address
|
||||
"""
|
||||
self.router_interface.disable() # Temporarily disable the router interface for reconfiguration
|
||||
super().configure_port(port=2, ip_address=ip_address, subnet_mask=subnet_mask) # Set IP configuration
|
||||
self.router_interface.enable() # Re-enable the router interface with new settings
|
||||
|
||||
def configure_port(self, port: int, ip_address: Union[IPV4Address, str], subnet_mask: Union[IPV4Address, str]):
|
||||
"""Not Implemented."""
|
||||
raise NotImplementedError(
|
||||
"Please use the 'configure_wireless_access_point' and 'configure_router_interface' functions."
|
||||
)
|
||||
@@ -1,9 +1,9 @@
|
||||
from enum import Enum
|
||||
from ipaddress import IPv4Address
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from primaite import getLogger
|
||||
from primaite.utils.validators import IPV4Address
|
||||
|
||||
_LOGGER = getLogger(__name__)
|
||||
|
||||
@@ -73,9 +73,9 @@ class IPPacket(BaseModel):
|
||||
... )
|
||||
"""
|
||||
|
||||
src_ip_address: IPv4Address
|
||||
src_ip_address: IPV4Address
|
||||
"Source IP address."
|
||||
dst_ip_address: IPv4Address
|
||||
dst_ip_address: IPV4Address
|
||||
"Destination IP address."
|
||||
protocol: IPProtocol = IPProtocol.TCP
|
||||
"IPProtocol."
|
||||
|
||||
Reference in New Issue
Block a user