From 58af58810da74e17b5426da1697f229ce1b8dc49 Mon Sep 17 00:00:00 2001 From: Chris McCarthy Date: Fri, 9 Feb 2024 23:29:06 +0000 Subject: [PATCH] #2205 - Introduced a Firewall class for enhanced network security and control, extending Router functionalities. Updated ACLRule to support IP ranges via wildcard masking for refined traffic filtering. Includes documentation updates. --- CHANGELOG.md | 8 +- docs/source/simulation.rst | 1 + .../network/nodes/firewall.rst | 432 +++++++++++++++ src/primaite/simulator/__init__.py | 4 +- .../hardware/nodes/network/firewall.py | 492 ++++++++++++++++++ .../network/hardware/nodes/network/router.py | 387 ++++++++++---- .../network/transmission/network_layer.py | 6 +- .../network/test_firewall.py | 280 ++++++++++ .../_network/_hardware/nodes/test_acl.py | 330 +++++++++--- 9 files changed, 1759 insertions(+), 181 deletions(-) create mode 100644 docs/source/simulation_components/network/nodes/firewall.rst create mode 100644 src/primaite/simulator/network/hardware/nodes/network/firewall.py create mode 100644 tests/integration_tests/network/test_firewall.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 9716fd0e..a18e4d2f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -71,7 +71,12 @@ SessionManager. - Detailed descriptions of the Session Manager and Software Manager functionalities, including their roles in managing sessions, software services, and applications within the simulation. - Documentation for the Packet Capture (PCAP) service and SysLog functionality, highlighting their importance in logging network frames and system events, respectively. - Expanded documentation on network devices such as Routers, Switches, Computers, and Switch Nodes, explaining their specific processing logic and protocol support. - +- **Firewall Node**: Introduced the `Firewall` class extending the functionality of the existing `Router` class. The `Firewall` class incorporates advanced features to scrutinize, direct, and filter traffic between various network zones, guided by predefined security rules and policies. Key functionalities include: + - Access Control Lists (ACLs) for traffic filtering based on IP addresses, protocols, and port numbers. + - Network zone segmentation for managing traffic across external, internal, and DMZ (De-Militarized Zone) networks. + - Interface configuration to establish connectivity and define network parameters for external, internal, and DMZ interfaces. + - Protocol and service management to oversee traffic and enforce security policies. + - Dynamic traffic processing and filtering to ensure network security and integrity. ### Changed - Integrated the RouteTable into the Routers frame processing. @@ -82,6 +87,7 @@ SessionManager. - Standardised the way network interfaces are accessed across all `Node` subclasses (`HostNode`, `Router`, `Switch`) by maintaining a comprehensive `network_interface` attribute. This attribute captures all network interfaces by their port number, streamlining the management and interaction with network interfaces across different types of nodes. - Refactored all tests to utilise new `Node` subclasses (`Computer`, `Server`, `Router`, `Switch`) instead of creating generic `Node` instances and manually adding network interfaces. This change aligns test setups more closely with the intended use cases and hierarchies within the network simulation framework. - Updated all tests to employ the `Network()` class for managing nodes and their connections, ensuring a consistent and structured approach to setting up network topologies in testing scenarios. +- **ACLRule Wildcard Masking**: Updated the `ACLRule` class to support IP ranges using wildcard masking. This enhancement allows for more flexible and granular control over traffic filtering, enabling the specification of broader or more specific IP address ranges in ACL rules. ### Removed diff --git a/docs/source/simulation.rst b/docs/source/simulation.rst index d85a1449..56761517 100644 --- a/docs/source/simulation.rst +++ b/docs/source/simulation.rst @@ -22,6 +22,7 @@ Contents simulation_components/network/nodes/host_node simulation_components/network/nodes/network_node simulation_components/network/nodes/router + simulation_components/network/nodes/firewall simulation_components/network/switch simulation_components/network/network simulation_components/system/internal_frame_processing diff --git a/docs/source/simulation_components/network/nodes/firewall.rst b/docs/source/simulation_components/network/nodes/firewall.rst new file mode 100644 index 00000000..39f804c4 --- /dev/null +++ b/docs/source/simulation_components/network/nodes/firewall.rst @@ -0,0 +1,432 @@ +.. only:: comment + + © Crown-owned copyright 2023, Defence Science and Technology Laboratory UK + +######## +Firewall +######## + +The ``firewall.py`` module is a cornerstone in network security within the PrimAITE simulation, designed to simulate +the functionalities of a firewall in monitoring, controlling, and securing network traffic. + +Firewall Class +-------------- + +The ``Firewall`` class extends the ``Router`` class, incorporating advanced capabilities to scrutinise, direct, +and filter traffic between various network zones, guided by predefined security rules and policies. + +Key Features +============ + + +- **Access Control Lists (ACLs):** Employs ACLs to establish security rules for permitting or denying traffic + based on IP addresses, protocols, and port numbers, offering detailed oversight of network traffic. +- **Network Zone Segmentation:** Facilitates network division into distinct zones, including internal, external, + and DMZ (De-Militarized Zone), each governed by specific inbound and outbound traffic rules. +- **Interface Configuration:** Enables the configuration of network interfaces for connectivity to external, + internal, and DMZ networks, including setting up IP addressing and subnetting. +- **Protocol and Service Management:** Oversees and filters traffic across different protocols and services, + enforcing adherence to established security policies. +- **Dynamic Traffic Processing:** Actively processes incoming and outgoing traffic via relevant ACLs, determining + whether to forward or block based on the evaluation of rules. +- **Logging and Diagnostics:** Integrates with ``SysLog`` for detailed logging of firewall actions, supporting + security monitoring and incident investigation. + +Operations +========== + +- **Rule Definition and Management:** Permits the creation and administration of ACL rules for precise traffic + control, enabling the firewall to serve as an effective guard against unauthorised access. +- **Traffic Forwarding and Filtering:** Assesses network frames against ACL rules to allow or block traffic, + forwarding permitted traffic towards its destination whilst obstructing malicious or unauthorised requests. +- **Interface and Zone Configuration:** Provides mechanisms for configuring and managing network interfaces, + aligning with logical network architecture and security zoning requisites. + +Configuring Interfaces +====================== + +To set up firewall interfaces, allocate IP addresses and subnet masks to the external, internal, and DMZ interfaces +using the respective configuration methods: + +.. code-block:: python + + firewall.configure_external_port(ip_address="10.0.0.1", subnet_mask="255.255.255.0") + firewall.configure_internal_port(ip_address="192.168.1.1", subnet_mask="255.255.255.0") + firewall.configure_dmz_port(ip_address="172.16.0.1", subnet_mask="255.255.255.0") + + +Firewall ACLs +============= + +In the PrimAITE network simulation, six Access Control Lists (ACLs) are crucial for delineating and enforcing +comprehensive network security measures. These ACLs, designated as internal inbound, internal outbound, DMZ inbound, +DMZ outbound, external inbound, and external outbound, each serve a specific role in orchestrating the flow of data +through the network. They allow for meticulous control of traffic entering, exiting, and moving within the network, +ensuring robust protection against unauthorised access and potential cyber threats. By leveraging these ACLs both +individually and collectively, users can simulate a multi-layered security architecture. + +Internal Inbound ACL +^^^^^^^^^^^^^^^^^^^^ + +This ACL controls incoming traffic from the external network and DMZ to the internal network. It's crucial for +preventing unauthorised access to internal resources. By filtering incoming requests, it ensures that only legitimate +and necessary traffic can enter the internal network, protecting sensitive data and systems. + +Internal Outbound ACL +^^^^^^^^^^^^^^^^^^^^^ + +The internal outbound ACL manages traffic leaving the internal network to the external network or DMZ. It can restrict +internal users or systems from accessing potentially harmful external sites or services, mitigate data exfiltration +risks. + +DMZ Inbound ACL +^^^^^^^^^^^^^^^ + +This ACL regulates access to services hosted in the DMZ from the external network and internal network. Since the DMZ +hosts public-facing services like web and email servers, the DMZ inbound ACL is pivotal in allowing necessary access +while blocking malicious or unauthorised attempts, thus serving as a first line of defence. + +DMZ Outbound ACL +^^^^^^^^^^^^^^^^ + +The ACL controls traffic from DMZ to the external network and internal network. It's used to restrict the DMZ services +from initiating unauthorised connections, which is essential for preventing compromised DMZ services from being used +as launchpads for attacks or data exfiltration. + +External Inbound ACL +^^^^^^^^^^^^^^^^^^^^ + +This ACL filters all incoming traffic from the external network towards the internal network or DMZ. It's instrumental +in blocking unwanted or potentially harmful external traffic, ensuring that only traffic conforming to the security +policies is allowed into the network. **This ACL should only be used when the rule applies to both internal and DMZ +networks.** + +External Outbound ACL +^^^^^^^^^^^^^^^^^^^^^ + +This ACL governs traffic leaving the internal network or DMZ to the external network. It plays a critical role in data +loss prevention (DLP) by restricting the types of data and services that internal users and systems can access or +interact with on external networks. **This ACL should only be used when the rule applies to both internal and DMZ +networks.** + +Using ACLs Together +^^^^^^^^^^^^^^^^^^^ + +When these ACLs are used in concert, they create a robust security matrix that controls traffic flow in all directions: +into the internal network, out of the internal network, into the DMZ, out of the DMZ, and between these networks and +the external world. For example, while the external inbound ACL might block all incoming SSH requests to protect both +the internal network and DMZ, the internal outbound ACL could allow SSH access to specific external servers for +management purposes. Simultaneously, the DMZ inbound ACL might permit HTTP and HTTPS traffic to specific servers to +provide access to web services while the DMZ outbound ACL ensures these servers cannot make unauthorised outbound +connections. + +By effectively configuring and managing these ACLs, users can establish and experiment with detailed security policies +that are finely tuned to their simulated network's unique requirements and threat models, achieving granular oversight +over traffic flows. This not only enables secure simulated interactions and data exchanges within PrimAITE environments +but also fortifies the virtual network against unauthorised access and cyber threats, mirroring real-world network +security practices. + + +ACL Configuration Examples +========================== + +The subsequent examples provide detailed illustrations on configuring ACL rules within PrimAITE's firewall setup, +addressing various scenarios that encompass external attempts to access resources not only within the internal network +but also within the DMZ. These examples reflect the firewall's specific port configurations and showcase the +versatility and control that ACLs offer in managing network traffic, ensuring that security policies are precisely +enforced. Each example highlights different aspects of ACL usage, from basic traffic filtering to more complex +scenarios involving specific service access and protection against external threats. + +**Blocking External Traffic to Internal Network** + +To prevent all external traffic from accessing the internal network, with exceptions for approved services: + +.. code-block:: python + + # Default rule to deny all external traffic to the internal network + firewall.internal_inbound_acl.add_rule( + action=ACLAction.DENY, + src_ip_address="0.0.0.0", + src_wildcard_mask="255.255.255.255", + dst_ip_address="192.168.1.0", + dst_wildcard_mask="0.0.0.255", + position=1 + ) + + # Exception rule to allow HTTP traffic from external to internal network + firewall.internal_inbound_acl.add_rule( + action=ACLAction.PERMIT, + protocol=IPProtocol.TCP, + dst_port=Port.HTTP, + dst_ip_address="192.168.1.0", + dst_wildcard_mask="0.0.0.255", + position=2 + ) + +**Allowing External Access to Specific Services in DMZ** + +To enable external traffic to access specific services hosted within the DMZ: + +.. code-block:: python + + # Allow HTTP and HTTPS traffic to the DMZ + firewall.dmz_inbound_acl.add_rule( + action=ACLAction.PERMIT, + protocol=IPProtocol.TCP, + dst_port=Port.HTTP, + dst_ip_address="172.16.0.0", + dst_wildcard_mask="0.0.0.255", + position=3 + ) + firewall.dmz_inbound_acl.add_rule( + action=ACLAction.PERMIT, + protocol=IPProtocol.TCP, + dst_port=Port.HTTPS, + dst_ip_address="172.16.0.0", + dst_wildcard_mask="0.0.0.255", + position=4 + ) + +**Edge Case - Permitting External SSH Access to a Specific Internal Server** + +To permit SSH access from a designated external IP to a specific server within the internal network: + +.. code-block:: python + + # Allow SSH from a specific external IP to an internal server + firewall.internal_inbound_acl.add_rule( + action=ACLAction.PERMIT, + protocol=IPProtocol.TCP, + src_ip_address="10.0.0.2", + dst_port=Port.SSH, + dst_ip_address="192.168.1.10", + position=5 + ) + +**Restricting Access to Internal Database Server** + +To limit database server access to selected external IP addresses: + +.. code-block:: python + + # Allow PostgreSQL traffic from an authorized external IP to the internal DB server + firewall.internal_inbound_acl.add_rule( + action=ACLAction.PERMIT, + protocol=IPProtocol.TCP, + src_ip_address="10.0.0.3", + dst_port=Port.POSTGRES_SERVER, + dst_ip_address="192.168.1.20", + position=6 + ) + + # Deny all other PostgreSQL traffic from external sources + firewall.internal_inbound_acl.add_rule( + action=ACLAction.DENY, + protocol=IPProtocol.TCP, + dst_port=Port.POSTGRES_SERVER, + dst_ip_address="192.168.1.0", + dst_wildcard_mask="0.0.0.255", + position=7 + ) + +**Permitting DMZ Web Server Access while Blocking Specific Threats* + +To authorize HTTP/HTTPS access to a DMZ-hosted web server, excluding known malicious IPs: + +.. code-block:: python + + # Deny access from a known malicious IP to any DMZ service + firewall.dmz_inbound_acl.add_rule( + action=ACLAction.DENY, + src_ip_address="10.0.0.4", + dst_ip_address="172.16.0.0", + dst_wildcard_mask="0.0.0.255", + position=8 + ) + + # Allow HTTP/HTTPS traffic to the DMZ web server + firewall.dmz_inbound_acl.add_rule( + action=ACLAction.PERMIT, + protocol=IPProtocol.TCP, + dst_port=Port.HTTP, + dst_ip_address="172.16.0.2", + position=9 + ) + firewall.dmz_inbound_acl.add_rule( + action=ACLAction.PERMIT, + protocol=IPProtocol.TCP, + dst_port=Port.HTTPS, + dst_ip_address="172.16.0.2", + position=10 + ) + +**Enabling Internal to DMZ Restricted Access** + +To facilitate restricted access from the internal network to DMZ-hosted services: + +.. code-block:: python + + # Permit specific internal application server HTTPS access to a DMZ-hosted API + firewall.internal_outbound_acl.add_rule( + action=ACLAction.PERMIT, + protocol=IPProtocol.TCP, + src_ip_address="192.168.1.30", # Internal application server IP + dst_port=Port.HTTPS, + dst_ip_address="172.16.0.3", # DMZ API server IP + position=11 + ) + + # Deny all other traffic from the internal network to the DMZ + firewall.internal_outbound_acl.add_rule( + action=ACLAction.DENY, + src_ip_address="192.168.1.0", + src_wildcard_mask="0.0.0.255", + dst_ip_address="172.16.0.0", + dst_wildcard_mask="0.0.0.255", + position=12 + ) + + # Corresponding rule in DMZ inbound ACL to allow the traffic from the specific internal server + firewall.dmz_inbound_acl.add_rule( + action=ACLAction.PERMIT, + protocol=IPProtocol.TCP, + src_ip_address="192.168.1.30", # Ensuring this specific source is allowed + dst_port=Port.HTTPS, + dst_ip_address="172.16.0.3", # DMZ API server IP + position=13 + ) + + # Deny all other internal traffic to the specific DMZ API server + firewall.dmz_inbound_acl.add_rule( + action=ACLAction.DENY, + src_ip_address="192.168.1.0", + src_wildcard_mask="0.0.0.255", + dst_port=Port.HTTPS, + dst_ip_address="172.16.0.3", # DMZ API server IP + position=14 + ) + +**Blocking Unwanted External Access** + +To block all SSH access attempts from the external network: + +.. code-block:: python + + # Deny all SSH traffic from any external source + firewall.external_inbound_acl.add_rule( + action=ACLAction.DENY, + protocol=IPProtocol.TCP, + dst_port=Port.SSH, + position=1 + ) + +**Allowing Specific External Communication** + +To allow the internal network to initiate HTTP connections to the external network: + +.. code-block:: python + + # Permit outgoing HTTP traffic from the internal network to any external destination + firewall.external_outbound_acl.add_rule( + action=ACLAction.PERMIT, + protocol=IPProtocol.TCP, + dst_port=Port.HTTP, + position=2 + ) + + +The examples above demonstrate the versatility and power of ACLs in crafting nuanced security policies. By combining +rules that specify permitted and denied traffic, both broadly and narrowly defined, administrators can construct +a firewall policy that safeguards network resources while ensuring necessary access is maintained. + +Show Rules Function +=================== + +The show_rules function in the Firewall class displays the configurations of Access Control Lists (ACLs) within a +network firewall. It presents a comprehensive table detailing the rules that govern the filtering and management of +network traffic. + +**Functionality:** + +This function showcases each rule in an ACL, outlining its: + +- **Index**: The rule's position within the ACL. +- **Action**: Specifies whether to permit or deny matching traffic. +- **Protocol**: The network protocol to which the rule applies. +- **Src IP and Dst IP**: Source and destination IP addresses. +- **Src Wildcard and Dst** Wildcard: Wildcard masks for source and destination IP ranges. +- **Src Port and Dst Port**: Source and destination ports. +- **Hit Count**: The number of times the rule has been matched by traffic. + +Example Output: + +.. code-block:: text + + +---------------------------------------------------------------------------------------------------------------+ + | firewall_1 - External Inbound Access Control List | + +-------+--------+----------+--------+--------------+-----------+--------+--------------+-----------+-----------+ + | Index | Action | Protocol | Src IP | Src Wildcard | Src Port | Dst IP | Dst Wildcard | Dst Port | Hit Count | + +-------+--------+----------+--------+--------------+-----------+--------+--------------+-----------+-----------+ + | 22 | PERMIT | ANY | ANY | ANY | 219 (ARP) | ANY | ANY | 219 (ARP) | 1 | + | 23 | PERMIT | ICMP | ANY | ANY | ANY | ANY | ANY | ANY | 0 | + | 24 | PERMIT | ANY | ANY | ANY | ANY | ANY | ANY | ANY | 2 | + +-------+--------+----------+--------+--------------+-----------+--------+--------------+-----------+-----------+ + + +---------------------------------------------------------------------------------------------------------------+ + | firewall_1 - External Outbound Access Control List | + +-------+--------+----------+--------+--------------+-----------+--------+--------------+-----------+-----------+ + | Index | Action | Protocol | Src IP | Src Wildcard | Src Port | Dst IP | Dst Wildcard | Dst Port | Hit Count | + +-------+--------+----------+--------+--------------+-----------+--------+--------------+-----------+-----------+ + | 22 | PERMIT | ANY | ANY | ANY | 219 (ARP) | ANY | ANY | 219 (ARP) | 0 | + | 23 | PERMIT | ICMP | ANY | ANY | ANY | ANY | ANY | ANY | 0 | + | 24 | PERMIT | ANY | ANY | ANY | ANY | ANY | ANY | ANY | 2 | + +-------+--------+----------+--------+--------------+-----------+--------+--------------+-----------+-----------+ + + +---------------------------------------------------------------------------------------------------------------+ + | firewall_1 - Internal Inbound Access Control List | + +-------+--------+----------+--------+--------------+-----------+--------+--------------+-----------+-----------+ + | Index | Action | Protocol | Src IP | Src Wildcard | Src Port | Dst IP | Dst Wildcard | Dst Port | Hit Count | + +-------+--------+----------+--------+--------------+-----------+--------+--------------+-----------+-----------+ + | 1 | PERMIT | ANY | ANY | ANY | 123 (NTP) | ANY | ANY | 123 (NTP) | 1 | + | 22 | PERMIT | ANY | ANY | ANY | 219 (ARP) | ANY | ANY | 219 (ARP) | 0 | + | 23 | PERMIT | ICMP | ANY | ANY | ANY | ANY | ANY | ANY | 0 | + | 24 | DENY | ANY | ANY | ANY | ANY | ANY | ANY | ANY | 0 | + +-------+--------+----------+--------+--------------+-----------+--------+--------------+-----------+-----------+ + + +---------------------------------------------------------------------------------------------------------------+ + | firewall_1 - Internal Outbound Access Control List | + +-------+--------+----------+--------+--------------+-----------+--------+--------------+-----------+-----------+ + | Index | Action | Protocol | Src IP | Src Wildcard | Src Port | Dst IP | Dst Wildcard | Dst Port | Hit Count | + +-------+--------+----------+--------+--------------+-----------+--------+--------------+-----------+-----------+ + | 1 | PERMIT | ANY | ANY | ANY | 123 (NTP) | ANY | ANY | 123 (NTP) | 1 | + | 22 | PERMIT | ANY | ANY | ANY | 219 (ARP) | ANY | ANY | 219 (ARP) | 1 | + | 23 | PERMIT | ICMP | ANY | ANY | ANY | ANY | ANY | ANY | 0 | + | 24 | DENY | ANY | ANY | ANY | ANY | ANY | ANY | ANY | 0 | + +-------+--------+----------+--------+--------------+-----------+--------+--------------+-----------+-----------+ + + +---------------------------------------------------------------------------------------------------------------+ + | firewall_1 - DMZ Inbound Access Control List | + +-------+--------+----------+--------+--------------+-----------+--------+--------------+-----------+-----------+ + | Index | Action | Protocol | Src IP | Src Wildcard | Src Port | Dst IP | Dst Wildcard | Dst Port | Hit Count | + +-------+--------+----------+--------+--------------+-----------+--------+--------------+-----------+-----------+ + | 1 | PERMIT | ANY | ANY | ANY | 123 (NTP) | ANY | ANY | 123 (NTP) | 1 | + | 22 | PERMIT | ANY | ANY | ANY | 219 (ARP) | ANY | ANY | 219 (ARP) | 0 | + | 23 | PERMIT | ICMP | ANY | ANY | ANY | ANY | ANY | ANY | 0 | + | 24 | DENY | ANY | ANY | ANY | ANY | ANY | ANY | ANY | 0 | + +-------+--------+----------+--------+--------------+-----------+--------+--------------+-----------+-----------+ + + +---------------------------------------------------------------------------------------------------------------+ + | firewall_1 - DMZ Outbound Access Control List | + +-------+--------+----------+--------+--------------+-----------+--------+--------------+-----------+-----------+ + | Index | Action | Protocol | Src IP | Src Wildcard | Src Port | Dst IP | Dst Wildcard | Dst Port | Hit Count | + +-------+--------+----------+--------+--------------+-----------+--------+--------------+-----------+-----------+ + | 1 | PERMIT | ANY | ANY | ANY | 123 (NTP) | ANY | ANY | 123 (NTP) | 1 | + | 22 | PERMIT | ANY | ANY | ANY | 219 (ARP) | ANY | ANY | 219 (ARP) | 1 | + | 23 | PERMIT | ICMP | ANY | ANY | ANY | ANY | ANY | ANY | 0 | + | 24 | DENY | ANY | ANY | ANY | ANY | ANY | ANY | ANY | 0 | + +-------+--------+----------+--------+--------------+-----------+--------+--------------+-----------+-----------+ + + +The ``firewall.py`` module within PrimAITE empowers users to accurately model and simulate the pivotal role of +firewalls in network security. It provides detailed command over traffic flow and enforces security policies to safeguard +networked assets. diff --git a/src/primaite/simulator/__init__.py b/src/primaite/simulator/__init__.py index aebd77cf..97bcd57b 100644 --- a/src/primaite/simulator/__init__.py +++ b/src/primaite/simulator/__init__.py @@ -12,8 +12,8 @@ class _SimOutput: self._path: Path = ( _PRIMAITE_ROOT.parent.parent / "simulation_output" / datetime.now().strftime("%Y-%m-%d_%H-%M-%S") ) - self.save_pcap_logs: bool = False - self.save_sys_logs: bool = False + self.save_pcap_logs: bool = True + self.save_sys_logs: bool = True @property def path(self) -> Path: diff --git a/src/primaite/simulator/network/hardware/nodes/network/firewall.py b/src/primaite/simulator/network/hardware/nodes/network/firewall.py new file mode 100644 index 00000000..bccfeab1 --- /dev/null +++ b/src/primaite/simulator/network/hardware/nodes/network/firewall.py @@ -0,0 +1,492 @@ +from typing import Dict, Final, Optional, Union + +from prettytable import MARKDOWN, PrettyTable +from pydantic import validate_call + +from primaite.simulator.network.hardware.nodes.network.router import ( + AccessControlList, + ACLAction, + Router, + RouterInterface, +) +from primaite.simulator.network.transmission.data_link_layer import Frame +from primaite.simulator.system.core.sys_log import SysLog +from primaite.utils.validators import IPV4Address + +EXTERNAL_PORT_ID: Final[int] = 1 +"""The Firewall port ID of the external port.""" +INTERNAL_PORT_ID: Final[int] = 2 +"""The Firewall port ID of the internal port.""" +DMZ_PORT_ID: Final[int] = 3 +"""The Firewall port ID of the DMZ port.""" + + +class Firewall(Router): + """ + A Firewall class that extends the functionality of a Router. + + The Firewall class acts as a network security system that monitors and controls incoming and outgoing + network traffic based on predetermined security rules. It is an intermediary between internal and external + networks (including DMZ - De-Militarized Zone), ensuring that all inbound and outbound traffic complies with + the security policies. + + The Firewall employs Access Control Lists (ACLs) to filter traffic. Both the internal and DMZ ports have both + inbound and outbound ACLs that determine what traffic is allowed to pass. + + In addition to the security functions, the Firewall can also perform some routing functions similar to a Router, + forwarding packets between its interfaces based on the destination IP address. + + Usage: + To utilise the Firewall class, instantiate it with a hostname and optionally specify sys_log for logging. + Configure the internal, external, and DMZ ports with IP addresses and subnet masks. Define ACL rules to + permit or deny traffic based on your security policies. The Firewall will process frames based on these + rules, determining whether to allow or block traffic at each network interface. + + Example: + >>> from primaite.simulator.network.transmission.network_layer import IPProtocol + >>> from primaite.simulator.network.transmission.transport_layer import Port + >>> firewall = Firewall(hostname="Firewall1") + >>> firewall.configure_internal_port(ip_address="192.168.1.1", subnet_mask="255.255.255.0") + >>> firewall.configure_external_port(ip_address="10.0.0.1", subnet_mask="255.255.255.0") + >>> firewall.configure_dmz_port(ip_address="172.16.0.1", subnet_mask="255.255.255.0") + >>> # Permit HTTP traffic to the DMZ + >>> firewall.dmz_inbound_acl.add_rule( + ... action=ACLAction.PERMIT, + ... protocol=IPProtocol.TCP, + ... dst_port=Port.HTTP, + ... src_ip_address="0.0.0.0", + ... src_wildcard_mask="0.0.0.0", + ... dst_ip_address="172.16.0.0", + ... dst_wildcard_mask="0.0.0.255" + ... ) + + :ivar str hostname: The Firewall hostname. + """ + + internal_inbound_acl: Optional[AccessControlList] = None + """Access Control List for managing entering the internal network.""" + + internal_outbound_acl: Optional[AccessControlList] = None + """Access Control List for managing traffic leaving the internal network.""" + + dmz_inbound_acl: Optional[AccessControlList] = None + """Access Control List for managing traffic entering the DMZ.""" + + dmz_outbound_acl: Optional[AccessControlList] = None + """Access Control List for managing traffic leaving the DMZ.""" + + external_inbound_acl: Optional[AccessControlList] = None + """Access Control List for managing traffic entering from an external network.""" + + external_outbound_acl: Optional[AccessControlList] = None + """Access Control List for managing traffic leaving towards an external network.""" + + def __init__(self, hostname: str, **kwargs): + if not kwargs.get("sys_log"): + kwargs["sys_log"] = SysLog(hostname) + + super().__init__(hostname=hostname, num_ports=3, **kwargs) + + # Initialise ACLs for internal and dmz interfaces with a default DENY policy + self.internal_inbound_acl = AccessControlList( + sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=f"{hostname} - Internal Inbound" + ) + self.internal_outbound_acl = AccessControlList( + sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=f"{hostname} - Internal Outbound" + ) + self.dmz_inbound_acl = AccessControlList( + sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=f"{hostname} - DMZ Inbound" + ) + self.dmz_outbound_acl = AccessControlList( + sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=f"{hostname} - DMZ Outbound" + ) + + # external ACLs should have a default PERMIT policy + self.external_inbound_acl = AccessControlList( + sys_log=kwargs["sys_log"], implicit_action=ACLAction.PERMIT, name=f"{hostname} - External Inbound" + ) + self.external_outbound_acl = AccessControlList( + sys_log=kwargs["sys_log"], implicit_action=ACLAction.PERMIT, name=f"{hostname} - External Outbound" + ) + + self.set_original_state() + + def set_original_state(self): + """Set the original state for the Firewall.""" + super().set_original_state() + vals_to_include = { + "internal_port", + "external_port", + "dmz_port", + "internal_inbound_acl", + "internal_outbound_acl", + "dmz_inbound_acl", + "dmz_outbound_acl", + } + self._original_state.update(self.model_dump(include=vals_to_include)) + + def describe_state(self) -> Dict: + """ + Describes the current state of the Firewall. + + :return: A dictionary representing the current state. + """ + state = super().describe_state() + + state.update( + { + "internal_port": self.internal_port.describe_state(), + "external_port": self.external_port.describe_state(), + "dmz_port": self.dmz_port.describe_state(), + "internal_inbound_acl": self.internal_inbound_acl.describe_state(), + "internal_outbound_acl": self.internal_outbound_acl.describe_state(), + "dmz_inbound_acl": self.dmz_inbound_acl.describe_state(), + "dmz_outbound_acl": self.dmz_outbound_acl.describe_state(), + } + ) + + return state + + def show(self, markdown: bool = False): + """ + Displays the current configuration of the firewall's network interfaces in a table format. + + The table includes information about each port (External, Internal, DMZ), their MAC addresses, IP + configurations, link speeds, and operational status. The output can be formatted as Markdown if specified. + + :param markdown: If True, formats the output table in Markdown style. Useful for documentation or reporting + purposes within Markdown-compatible platforms. + """ + table = PrettyTable(["Port", "MAC Address", "Address", "Speed", "Status"]) + if markdown: + table.set_style(MARKDOWN) + table.align = "l" + table.title = f"{self.hostname} Network Interfaces" + ports = {"External": self.external_port, "Internal": self.internal_port, "DMZ": self.dmz_port} + for port, network_interface in ports.items(): + table.add_row( + [ + port, + network_interface.mac_address, + f"{network_interface.ip_address}/{network_interface.ip_network.prefixlen}", + network_interface.speed, + "Enabled" if network_interface.enabled else "Disabled", + ] + ) + print(table) + + def show_rules(self, external: bool = True, internal: bool = True, dmz: bool = True, markdown: bool = False): + """ + Prints the configured ACL rules for each specified network zone of the firewall. + + This method allows selective viewing of ACL rules applied to external, internal, and DMZ interfaces, providing + a clear overview of the firewall's current traffic filtering policies. Each section can be independently + toggled. + + :param external: If True, shows ACL rules for external interfaces. + :param internal: If True, shows ACL rules for internal interfaces. + :param dmz: If True, shows ACL rules for DMZ interfaces. + :param markdown: If True, formats the output in Markdown, enhancing readability in Markdown-compatible viewers. + """ + print(f"{self.hostname} Firewall Rules") + print() + if external: + self.external_inbound_acl.show(markdown) + print() + self.external_outbound_acl.show(markdown) + print() + if internal: + self.internal_inbound_acl.show(markdown) + print() + self.internal_outbound_acl.show(markdown) + print() + if dmz: + self.dmz_inbound_acl.show(markdown) + print() + self.dmz_outbound_acl.show(markdown) + print() + + def receive_frame(self, frame: Frame, from_network_interface: RouterInterface): + """ + Receive a frame and process it. + + Acts as the primary entry point for all network frames arriving at the Firewall, determining the flow of + traffic based on the source network interface controller (NIC) and applying the appropriate Access Control + List (ACL) rules. + + This method categorizes the incoming traffic into three main pathways based on the source NIC: external inbound, + internal outbound, and DMZ (De-Militarized Zone) outbound. It plays a crucial role in enforcing the firewall's + security policies by directing each frame to the corresponding processing method that evaluates it against + specific ACL rules. + + Based on the originating NIC: + - Frames from the external port are processed as external inbound traffic, potentially destined for either the + DMZ or the internal network. + - Frames from the internal port are treated as internal outbound traffic, aimed at reaching the external + network or a service within the DMZ. + - Frames from the DMZ port are handled as DMZ outbound traffic, with potential destinations including the + internal network or the external network. + + :param frame: The network frame to be processed. + :param from_network_interface: The network interface controller from which the frame is coming. Used to + determine the direction of the traffic (inbound or outbound) and the zone (external, internal, + DMZ) it belongs to. + """ + # If the frame comes from the external port, it's considered as external inbound traffic + if from_network_interface == self.external_port: + self._process_external_inbound_frame(frame, from_network_interface) + return + # If the frame comes from the internal port, it's considered as internal outbound traffic + elif from_network_interface == self.internal_port: + self._process_internal_outbound_frame(frame, from_network_interface) + return + # If the frame comes from the DMZ port, it's considered as DMZ outbound traffic + elif from_network_interface == self.dmz_port: + self._process_dmz_outbound_frame(frame, from_network_interface) + return + + def _process_external_inbound_frame(self, frame: Frame, from_network_interface: RouterInterface) -> None: + """ + Process frames arriving from the external network. + + Determines the path for frames based on their destination IP addresses and ACL rules for the external inbound + interface. Frames destined for the DMZ or internal network are forwarded accordingly, if allowed by the ACL. + + If a frame is permitted by the ACL, it is either passed to the session manager (if applicable) or forwarded to + the appropriate network zone (DMZ/internal). Denied frames are logged and dropped. + + :param frame: The frame to be processed, containing network layer and transport layer information. + :param from_network_interface: The interface on the firewall through which the frame was received. + """ + # check if External Inbound ACL Rules permit frame + permitted, rule = self.external_inbound_acl.is_permitted(frame) + if not permitted: + self.sys_log.info(f"Frame blocked at interface {from_network_interface} by rule {rule}") + return + else: + self.software_manager.arp.add_arp_cache_entry( + ip_address=frame.ip.src_ip_address, + mac_address=frame.ethernet.src_mac_addr, + network_interface=from_network_interface, + ) + + if self.check_send_frame_to_session_manager(frame): + # Port is open on this Router so pass Frame up to session manager first + self.session_manager.receive_frame(frame, from_network_interface) + else: + # If the destination IP is within the DMZ network, process the frame as DMZ inbound + if frame.ip.dst_ip_address in self.dmz_port.ip_network: + self._process_dmz_inbound_frame(frame, from_network_interface) + else: + # Otherwise, process the frame as internal inbound + self._process_internal_inbound_frame(frame, from_network_interface) + + def _process_external_outbound_frame(self, frame: Frame, from_network_interface: RouterInterface) -> None: + """ + Process frames that are outbound towards the external network. + + :param frame: The frame to be processed. + :param from_network_interface: The network interface controller from which the frame is coming. + :param re_attempt: Indicates if the processing is a re-attempt, defaults to False. + """ + # check if External Outbound ACL Rules permit frame + permitted, rule = self.external_outbound_acl.is_permitted(frame=frame) + if not permitted: + self.sys_log.info(f"Frame blocked at interface {from_network_interface} by rule {rule}") + return + + self.process_frame(frame=frame, from_network_interface=from_network_interface) + + def _process_internal_inbound_frame(self, frame: Frame, from_network_interface: RouterInterface) -> None: + """ + Process frames that are inbound towards the internal LAN. + + This method is responsible for handling frames coming from either the external network or the DMZ towards + the internal LAN. It checks the frames against the internal inbound ACL to decide whether to allow or deny + the traffic, and take appropriate actions. + + :param frame: The frame to be processed. + :param from_network_interface: The network interface controller from which the frame is coming. + :param re_attempt: Indicates if the processing is a re-attempt, defaults to False. + """ + # check if Internal Inbound ACL Rules permit frame + permitted, rule = self.internal_inbound_acl.is_permitted(frame=frame) + if not permitted: + self.sys_log.info(f"Frame blocked at interface {from_network_interface} by rule {rule}") + return + + self.process_frame(frame=frame, from_network_interface=from_network_interface) + + def _process_internal_outbound_frame(self, frame: Frame, from_network_interface: RouterInterface) -> None: + """ + Process frames that are outbound from the internal network. + + This method handles frames that are leaving the internal network. Depending on the destination IP address, + the frame may be forwarded to the DMZ or to the external network. + + :param frame: The frame to be processed. + :param from_network_interface: The network interface controller from which the frame is coming. + :param re_attempt: Indicates if the processing is a re-attempt, defaults to False. + """ + permitted, rule = self.internal_outbound_acl.is_permitted(frame) + if not permitted: + self.sys_log.info(f"Frame blocked at interface {from_network_interface} by rule {rule}") + return + else: + self.software_manager.arp.add_arp_cache_entry( + ip_address=frame.ip.src_ip_address, + mac_address=frame.ethernet.src_mac_addr, + network_interface=from_network_interface, + ) + + if self.check_send_frame_to_session_manager(frame): + # Port is open on this Router so pass Frame up to session manager first + self.session_manager.receive_frame(frame, from_network_interface) + else: + # If the destination IP is within the DMZ network, process the frame as DMZ inbound + if frame.ip.dst_ip_address in self.dmz_port.ip_network: + self._process_dmz_inbound_frame(frame, from_network_interface) + else: + # If the destination IP is not within the DMZ network, process the frame as external outbound + self._process_external_outbound_frame(frame, from_network_interface) + + def _process_dmz_inbound_frame(self, frame: Frame, from_network_interface: RouterInterface) -> None: + """ + Process frames that are inbound from the DMZ. + + This method is responsible for handling frames coming from either the external network or the internal LAN + towards the DMZ. It checks the frames against the DMZ inbound ACL to decide whether to allow or deny the + traffic, and take appropriate actions. + + :param frame: The frame to be processed. + :param from_network_interface: The network interface controller from which the frame is coming. + :param re_attempt: Indicates if the processing is a re-attempt, defaults to False. + """ + # check if DMZ Inbound ACL Rules permit frame + permitted, rule = self.dmz_inbound_acl.is_permitted(frame=frame) + if not permitted: + self.sys_log.info(f"Frame blocked at interface {from_network_interface} by rule {rule}") + return + + self.process_frame(frame=frame, from_network_interface=from_network_interface) + + def _process_dmz_outbound_frame(self, frame: Frame, from_network_interface: RouterInterface) -> None: + """ + Process frames that are outbound from the DMZ. + + This method handles frames originating from the DMZ and determines their appropriate path based on the + destination IP address. It involves checking the DMZ outbound ACL, consulting the ARP cache and the routing + table to find the correct outbound NIC, and then forwarding the frame to either the internal network or the + external network. + + :param frame: The frame to be processed. + :param from_network_interface: The network interface controller from which the frame is coming. + :param re_attempt: Indicates if the processing is a re-attempt, defaults to False. + """ + permitted, rule = self.dmz_outbound_acl.is_permitted(frame) + if not permitted: + self.sys_log.info(f"Frame blocked at interface {from_network_interface} by rule {rule}") + return + else: + self.software_manager.arp.add_arp_cache_entry( + ip_address=frame.ip.src_ip_address, + mac_address=frame.ethernet.src_mac_addr, + network_interface=from_network_interface, + ) + + if self.check_send_frame_to_session_manager(frame): + # Port is open on this Router so pass Frame up to session manager first + self.session_manager.receive_frame(frame, from_network_interface) + else: + # Attempt to get the outbound NIC from the ARP cache using the destination IP address + outbound_nic = self.software_manager.arp.get_arp_cache_network_interface(frame.ip.dst_ip_address) + + # If outbound NIC is not found in the ARP cache, consult the routing table to find the best route + if not outbound_nic: + route = self.route_table.find_best_route(frame.ip.dst_ip_address) + if route: + # If a route is found, get the corresponding outbound NIC from the ARP cache using the next-hop IP + # address + outbound_nic = self.software_manager.arp.get_arp_cache_network_interface(route.next_hop_ip_address) + + # If an outbound NIC is determined + if outbound_nic: + if outbound_nic == self.external_port: + # If the outbound NIC is the external port, check the frame against the DMZ outbound ACL and + # process it as an external outbound frame + self._process_external_outbound_frame(frame, from_network_interface) + return + elif outbound_nic == self.internal_port: + # If the outbound NIC is the internal port, check the frame against the DMZ outbound ACL and + # process it as an internal inbound frame + self._process_internal_inbound_frame(frame, from_network_interface) + return + # TODO: What to do here? Destination unreachable? Send ICMP back? + return + + @property + def external_port(self) -> RouterInterface: + """ + The external port of the firewall. + + :return: The external port connecting the firewall to the external network. + """ + return self.network_interface[EXTERNAL_PORT_ID] + + @validate_call() + def configure_external_port(self, ip_address: Union[IPV4Address, str], subnet_mask: Union[IPV4Address, str]): + """ + Configure the external port with an IP address and a subnet mask. + + Enables the port once configured. + + :param ip_address: The IP address to assign to the external port. + :param subnet_mask: The subnet mask to assign to the external port. + """ + # Configure the external port with the specified IP address and subnet mask + self.configure_port(EXTERNAL_PORT_ID, ip_address, subnet_mask) + self.external_port.enable() + + @property + def internal_port(self) -> RouterInterface: + """ + The internal port of the firewall. + + :return: The external port connecting the firewall to the internal LAN. + """ + return self.network_interface[INTERNAL_PORT_ID] + + @validate_call() + def configure_internal_port(self, ip_address: Union[IPV4Address, str], subnet_mask: Union[IPV4Address, str]): + """ + Configure the internal port with an IP address and a subnet mask. + + Enables the port once configured. + + :param ip_address: The IP address to assign to the internal port. + :param subnet_mask: The subnet mask to assign to the internal port. + """ + self.configure_port(INTERNAL_PORT_ID, ip_address, subnet_mask) + self.internal_port.enable() + + @property + def dmz_port(self) -> RouterInterface: + """ + The DMZ port of the firewall. + + :return: The external port connecting the firewall to the DMZ. + """ + return self.network_interface[DMZ_PORT_ID] + + @validate_call() + def configure_dmz_port(self, ip_address: Union[IPV4Address, str], subnet_mask: Union[IPV4Address, str]): + """ + Configure the DMZ port with an IP address and a subnet mask. + + Enables the port once configured. + + :param ip_address: The IP address to assign to the DMZ port. + :param subnet_mask: The subnet mask to assign to the DMZ port. + """ + self.configure_port(DMZ_PORT_ID, ip_address, subnet_mask) + self.dmz_port.enable() diff --git a/src/primaite/simulator/network/hardware/nodes/network/router.py b/src/primaite/simulator/network/hardware/nodes/network/router.py index 40cbc16d..0ad64d18 100644 --- a/src/primaite/simulator/network/hardware/nodes/network/router.py +++ b/src/primaite/simulator/network/hardware/nodes/network/router.py @@ -6,6 +6,7 @@ from ipaddress import IPv4Address, IPv4Network from typing import Any, Dict, List, Optional, Tuple, Union from prettytable import MARKDOWN, PrettyTable +from pydantic import validate_call from primaite.simulator.core import RequestManager, RequestType, SimComponent from primaite.simulator.network.hardware.base import IPWiredNetworkInterface @@ -19,6 +20,43 @@ from primaite.simulator.network.transmission.transport_layer import Port from primaite.simulator.system.core.sys_log import SysLog from primaite.simulator.system.services.arp.arp import ARP from primaite.simulator.system.services.icmp.icmp import ICMP +from primaite.utils.validators import IPV4Address + + +@validate_call() +def ip_matches_masked_range(ip_to_check: IPV4Address, base_ip: IPV4Address, wildcard_mask: IPV4Address) -> bool: + """ + Determine if a given IP address matches a range defined by a base IP address and a wildcard mask. + + The wildcard mask specifies which bits in the IP address should be ignored (1) and which bits must match (0). + + The function applies the wildcard mask to both the base IP and the IP address to check by first negating the + wildcard mask and then performing a bitwise AND operation. This process effectively masks out the bits indicated + by the wildcard mask. If the resulting masked IP addresses are equal, it means the IP address to check falls within + the range defined by the base IP and wildcard mask. + + :param IPv4Address ip_to_check: The IP address to be checked. + :param IPv4Address base_ip: The base IP address defining the start of the range. + :param IPv4Address wildcard_mask: The wildcard mask specifying which bits to ignore. + :return: A boolean value indicating whether the IP address matches the masked range. + :rtype: bool + + Example usage: + >>> ip_matches_masked_range(ip_to_check="192.168.10.10", base_ip="192.168.1.1", wildcard_mask="0.0.255.255") + False + """ + # Convert the IP addresses from IPv4Address objects to integer representations for bitwise operations + base_ip_int = int(base_ip) + ip_to_check_int = int(ip_to_check) + wildcard_int = int(wildcard_mask) + + # Negate the wildcard mask and apply it to both the base IP and the IP to check using bitwise AND + # This step masks out the bits to be ignored according to the wildcard mask + masked_base_ip = base_ip_int & ~wildcard_int + masked_ip_to_check = ip_to_check_int & ~wildcard_int + + # Compare the masked IP addresses to determine if they match within the masked range + return masked_base_ip == masked_ip_to_check class ACLAction(Enum): @@ -30,22 +68,62 @@ class ACLAction(Enum): class ACLRule(SimComponent): """ - Represents an Access Control List (ACL) rule. + Represents an Access Control List (ACL) rule within a network device. - :ivar ACLAction action: Action to be performed (Permit/Deny). Default is DENY. - :ivar Optional[IPProtocol] protocol: Network protocol. Default is None. - :ivar Optional[IPv4Address] src_ip_address: Source IP address. Default is None. - :ivar Optional[Port] src_port: Source port number. Default is None. - :ivar Optional[IPv4Address] dst_ip_address: Destination IP address. Default is None. - :ivar Optional[Port] dst_port: Destination port number. Default is None. + Enables fine-grained control over network traffic based on specified criteria such as IP addresses, protocols, + and ports. ACL rules can be configured to permit or deny traffic, providing a powerful mechanism for enforcing + security policies and traffic flow. + + ACL rules support specifying exact match conditions, ranges of IP addresses using wildcard masks, and + protocol types. This flexibility allows for complex traffic filtering scenarios, from blocking or allowing + specific types of traffic to entire subnets. + + **Usage:** + + - **Dedicated IP Addresses**: To match traffic from or to a specific IP address, set the `src_ip_address` + and/or `dst_ip_address` without a wildcard mask. This is useful for rules that apply to individual hosts. + + - **IP Ranges with Wildcard Masks**: For rules that apply to a range of IP addresses, use the `src_wildcard_mask` + and/or `dst_wildcard_mask` in conjunction with the base IP address. Wildcard masks are a way to specify which + bits of the IP address should be matched exactly and which bits can vary. For example, a wildcard mask of + `0.0.0.255` applied to a base address of `192.168.1.0` allows for any address from `192.168.1.0` to + `192.168.1.255`. + + - **Allowing All IP Traffic**: To mimic the Cisco ACL rule that permits all IP traffic from a specific range, + you may use wildcard masks with the rule action set to `PERMIT`. If your implementation includes an `ALL` + option in the `IPProtocol` enum, use it to allow all protocols; otherwise, consider the rule without a + specified protocol to apply to all IP traffic. + + + The combination of these attributes allows for the creation of granular rules to control traffic flow + effectively, enhancing network security and management. + + + :ivar ACLAction action: Specifies whether to `PERMIT` or `DENY` the traffic that matches the rule conditions. + The default action is `DENY`. + :ivar Optional[IPProtocol] protocol: The network protocol (e.g., TCP, UDP, ICMP) to match. If `None`, the rule + applies to all protocols. + :ivar Optional[IPv4Address] src_ip_address: The source IP address to match. If combined with `src_wildcard_mask`, + it specifies the start of an IP range. + :ivar Optional[IPv4Address] src_wildcard_mask: The wildcard mask for the source IP address, defining the range + of addresses to match. + :ivar Optional[IPv4Address] dst_ip_address: The destination IP address to match. If combined with + `dst_wildcard_mask`, it specifies the start of an IP range. + :ivar Optional[IPv4Address] dst_wildcard_mask: The wildcard mask for the destination IP address, defining the + range of addresses to match. + :ivar Optional[Port] src_port: The source port number to match. Relevant for TCP/UDP protocols. + :ivar Optional[Port] dst_port: The destination port number to match. Relevant for TCP/UDP protocols. """ action: ACLAction = ACLAction.DENY protocol: Optional[IPProtocol] = None - src_ip_address: Optional[IPv4Address] = None + src_ip_address: Optional[IPV4Address] = None + src_wildcard_mask: Optional[IPV4Address] = None + dst_ip_address: Optional[IPV4Address] = None + dst_wildcard_mask: Optional[IPV4Address] = None src_port: Optional[Port] = None - dst_ip_address: Optional[IPv4Address] = None dst_port: Optional[Port] = None + hit_count: int = 0 def __str__(self) -> str: rule_strings = [] @@ -76,24 +154,132 @@ class ACLRule(SimComponent): state["src_port"] = self.src_port.name if self.src_port else None state["dst_ip_address"] = str(self.dst_ip_address) if self.dst_ip_address else None state["dst_port"] = self.dst_port.name if self.dst_port else None + state["hit_count"] = self.hit_count return state + def permit_frame_check(self, frame: Frame) -> bool: + """ + Evaluates whether a given network frame should be permitted or denied based on this ACL rule. + + This method checks the frame against the ACL rule's criteria, including protocol, source and destination IP + addresses (with support for wildcard masking), and source and destination ports. The method assumes that an + unspecified (None) criterion implies a match for any value in that category. For IP addresses, wildcard masking + can be used to specify ranges of addresses that match the rule. + + The method follows these steps to determine if a frame is permitted: + + 1. Check if the frame's protocol matches the ACL rule's protocol. + 2. For source and destination IP addresses: + 1. If a wildcard mask is defined, check if the frame's IP address is within the range specified by the base + IP address and the wildcard mask. + 2. If no wildcard mask is defined, directly compare the frame's IP address to the one specified in the rule. + 3. Check if the frame's source and destination ports match those specified in the rule. + 4. The frame is permitted if it matches all specified criteria and the rule's action is PERMIT. Conversely, it + is not permitted if any criterion does not match or if the rule's action is DENY. + + :param frame (Frame): The network frame to be evaluated. + :return: True if the frame is permitted by this ACL rule, False otherwise. + """ + protocol_matches = self.protocol == frame.ip.protocol if self.protocol else True + + src_ip_matches = self.src_ip_address is None # Assume match if no specific src IP is defined + if self.src_ip_address: + if self.src_wildcard_mask: + # If a src wildcard mask is provided, use it to check the range + src_ip_matches = ip_matches_masked_range( + ip_to_check=frame.ip.src_ip_address, + base_ip=self.src_ip_address, + wildcard_mask=self.src_wildcard_mask, + ) + else: + # Direct comparison if no wildcard mask is defined + src_ip_matches = frame.ip.src_ip_address == self.src_ip_address + + dst_ip_matches = self.dst_ip_address is None # Assume match if no specific dst IP is defined + if self.dst_ip_address: + if self.dst_wildcard_mask: + # If a dst wildcard mask is provided, use it to check the range + dst_ip_matches = ip_matches_masked_range( + ip_to_check=frame.ip.dst_ip_address, + base_ip=self.dst_ip_address, + wildcard_mask=self.dst_wildcard_mask, + ) + else: + # Direct comparison if no wildcard mask is defined + dst_ip_matches = frame.ip.dst_ip_address == self.dst_ip_address + + src_port = None + dst_port = None + if frame.tcp: + src_port = frame.tcp.src_port + dst_port = frame.tcp.dst_port + elif frame.udp: + src_port = frame.udp.src_port + dst_port = frame.udp.dst_port + + src_port_matches = self.src_port == src_port if self.src_port else True + dst_port_matches = self.dst_port == dst_port if self.dst_port else True + + # The frame is permitted if all conditions are met + if protocol_matches and src_ip_matches and dst_ip_matches and src_port_matches and dst_port_matches: + return self.action == ACLAction.PERMIT + else: + # If any condition is not met, the decision depends on the rule action + return False + class AccessControlList(SimComponent): """ Manages a list of ACLRules to filter network traffic. - :ivar SysLog sys_log: System logging instance. - :ivar ACLAction implicit_action: Default action for rules. - :ivar ACLRule implicit_rule: Implicit ACL rule, created based on implicit_action. - :ivar int max_acl_rules: Maximum number of ACL rules that can be added. Default is 25. - :ivar List[Optional[ACLRule]] _acl: A list containing the ACL rules. + Manages a list of ACLRule instances to filter network traffic based on predefined criteria. This class + provides functionalities to add, remove, and evaluate ACL rules, thereby controlling the flow of traffic + through a network device. + + ACL rules can specify conditions based on source and destination IP addresses, IP protocols (TCP, UDP, ICMP), + and port numbers. Rules can be configured to permit or deny traffic that matches these conditions, offering + granular control over network security policies. + + Usage: + - **Dedicated IP Addresses**: Directly specify the source and/or destination IP addresses in an ACL rule to + match traffic to or from specific hosts. + - **IP Ranges with Wildcard Masks**: Use wildcard masks along with base IP addresses to define ranges of IP + addresses that an ACL rule applies to. This is useful for specifying subnets or ranges of IP addresses. + - **Allowing All IP Traffic**: To mimic a Cisco-style ACL rule that allows all IP traffic from a specified + range, use the wildcard mask in conjunction with a permit action. If your system supports an `ALL` option + for the IP protocol, this can be used to allow all types of IP traffic; otherwise, the absence of a + specified protocol can be interpreted to mean all protocols. + + Methods include functionalities to add and remove rules, reset to default configurations, and evaluate + whether specific frames are permitted or denied based on the current set of rules. The class also provides + utility functions to describe the current state and display the rules in a human-readable format. + + Example: + >>> # To add a rule that permits all TCP traffic from the subnet 192.168.1.0/24 to 192.168.2.0/24: + >>> acl = AccessControlList() + >>> acl.add_rule( + ... action=ACLAction.PERMIT, + ... protocol=IPProtocol.TCP, + ... src_ip_address="192.168.1.0", + ... src_wildcard_mask="0.0.0.255", + ... dst_ip_address="192.168.2.0", + ... dst_wildcard_mask="0.0.0.255" + ...) + + This example demonstrates adding a rule with specific source and destination IP ranges, using wildcard masks + to allow a broad range of traffic while maintaining control over the flow of data for security and + management purposes. + + :ivar ACLAction implicit_action: The default action (permit or deny) applied when no other rule matches. + Typically set to deny to follow the principle of least privilege. + :ivar int max_acl_rules: The maximum number of ACL rules that can be added to the list. Defaults to 25. """ sys_log: SysLog implicit_action: ACLAction implicit_rule: ACLRule max_acl_rules: int = 25 + name: str _acl: List[Optional[ACLRule]] = [None] * 24 _default_config: Dict[int, dict] = {} """Config dict describing how the ACL list should look at episode start""" @@ -210,13 +396,16 @@ class AccessControlList(SimComponent): """ return len([rule for rule in self._acl if rule is not None]) + @validate_call() def add_rule( self, - action: ACLAction, + action: ACLAction = ACLAction.DENY, protocol: Optional[IPProtocol] = None, - src_ip_address: Optional[Union[str, IPv4Address]] = None, + src_ip_address: Optional[IPV4Address] = None, + src_wildcard_mask: Optional[IPV4Address] = None, + dst_ip_address: Optional[IPV4Address] = None, + dst_wildcard_mask: Optional[IPV4Address] = None, src_port: Optional[Port] = None, - dst_ip_address: Optional[Union[str, IPv4Address]] = None, dst_port: Optional[Port] = None, position: int = 0, ) -> None: @@ -224,25 +413,25 @@ class AccessControlList(SimComponent): Add a new ACL rule. :param ACLAction action: Action to be performed (Permit/Deny). - :param Optional[IPProtocol] protocol: Network protocol. - :param Optional[Union[str, IPv4Address]] src_ip_address: Source IP address. - :param Optional[Port] src_port: Source port number. - :param Optional[Union[str, IPv4Address]] dst_ip_address: Destination IP address. - :param Optional[Port] dst_port: Destination port number. - :param int position: Position in the ACL list to insert the rule. + :param protocol: Network protocol. Optional, default is None. + :param src_ip_address: Source IP address. Optional, default is None. + :param src_wildcard_mask: Source IP wildcard mask. Optional, default is None. + :param src_port: Source port number. Optional, default is None. + :param dst_ip_address: Destination IP address. Optional, default is None. + :param dst_wildcard_mask: Destination IP wildcard mask. Optional, default is None. + :param dst_port: Destination port number. Optional, default is None. + :param int position: Position in the ACL list to insert the rule. Optional, default is 1. :raises ValueError: When the position is out of bounds. """ - if isinstance(src_ip_address, str): - src_ip_address = IPv4Address(src_ip_address) - if isinstance(dst_ip_address, str): - dst_ip_address = IPv4Address(dst_ip_address) if 0 <= position < self.max_acl_rules: if self._acl[position]: self.sys_log.info(f"Overwriting ACL rule at position {position}") self._acl[position] = ACLRule( action=action, src_ip_address=src_ip_address, + src_wildcard_mask=src_wildcard_mask, dst_ip_address=dst_ip_address, + dst_wildcard_mask=dst_wildcard_mask, protocol=protocol, src_port=src_port, dst_port=dst_port, @@ -264,43 +453,25 @@ class AccessControlList(SimComponent): else: raise ValueError(f"Cannot remove ACL rule, position {position} is out of bounds.") - def is_permitted( - self, - protocol: IPProtocol, - src_ip_address: Union[str, IPv4Address], - src_port: Optional[Port], - dst_ip_address: Union[str, IPv4Address], - dst_port: Optional[Port], - ) -> Tuple[bool, Optional[Union[str, ACLRule]]]: - """ - Check if a packet with the given properties is permitted through the ACL. - - :param protocol: The protocol of the packet. - :param src_ip_address: Source IP address of the packet. Accepts string and IPv4Address. - :param src_port: Source port of the packet. Optional. - :param dst_ip_address: Destination IP address of the packet. Accepts string and IPv4Address. - :param dst_port: Destination port of the packet. Optional. - :return: A tuple with a boolean indicating if the packet is permitted and an optional rule or implicit action - string. - """ - if not isinstance(src_ip_address, IPv4Address): - src_ip_address = IPv4Address(src_ip_address) - if not isinstance(dst_ip_address, IPv4Address): - dst_ip_address = IPv4Address(dst_ip_address) - for rule in self._acl: - if not rule: + def is_permitted(self, frame: Frame) -> Tuple[bool, ACLRule]: + """Check if a packet with the given properties is permitted through the ACL.""" + permitted = False + rule: ACLRule = None + for _rule in self._acl: + if not _rule: continue - if ( - (rule.src_ip_address == src_ip_address or rule.src_ip_address is None) - and (rule.dst_ip_address == dst_ip_address or rule.dst_ip_address is None) - and (rule.protocol == protocol or rule.protocol is None) - and (rule.src_port == src_port or rule.src_port is None) - and (rule.dst_port == dst_port or rule.dst_port is None) - ): - return rule.action == ACLAction.PERMIT, rule + if _rule.permit_frame_check(frame): + permitted = True + rule = _rule + break + if not rule: + permitted = self.implicit_action == ACLAction.PERMIT + rule = self.implicit_rule - return self.implicit_action == ACLAction.PERMIT, f"Implicit {self.implicit_action.name}" + rule.hit_count += 1 + + return permitted, rule def get_relevant_rules( self, @@ -346,11 +517,25 @@ class AccessControlList(SimComponent): :param markdown: Whether to display the table in Markdown format. Defaults to False. """ - table = PrettyTable(["Index", "Action", "Protocol", "Src IP", "Src Port", "Dst IP", "Dst Port"]) + table = PrettyTable( + [ + "Index", + "Action", + "Protocol", + "Src IP", + "Src Wildcard", + "Src Port", + "Dst IP", + "Dst Wildcard", + "Dst Port", + "Hit Count", + ] + ) if markdown: table.set_style(MARKDOWN) table.align = "l" - table.title = f"{self.sys_log.hostname} Access Control List" + + table.title = f"{self.name} Access Control List" for index, rule in enumerate(self.acl + [self.implicit_rule]): if rule: table.add_row( @@ -359,22 +544,16 @@ class AccessControlList(SimComponent): rule.action.name if rule.action else "ANY", rule.protocol.name if rule.protocol else "ANY", rule.src_ip_address if rule.src_ip_address else "ANY", + rule.src_wildcard_mask if rule.src_wildcard_mask else "ANY", f"{rule.src_port.value} ({rule.src_port.name})" if rule.src_port else "ANY", rule.dst_ip_address if rule.dst_ip_address else "ANY", + rule.dst_wildcard_mask if rule.dst_wildcard_mask else "ANY", f"{rule.dst_port.value} ({rule.dst_port.name})" if rule.dst_port else "ANY", + rule.hit_count, ] ) print(table) - @property - def num_rules(self) -> int: - """ - Get the number of rules in the ACL. - - :return: The number of rules in the ACL. - """ - return len([rule for rule in self._acl if rule is not None]) - class RouteEntry(SimComponent): """ @@ -880,7 +1059,7 @@ class Router(NetworkNode): if not kwargs.get("sys_log"): kwargs["sys_log"] = SysLog(hostname) if not kwargs.get("acl"): - kwargs["acl"] = AccessControlList(sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY) + kwargs["acl"] = AccessControlList(sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=hostname) if not kwargs.get("route_table"): kwargs["route_table"] = RouteTable(sys_log=kwargs["sys_log"]) super().__init__(hostname=hostname, num_ports=num_ports, **kwargs) @@ -1008,6 +1187,36 @@ class Router(NetworkNode): state["acl"] = self.acl.describe_state() return state + def check_send_frame_to_session_manager(self, frame: Frame) -> bool: + """ + Determines whether a given network frame should be forwarded to the session manager. + + his function evaluates whether the destination IP address of the frame corresponds to one of the router's + interface IP addresses. If so, it then checks if the frame is an ICMP packet or if the destination port matches + any of the ports that the router's software manager identifies as open. If either condition is met, the frame + is considered for further processing by the session manager, implying potential application-level handling or + response generation. + + :param frame: The network frame to be evaluated. + + :return: A boolean value indicating whether the frame should be sent to the session manager. ``True`` if the + frame's destination IP matches the router's interface and is directed to an open port or is an ICMP packet, + otherwise, ``False``. + """ + dst_ip_address = frame.ip.dst_ip_address + dst_port = None + if frame.ip.protocol == IPProtocol.TCP: + dst_port = frame.tcp.dst_port + elif frame.ip.protocol == IPProtocol.UDP: + dst_port = frame.udp.dst_port + + if self.ip_is_router_interface(dst_ip_address) and ( + frame.icmp or dst_port in self.software_manager.get_open_ports() + ): + return True + + return False + def receive_frame(self, frame: Frame, from_network_interface: RouterInterface): """ Processes an incoming frame received on one of the router's interfaces. @@ -1021,26 +1230,8 @@ class Router(NetworkNode): if self.operating_state != NodeOperatingState.ON: return - protocol = frame.ip.protocol - src_ip_address = frame.ip.src_ip_address - dst_ip_address = frame.ip.dst_ip_address - src_port = None - dst_port = None - if frame.ip.protocol == IPProtocol.TCP: - src_port = frame.tcp.src_port - dst_port = frame.tcp.dst_port - elif frame.ip.protocol == IPProtocol.UDP: - src_port = frame.udp.src_port - dst_port = frame.udp.dst_port - # Check if it's permitted - permitted, rule = self.acl.is_permitted( - protocol=protocol, - src_ip_address=src_ip_address, - src_port=src_port, - dst_ip_address=dst_ip_address, - dst_port=dst_port, - ) + permitted, rule = self.acl.is_permitted(frame) if not permitted: at_port = self._get_port_of_nic(from_network_interface) @@ -1054,13 +1245,7 @@ class Router(NetworkNode): network_interface=from_network_interface, ) - send_to_session_manager = False - if (frame.icmp and self.ip_is_router_interface(dst_ip_address)) or ( - dst_port in self.software_manager.get_open_ports() - ): - send_to_session_manager = True - - if send_to_session_manager: + if self.check_send_frame_to_session_manager(frame): # Port is open on this Router so pass Frame up to session manager first self.session_manager.receive_frame(frame, from_network_interface) else: @@ -1196,7 +1381,7 @@ class Router(NetworkNode): def show(self, markdown: bool = False): """ - Prints the state of the Ethernet interfaces on the Router. + Prints the state of the network interfaces on the Router. :param markdown: Flag to indicate if the output should be in markdown format. """ @@ -1205,7 +1390,7 @@ class Router(NetworkNode): if markdown: table.set_style(MARKDOWN) table.align = "l" - table.title = f"{self.hostname} Ethernet Interfaces" + table.title = f"{self.hostname} Network Interfaces" for port, network_interface in self.network_interface.items(): table.add_row( [ diff --git a/src/primaite/simulator/network/transmission/network_layer.py b/src/primaite/simulator/network/transmission/network_layer.py index c6328a60..bdf4babc 100644 --- a/src/primaite/simulator/network/transmission/network_layer.py +++ b/src/primaite/simulator/network/transmission/network_layer.py @@ -1,9 +1,9 @@ from enum import Enum -from ipaddress import IPv4Address from pydantic import BaseModel from primaite import getLogger +from primaite.utils.validators import IPV4Address _LOGGER = getLogger(__name__) @@ -73,9 +73,9 @@ class IPPacket(BaseModel): ... ) """ - src_ip_address: IPv4Address + src_ip_address: IPV4Address "Source IP address." - dst_ip_address: IPv4Address + dst_ip_address: IPV4Address "Destination IP address." protocol: IPProtocol = IPProtocol.TCP "IPProtocol." diff --git a/tests/integration_tests/network/test_firewall.py b/tests/integration_tests/network/test_firewall.py new file mode 100644 index 00000000..349ccd85 --- /dev/null +++ b/tests/integration_tests/network/test_firewall.py @@ -0,0 +1,280 @@ +from ipaddress import IPv4Address + +import pytest + +from primaite.simulator.network.container import Network +from primaite.simulator.network.hardware.nodes.host.computer import Computer +from primaite.simulator.network.hardware.nodes.network.firewall import Firewall +from primaite.simulator.network.hardware.nodes.network.router import ACLAction +from primaite.simulator.network.transmission.network_layer import IPProtocol +from primaite.simulator.network.transmission.transport_layer import Port +from primaite.simulator.system.services.ntp.ntp_client import NTPClient +from primaite.simulator.system.services.ntp.ntp_server import NTPServer + + +@pytest.fixture(scope="function") +def dmz_external_internal_network() -> Network: + """ + Fixture for setting up a simulated network with a firewall, external node, internal node, and DMZ node. This + configuration is designed to test firewall rules and their impact on traffic between these network segments. + + -------------- -------------- -------------- + | external |---------| firewall |---------| internal | + -------------- -------------- -------------- + | + | + --------- + | DMZ | + --------- + + The network is set up as follows: + - An external node simulates an entity outside the organization's network. + - An internal node represents a device within the organization's LAN. + - A DMZ (Demilitarized Zone) node acts as a server or service exposed to external traffic. + - A firewall node controls traffic between these nodes based on ACL (Access Control List) rules. + + The firewall is configured to allow ICMP and ARP traffic across all interfaces to ensure basic connectivity + for the tests. Specific tests will modify ACL rules to test various traffic filtering scenarios. + + :return: A `Network` instance with the described nodes and configurations. + """ + network = Network() + + firewall_node: Firewall = Firewall(hostname="firewall_1", start_up_duration=0) + firewall_node.power_on() + # configure firewall ports + firewall_node.configure_external_port( + ip_address=IPv4Address("192.168.10.1"), subnet_mask=IPv4Address("255.255.255.0") + ) + firewall_node.configure_dmz_port(ip_address=IPv4Address("192.168.1.1"), subnet_mask=IPv4Address("255.255.255.0")) + firewall_node.configure_internal_port( + ip_address=IPv4Address("192.168.0.1"), subnet_mask=IPv4Address("255.255.255.0") + ) + + # Allow ICMP + firewall_node.internal_inbound_acl.add_rule(action=ACLAction.PERMIT, protocol=IPProtocol.ICMP, position=23) + firewall_node.internal_outbound_acl.add_rule(action=ACLAction.PERMIT, protocol=IPProtocol.ICMP, position=23) + firewall_node.external_inbound_acl.add_rule(action=ACLAction.PERMIT, protocol=IPProtocol.ICMP, position=23) + firewall_node.external_outbound_acl.add_rule(action=ACLAction.PERMIT, protocol=IPProtocol.ICMP, position=23) + firewall_node.dmz_inbound_acl.add_rule(action=ACLAction.PERMIT, protocol=IPProtocol.ICMP, position=23) + firewall_node.dmz_outbound_acl.add_rule(action=ACLAction.PERMIT, protocol=IPProtocol.ICMP, position=23) + + # Allow ARP + firewall_node.internal_inbound_acl.add_rule( + action=ACLAction.PERMIT, src_port=Port.ARP, dst_port=Port.ARP, position=22 + ) + firewall_node.internal_outbound_acl.add_rule( + action=ACLAction.PERMIT, src_port=Port.ARP, dst_port=Port.ARP, position=22 + ) + firewall_node.external_inbound_acl.add_rule( + action=ACLAction.PERMIT, src_port=Port.ARP, dst_port=Port.ARP, position=22 + ) + firewall_node.external_outbound_acl.add_rule( + action=ACLAction.PERMIT, src_port=Port.ARP, dst_port=Port.ARP, position=22 + ) + firewall_node.dmz_inbound_acl.add_rule(action=ACLAction.PERMIT, src_port=Port.ARP, dst_port=Port.ARP, position=22) + firewall_node.dmz_outbound_acl.add_rule(action=ACLAction.PERMIT, src_port=Port.ARP, dst_port=Port.ARP, position=22) + + # external node + external_node = Computer( + hostname="external_node", + ip_address="192.168.10.2", + subnet_mask="255.255.255.0", + default_gateway="192.168.10.1", + start_up_duration=0, + ) + external_node.power_on() + external_node.software_manager.install(NTPServer) + ntp_service: NTPServer = external_node.software_manager.software["NTPServer"] + ntp_service.start() + # connect external node to firewall node + network.connect(endpoint_b=external_node.network_interface[1], endpoint_a=firewall_node.external_port) + + # internal node + internal_node = Computer( + hostname="internal_node", + ip_address="192.168.0.2", + subnet_mask="255.255.255.0", + default_gateway="192.168.0.1", + start_up_duration=0, + ) + internal_node.power_on() + internal_node.software_manager.install(NTPClient) + internal_ntp_client: NTPClient = internal_node.software_manager.software["NTPClient"] + internal_ntp_client.configure(external_node.network_interface[1].ip_address) + internal_ntp_client.start() + # connect external node to firewall node + network.connect(endpoint_b=internal_node.network_interface[1], endpoint_a=firewall_node.internal_port) + + # dmz node + dmz_node = Computer( + hostname="dmz_node", + ip_address="192.168.1.2", + subnet_mask="255.255.255.0", + default_gateway="192.168.1.1", + start_up_duration=0, + ) + dmz_node.power_on() + dmz_ntp_client: NTPClient = dmz_node.software_manager.software["NTPClient"] + dmz_ntp_client.configure(external_node.network_interface[1].ip_address) + dmz_ntp_client.start() + # connect external node to firewall node + network.connect(endpoint_b=dmz_node.network_interface[1], endpoint_a=firewall_node.dmz_port) + + return network + + +def test_firewall_can_ping_nodes(dmz_external_internal_network): + """ + Tests the firewall's ability to ping the external, internal, and DMZ nodes in the network. + + Verifies that the firewall has connectivity to all nodes within the network by performing a ping operation. + Successful pings indicate proper network setup and basic ICMP traffic passage through the firewall. + """ + firewall = dmz_external_internal_network.get_node_by_hostname("firewall_1") + + # ping from the firewall + assert firewall.ping("192.168.0.2") # firewall to internal + assert firewall.ping("192.168.1.2") # firewall to dmz + assert firewall.ping("192.168.10.2") # firewall to external + + +def test_nodes_can_ping_default_gateway(dmz_external_internal_network): + """ + Checks if the external, internal, and DMZ nodes can ping their respective default gateways. + + This test confirms that each node is correctly configured with a route to its default gateway and that the + firewall permits ICMP traffic for these basic connectivity checks. + """ + external_node = dmz_external_internal_network.get_node_by_hostname("external_node") + internal_node = dmz_external_internal_network.get_node_by_hostname("internal_node") + dmz_node = dmz_external_internal_network.get_node_by_hostname("dmz_node") + + assert internal_node.ping(internal_node.default_gateway) # default gateway internal + assert dmz_node.ping(dmz_node.default_gateway) # default gateway dmz + assert external_node.ping(external_node.default_gateway) # default gateway external + + +def test_nodes_can_ping_default_gateway_on_another_subnet(dmz_external_internal_network): + """ + Verifies that nodes can ping default gateways located in a different subnet, facilitated by the firewall. + + This test assesses the routing and firewall ACL configurations that allow ICMP traffic between different + network segments, ensuring that nodes can reach default gateways outside their local subnet. + """ + external_node = dmz_external_internal_network.get_node_by_hostname("external_node") + internal_node = dmz_external_internal_network.get_node_by_hostname("internal_node") + dmz_node = dmz_external_internal_network.get_node_by_hostname("dmz_node") + + assert internal_node.ping(external_node.default_gateway) # internal node to external default gateway + assert internal_node.ping(dmz_node.default_gateway) # internal node to dmz default gateway + + assert dmz_node.ping(internal_node.default_gateway) # dmz node to internal default gateway + assert dmz_node.ping(external_node.default_gateway) # dmz node to external default gateway + + assert external_node.ping(external_node.default_gateway) # external node to internal default gateway + assert external_node.ping(dmz_node.default_gateway) # external node to dmz default gateway + + +def test_nodes_can_ping_each_other(dmz_external_internal_network): + """ + Evaluates the ability of each node (external, internal, DMZ) to ping the other nodes within the network. + + This comprehensive connectivity test checks if the firewall's current ACL configuration allows for inter-node + communication via ICMP pings, highlighting the effectiveness of the firewall rules in place. + """ + external_node = dmz_external_internal_network.get_node_by_hostname("external_node") + internal_node = dmz_external_internal_network.get_node_by_hostname("internal_node") + dmz_node = dmz_external_internal_network.get_node_by_hostname("dmz_node") + + # test that nodes can ping each other + assert internal_node.ping(external_node.network_interface[1].ip_address) + assert internal_node.ping(dmz_node.network_interface[1].ip_address) + + assert external_node.ping(internal_node.network_interface[1].ip_address) + assert external_node.ping(dmz_node.network_interface[1].ip_address) + + assert dmz_node.ping(internal_node.network_interface[1].ip_address) + assert dmz_node.ping(external_node.network_interface[1].ip_address) + + +def test_service_blocked(dmz_external_internal_network): + """ + Tests the firewall's default blocking stance on NTP service requests from internal and DMZ nodes. + + Initially, without specific ACL rules to allow NTP traffic, this test confirms that NTP clients on both the + internal and DMZ nodes are unable to update their time, demonstrating the firewall's effective blocking of + unspecified services. + """ + firewall = dmz_external_internal_network.get_node_by_hostname("firewall_1") + internal_node = dmz_external_internal_network.get_node_by_hostname("internal_node") + dmz_node = dmz_external_internal_network.get_node_by_hostname("dmz_node") + internal_ntp_client: NTPClient = internal_node.software_manager.software["NTPClient"] + dmz_ntp_client: NTPClient = dmz_node.software_manager.software["NTPClient"] + + assert not internal_ntp_client.time + + internal_ntp_client.request_time() + + assert not internal_ntp_client.time + + assert not dmz_ntp_client.time + + dmz_ntp_client.request_time() + + assert not dmz_ntp_client.time + + firewall.show_rules() + + +def test_service_allowed_with_rule(dmz_external_internal_network): + """ + Tests that NTP service requests are allowed through the firewall based on ACL rules. + + This test verifies the functionality of the firewall in a network scenario where both an internal node and + a node in the DMZ attempt to access NTP services. Initially, no NTP traffic is allowed. The test then + configures ACL rules on the firewall to permit NTP traffic and checks if the NTP clients on the internal + node and DMZ node can successfully request and receive time updates. + + Procedure: + 1. Assert that the internal node's NTP client initially has no time information due to ACL restrictions. + 2. Add ACL rules to the firewall to permit outbound and inbound NTP traffic from the internal network. + 3. Trigger an NTP time request from the internal node and assert that it successfully receives time + information. + 4. Assert that the DMZ node's NTP client initially has no time information. + 5. Add ACL rules to the firewall to permit outbound and inbound NTP traffic from the DMZ. + 6. Trigger an NTP time request from the DMZ node and assert that it successfully receives time information. + + Asserts: + - The internal node's NTP client has no time information before ACL rules are applied. + - The internal node's NTP client successfully receives time information after the appropriate ACL rules + are applied. + - The DMZ node's NTP client has no time information before ACL rules are applied for the DMZ. + - The DMZ node's NTP client successfully receives time information after the appropriate ACL rules for + the DMZ are applied. + """ + firewall = dmz_external_internal_network.get_node_by_hostname("firewall_1") + internal_node = dmz_external_internal_network.get_node_by_hostname("internal_node") + dmz_node = dmz_external_internal_network.get_node_by_hostname("dmz_node") + internal_ntp_client: NTPClient = internal_node.software_manager.software["NTPClient"] + dmz_ntp_client: NTPClient = dmz_node.software_manager.software["NTPClient"] + + assert not internal_ntp_client.time + + firewall.internal_outbound_acl.add_rule(action=ACLAction.PERMIT, src_port=Port.NTP, dst_port=Port.NTP, position=1) + firewall.internal_inbound_acl.add_rule(action=ACLAction.PERMIT, src_port=Port.NTP, dst_port=Port.NTP, position=1) + + internal_ntp_client.request_time() + + assert internal_ntp_client.time + + assert not dmz_ntp_client.time + + firewall.dmz_outbound_acl.add_rule(action=ACLAction.PERMIT, src_port=Port.NTP, dst_port=Port.NTP, position=1) + firewall.dmz_inbound_acl.add_rule(action=ACLAction.PERMIT, src_port=Port.NTP, dst_port=Port.NTP, position=1) + + dmz_ntp_client.request_time() + + assert dmz_ntp_client.time + + firewall.show_rules() diff --git a/tests/unit_tests/_primaite/_simulator/_network/_hardware/nodes/test_acl.py b/tests/unit_tests/_primaite/_simulator/_network/_hardware/nodes/test_acl.py index 428f370c..8b1aa9be 100644 --- a/tests/unit_tests/_primaite/_simulator/_network/_hardware/nodes/test_acl.py +++ b/tests/unit_tests/_primaite/_simulator/_network/_hardware/nodes/test_acl.py @@ -1,111 +1,293 @@ from ipaddress import IPv4Address +import pytest + +from primaite.simulator.network.hardware.base import generate_mac_address from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router -from primaite.simulator.network.transmission.network_layer import IPProtocol -from primaite.simulator.network.transmission.transport_layer import Port +from primaite.simulator.network.protocols.icmp import ICMPPacket +from primaite.simulator.network.transmission.data_link_layer import EthernetHeader, Frame +from primaite.simulator.network.transmission.network_layer import IPPacket, IPProtocol +from primaite.simulator.network.transmission.transport_layer import Port, TCPHeader, UDPHeader -def test_add_rule(): +@pytest.fixture(scope="function") +def router_with_acl_rules(): + """ + Provides a router instance with predefined ACL rules for testing. + + :Setup: + 1. Creates a Router object named "Router". + 2. Adds a PERMIT rule for TCP traffic from 192.168.1.1:HTTPS to 192.168.1.2:HTTP. + 3. Adds a DENY rule for TCP traffic from 192.168.1.3:8080 to 192.168.1.4:80. + + :return: A configured Router object with ACL rules. + """ router = Router("Router") acl = router.acl + # Add rules here as needed acl.add_rule( action=ACLAction.PERMIT, protocol=IPProtocol.TCP, - src_ip_address=IPv4Address("192.168.1.1"), + src_ip_address="192.168.1.1", + src_port=Port.HTTPS, + dst_ip_address="192.168.1.2", + dst_port=Port.HTTP, + position=1, + ) + acl.add_rule( + action=ACLAction.DENY, + protocol=IPProtocol.TCP, + src_ip_address="192.168.1.3", src_port=Port(8080), - dst_ip_address=IPv4Address("192.168.1.2"), + dst_ip_address="192.168.1.4", + dst_port=Port(80), + position=2, + ) + return router + + +@pytest.fixture(scope="function") +def router_with_wildcard_acl(): + """ + Provides a router instance with ACL rules that include wildcard masking for testing. + + :Setup: + 1. Creates a Router object named "Router". + 2. Adds a PERMIT rule for TCP traffic from 192.168.1.1:8080 to 10.1.1.2:80. + 3. Adds a DENY rule with a wildcard mask for TCP traffic from the 192.168.1.0/24 network to 10.1.1.3:443. + 4. Adds a PERMIT rule for any traffic to the 10.2.0.0/16 network. + + :return: A Router object with configured ACL rules, including rules with wildcard masking. + """ + router = Router("Router") + acl = router.acl + # Rule to permit traffic from a specific source IP and port to a specific destination IP and port + acl.add_rule( + action=ACLAction.PERMIT, + protocol=IPProtocol.TCP, + src_ip_address="192.168.1.1", + src_port=Port(8080), + dst_ip_address="10.1.1.2", dst_port=Port(80), position=1, ) + # Rule to deny traffic from an IP range to a specific destination IP and port + acl.add_rule( + action=ACLAction.DENY, + protocol=IPProtocol.TCP, + src_ip_address="192.168.1.0", + src_wildcard_mask="0.0.0.255", + dst_ip_address="10.1.1.3", + dst_port=Port(443), + position=2, + ) + # Rule to permit any traffic to a range of destination IPs + acl.add_rule( + action=ACLAction.PERMIT, + protocol=None, + src_ip_address=None, + dst_ip_address="10.2.0.0", + dst_wildcard_mask="0.0.255.255", + position=3, + ) + return router + + +def test_add_rule(router_with_acl_rules): + """ + Tests that an ACL rule is added correctly to the router's ACL. + + Asserts: + - The action of the added rule is PERMIT. + - The protocol of the added rule is TCP. + - The source IP address matches "192.168.1.1". + - The source port is HTTPS. + - The destination IP address matches "192.168.1.2". + - The destination port is HTTP. + """ + acl = router_with_acl_rules.acl + assert acl.acl[1].action == ACLAction.PERMIT assert acl.acl[1].protocol == IPProtocol.TCP assert acl.acl[1].src_ip_address == IPv4Address("192.168.1.1") - assert acl.acl[1].src_port == Port(8080) + assert acl.acl[1].src_port == Port.HTTPS assert acl.acl[1].dst_ip_address == IPv4Address("192.168.1.2") - assert acl.acl[1].dst_port == Port(80) + assert acl.acl[1].dst_port == Port.HTTP -def test_remove_rule(): - router = Router("Router") - acl = router.acl - acl.add_rule( - action=ACLAction.PERMIT, - protocol=IPProtocol.TCP, - src_ip_address=IPv4Address("192.168.1.1"), - src_port=Port(8080), - dst_ip_address=IPv4Address("192.168.1.2"), - dst_port=Port(80), - position=1, - ) +def test_remove_rule(router_with_acl_rules): + """ + Tests the removal of an ACL rule from the router's ACL. + + Asserts that accessing the removed rule index in the ACL returns None. + """ + acl = router_with_acl_rules.acl acl.remove_rule(1) - assert not acl.acl[1] + assert acl.acl[1] is None -def test_rules(): - router = Router("Router") - acl = router.acl - acl.add_rule( - action=ACLAction.PERMIT, - protocol=IPProtocol.TCP, - src_ip_address=IPv4Address("192.168.1.1"), - src_port=Port(8080), - dst_ip_address=IPv4Address("192.168.1.2"), - dst_port=Port(80), - position=1, - ) - acl.add_rule( - action=ACLAction.DENY, - protocol=IPProtocol.TCP, - src_ip_address=IPv4Address("192.168.1.3"), - src_port=Port(8080), - dst_ip_address=IPv4Address("192.168.1.4"), - dst_port=Port(80), - position=2, - ) - is_permitted, rule = acl.is_permitted( - protocol=IPProtocol.TCP, - src_ip_address=IPv4Address("192.168.1.1"), - src_port=Port(8080), - dst_ip_address=IPv4Address("192.168.1.2"), - dst_port=Port(80), +def test_traffic_permitted_by_specific_rule(router_with_acl_rules): + """ + Verifies that traffic matching a specific ACL rule is correctly permitted. + + Asserts traffic that matches a permit rule is allowed through the ACL. + """ + acl = router_with_acl_rules.acl + permitted_frame = Frame( + ethernet=EthernetHeader(src_mac_addr=generate_mac_address(), dst_mac_addr=generate_mac_address()), + ip=IPPacket(src_ip_address="192.168.1.1", dst_ip_address="192.168.1.2", protocol=IPProtocol.TCP), + tcp=TCPHeader(src_port=Port.HTTPS, dst_port=Port.HTTP), ) + is_permitted, _ = acl.is_permitted(permitted_frame) assert is_permitted - is_permitted, rule = acl.is_permitted( - protocol=IPProtocol.TCP, - src_ip_address=IPv4Address("192.168.1.3"), - src_port=Port(8080), - dst_ip_address=IPv4Address("192.168.1.4"), - dst_port=Port(80), + + +def test_traffic_denied_by_specific_rule(router_with_acl_rules): + """ + Verifies that traffic matching a specific ACL rule is correctly denied. + + Asserts traffic that matches a deny rule is blocked by the ACL. + """ + + acl = router_with_acl_rules.acl + not_permitted_frame = Frame( + ethernet=EthernetHeader(src_mac_addr=generate_mac_address(), dst_mac_addr=generate_mac_address()), + ip=IPPacket(src_ip_address="192.168.1.3", dst_ip_address="192.168.1.4", protocol=IPProtocol.TCP), + tcp=TCPHeader(src_port=Port(8080), dst_port=Port(80)), ) + is_permitted, _ = acl.is_permitted(not_permitted_frame) assert not is_permitted -def test_default_rule(): +def test_default_rule(router_with_acl_rules): + """ + Tests the default deny rule of the ACL. + + This test verifies that traffic which does not match any explicit permit rule in the ACL + is correctly denied, as per the common "default deny" security stance that ACLs implement. + + Asserts the frame does not match any of the predefined ACL rules and is therefore not permitted by the ACL, + illustrating the default deny behavior when no explicit permit rule is matched. + """ + acl = router_with_acl_rules.acl + not_permitted_frame = Frame( + ethernet=EthernetHeader(src_mac_addr=generate_mac_address(), dst_mac_addr=generate_mac_address()), + ip=IPPacket(src_ip_address="192.168.1.5", dst_ip_address="192.168.1.12", protocol=IPProtocol.UDP), + udp=UDPHeader(src_port=Port.HTTPS, dst_port=Port.HTTP), + ) + is_permitted, rule = acl.is_permitted(not_permitted_frame) + assert not is_permitted + + +def test_direct_ip_match_with_acl(router_with_wildcard_acl): + """ + Tests ACL functionality for a direct IP address match. + + Asserts direct IP address match traffic is permitted by the ACL rule. + """ + acl = router_with_wildcard_acl.acl + frame = Frame( + ethernet=EthernetHeader(src_mac_addr=generate_mac_address(), dst_mac_addr=generate_mac_address()), + ip=IPPacket(src_ip_address="192.168.1.1", dst_ip_address="10.1.1.2", protocol=IPProtocol.TCP), + tcp=TCPHeader(src_port=Port(8080), dst_port=Port(80)), + ) + assert acl.is_permitted(frame)[0], "Direct IP match should be permitted." + + +def test_ip_range_match_denied_with_acl(router_with_wildcard_acl): + """ + Tests ACL functionality for denying traffic from an IP range using wildcard masking. + + Asserts traffic from the specified IP range is correctly denied by the ACL rule. + """ + acl = router_with_wildcard_acl.acl + frame = Frame( + ethernet=EthernetHeader(src_mac_addr=generate_mac_address(), dst_mac_addr=generate_mac_address()), + ip=IPPacket(src_ip_address="192.168.1.100", dst_ip_address="10.1.1.3", protocol=IPProtocol.TCP), + tcp=TCPHeader(src_port=Port(8080), dst_port=Port(443)), + ) + assert not acl.is_permitted(frame)[0], "IP range match with wildcard mask should be denied." + + +def test_traffic_permitted_to_destination_range_with_acl(router_with_wildcard_acl): + """ + Tests ACL functionality for permitting traffic to a destination IP range using wildcard masking. + + Asserts traffic to the specified destination IP range is correctly permitted by the ACL rule. + """ + acl = router_with_wildcard_acl.acl + frame = Frame( + ethernet=EthernetHeader(src_mac_addr=generate_mac_address(), dst_mac_addr=generate_mac_address()), + ip=IPPacket(src_ip_address="192.168.1.50", dst_ip_address="10.2.200.200", protocol=IPProtocol.UDP), + udp=UDPHeader(src_port=Port(1433), dst_port=Port(1433)), + ) + assert acl.is_permitted(frame)[0], "Traffic to destination IP range should be permitted." + + +def test_ip_traffic_from_specific_subnet(): + """ + Tests that the ACL permits or denies IP traffic from specific subnets, mimicking a Cisco ACL rule for IP traffic. + + This test verifies the ACL's ability to permit all IP traffic from a specific subnet (192.168.1.0/24) while denying + traffic from other subnets. The test mimics a Cisco ACL rule that allows IP traffic from a specified range using + wildcard masking. + + The test frames are constructed with varying protocols (TCP, UDP, ICMP) and source IP addresses, to demonstrate the + rule's general applicability to all IP protocols and its enforcement based on source IP address range. + + Asserts + - Traffic from within the 192.168.1.0/24 subnet is permitted. + - Traffic from outside the 192.168.1.0/24 subnet is denied. + """ + router = Router("Router") acl = router.acl + # Add rules here as needed acl.add_rule( action=ACLAction.PERMIT, - protocol=IPProtocol.TCP, - src_ip_address=IPv4Address("192.168.1.1"), - src_port=Port(8080), - dst_ip_address=IPv4Address("192.168.1.2"), - dst_port=Port(80), + src_ip_address="192.168.1.0", + src_wildcard_mask="0.0.0.255", position=1, ) - acl.add_rule( - action=ACLAction.DENY, - protocol=IPProtocol.TCP, - src_ip_address=IPv4Address("192.168.1.3"), - src_port=Port(8080), - dst_ip_address=IPv4Address("192.168.1.4"), - dst_port=Port(80), - position=2, + + permitted_frame_1 = Frame( + ethernet=EthernetHeader(src_mac_addr=generate_mac_address(), dst_mac_addr=generate_mac_address()), + ip=IPPacket(src_ip_address="192.168.1.50", dst_ip_address="10.2.200.200", protocol=IPProtocol.TCP), + tcp=TCPHeader(src_port=Port.POSTGRES_SERVER, dst_port=Port.POSTGRES_SERVER), ) - is_permitted, rule = acl.is_permitted( - protocol=IPProtocol.UDP, - src_ip_address=IPv4Address("192.168.1.5"), - src_port=Port(8080), - dst_ip_address=IPv4Address("192.168.1.12"), - dst_port=Port(80), + + assert acl.is_permitted(permitted_frame_1)[0] + + permitted_frame_2 = Frame( + ethernet=EthernetHeader(src_mac_addr=generate_mac_address(), dst_mac_addr=generate_mac_address()), + ip=IPPacket(src_ip_address="192.168.1.10", dst_ip_address="85.199.214.101", protocol=IPProtocol.UDP), + udp=UDPHeader(src_port=Port.NTP, dst_port=Port.NTP), ) - assert not is_permitted + + assert acl.is_permitted(permitted_frame_2)[0] + + permitted_frame_3 = Frame( + ethernet=EthernetHeader(src_mac_addr=generate_mac_address(), dst_mac_addr=generate_mac_address()), + ip=IPPacket(src_ip_address="192.168.1.200", dst_ip_address="192.168.1.1", protocol=IPProtocol.ICMP), + icmp=ICMPPacket(identifier=1), + ) + + assert acl.is_permitted(permitted_frame_3)[0] + + not_permitted_frame_1 = Frame( + ethernet=EthernetHeader(src_mac_addr=generate_mac_address(), dst_mac_addr=generate_mac_address()), + ip=IPPacket(src_ip_address="192.168.0.50", dst_ip_address="10.2.200.200", protocol=IPProtocol.TCP), + tcp=TCPHeader(src_port=Port.POSTGRES_SERVER, dst_port=Port.POSTGRES_SERVER), + ) + + assert not acl.is_permitted(not_permitted_frame_1)[0] + + not_permitted_frame_2 = Frame( + ethernet=EthernetHeader(src_mac_addr=generate_mac_address(), dst_mac_addr=generate_mac_address()), + ip=IPPacket(src_ip_address="192.168.2.10", dst_ip_address="85.199.214.101", protocol=IPProtocol.UDP), + udp=UDPHeader(src_port=Port.NTP, dst_port=Port.NTP), + ) + + assert not acl.is_permitted(not_permitted_frame_2)[0] + + acl.show()