Merge remote-tracking branch 'origin/dev' into feature/1812-traverse-actions

This commit is contained in:
Marek Wolan
2023-09-05 13:20:20 +01:00
43 changed files with 2978 additions and 396 deletions

60
.github/workflows/build-sphinx.yml vendored Normal file
View File

@@ -0,0 +1,60 @@
name: build-sphinx-to-github-pages
env:
GITHUB_ACTOR: Autonomous-Resilient-Cyber-Defence
GITHUB_REPOSITORY: Autonomous-Resilient-Cyber-Defence/PrimAITE
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN}}
on:
push:
branches: [main]
jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10"]
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install python dev
run: |
set -x
sudo apt-get update
sudo add-apt-repository ppa:deadsnakes/ppa -y
sudo apt install python${{ matrix.python-version}}-dev -y
- name: Install Git
run: |
set -x
sudo apt-get install -y git
shell: bash
- name: Set pip, wheel, setuptools versions
run: |
python -m pip install --upgrade pip==23.0.1
pip install wheel==0.38.4 --upgrade
pip install setuptools==66 --upgrade
pip install build
- name: Install PrimAITE for docs autosummary
run: |
set -x
python -m pip install -e .[dev]
- name: Run build script for Sphinx pages
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
set -x
bash $PWD/docs/build-sphinx-docs-to-github-pages.sh

View File

@@ -7,12 +7,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Added
- Network Hardware - Added base hardware module with NIC, SwitchPort, Node, Switch, and Link. Nodes and Switches have
- Network Hardware - Added base hardware module with NIC, SwitchPort, Node, and Link. Nodes have
fundamental services like ARP, ICMP, and PCAP running them by default.
- Network Transmission - Modelled OSI Model layers 1 through to 5 with various classes for creating network frames and
transmitting them from a Service/Application, down through the layers, over the wire, and back up through the layers to
a Service/Application another machine.
- Introduced `Router` and `Switch` classes to manage networking routes more effectively.
- Added `ACLRule` and `RouteTableEntry` classes as part of the `Router`.
- New `.show()` methods in all network component classes to inspect the state in either plain text or markdown formats.
- Added `Computer` and `Server` class to better differentiate types of network nodes.
- Integrated a new Use Case 2 network into the system.
- New unit tests to verify routing between different subnets using `.ping()`.
- system - Added the core structure of Application, Services, and Components. Also added a SoftwareManager and
SessionManager.
- Permission System - each action can define criteria that will be used to permit or deny agent actions.

15
LICENSE
View File

@@ -1,28 +1,21 @@
MIT License License
MIT License
MIT License Conditions
Copyright (c) 2023 - 2025 Defence Science and Technology Laboratory UK (https://dstl.gov.uk)
These MIT License conditions confirm the provision of the following artefacts as MIT License by Defence Science and Technology
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -1,2 +1,3 @@
include src/primaite/setup/_package_data/primaite_config.yaml
include src/primaite/config/_package_data/*.yaml
include src/primaite/simulator/_package_data/*.ipynb

Binary file not shown.

After

Width:  |  Height:  |  Size: 274 KiB

View File

@@ -1,12 +1,12 @@
# PrimAITE
![image](https://github.com/Autonomous-Resilient-Cyber-Defence/PrimAITE/assets/107395948/fdefa884-1105-44da-88fe-e3a1c98ee361)
![image](./PrimAITE_logo_transparent.png)
The ARCD Primary-level AI Training Environment (**PrimAITE**) provides an effective simulation capability for the purposes of training and evaluating AI in a cyber-defensive role. It incorporates the functionality required of a primary-level ARCD environment, which includes:
- The ability to model a relevant platform / system context;
- The ability to model key characteristics of a platform / system by representing connections, IP addresses, ports, traffic loading, operating systems, services and processes;
- The ability to model key characteristics of a platform / system by representing connections, IP addresses, ports, traffic loading, operating systems and services;
- Operates at machine-speed to enable fast training cycles.
@@ -24,7 +24,7 @@ PrimAITE presents the following features:
- Application of IERs to the platform / system laydown adheres to the ACL ruleset;
- Presents an OpenAI gym or RLLib interface to the environment, allowing integration with any OpenAI gym compliant defensive agents;
- Presents an OpenAI gym or RLLib interface to the environment, allowing integration with any compliant defensive agents;
- Full capture of discrete logs relating to agent training (full system state, agent actions taken, instantaneous and average reward for every step of every episode);

View File

@@ -0,0 +1,67 @@
#!/bin/bash
set -x
apt-get update
apt-get -y install git rsync python3-sphinx
pwd ls -lah
export SOURCE_DATE_EPOCH=$(git log -1 --pretty=%ct)
##############
# BUILD DOCS #
##############
cd docs
# Python Sphinx, configured with source/conf.py
# See https://www.sphinx-doc.org/
make clean
make html
cd ..
#######################
# Update GitHub Pages #
#######################
git config --global user.name "${GITHUB_ACTOR}"
git config --global user.email "${GITHUB_ACTOR}@users.noreply.github.com"
docroot=`mktemp -d`
rsync -av $PWD/docs/_build/html/ "${docroot}/"
pushd "${docroot}"
git init
git remote add deploy "https://token:${GITHUB_TOKEN}@github.com/${GITHUB_REPOSITORY}.git"
git checkout -b sphinx-docs-github-pages
# Adds .nojekyll file to the root to signal to GitHub that
# directories that start with an underscore (_) can remain
touch .nojekyll
# Add README
cat > README.md <<EOF
# README for the Sphinx Docs GitHub Pages Branch
This branch is simply a cache for the website served from https://Autonomous-Resilient-Cyber-Defence.github.io/PrimAITE/,
and is not intended to be viewed on github.com.
For more information on how this site is built using Sphinx, Read the Docs, GitHub Actions/Pages, and demo
implementation from https://github.com/annegentle, see:
* https://www.docslikecode.com/articles/github-pages-python-sphinx/
* https://tech.michaelaltfield.net/2020/07/18/sphinx-rtd-github-pages-1
* https://github.com/annegentle/create-demo
EOF
# Copy the resulting html pages built from Sphinx to the sphinx-docs-github-pages branch
git add .
# Make a commit with changes and any new files
msg="Updating Docs for commit ${GITHUB_SHA} made on `date -d"@${SOURCE_DATE_EPOCH}" --iso-8601=seconds` from ${GITHUB_REF} by ${GITHUB_ACTOR}"
git commit -am "${msg}"
# overwrite the contents of the sphinx-docs-github-pages branch on our github.com repo
git push deploy sphinx-docs-github-pages --force
popd # return to main repo sandbox root
# exit cleanly
exit 0

View File

@@ -43,7 +43,6 @@ extensions = [
"sphinx.ext.viewcode", # Add a link to the Python source code for classes, functions etc.
"sphinx.ext.todo",
"sphinx_copybutton", # Adds a copy button to code blocks
"sphinx_code_tabs", # Enables tabbed code blocks
]

View File

@@ -8,11 +8,68 @@ Welcome to PrimAITE's documentation
What is PrimAITE?
-----------------
PrimAITE (Primary-level AI Training Environment) is a simulation environment for training AI under the ARCD programme. It incorporates the functionality required of a Primary-level environment, as specified in the Dstl ARCD Training Environment Matrix document:
Overview
^^^^^^^^
The ARCD Primary-level AI Training Environment (**PrimAITE**) provides an effective simulation capability for the purposes of training and evaluating AI in a cyber-defensive role. It incorporates the functionality required of a primary-level ARCD environment, which includes:
- The ability to model a relevant platform / system context;
- Modelling an adversarial agent that the defensive agent can be trained and evaluated against;
- The ability to model key characteristics of a platform / system by representing connections, IP addresses, ports, operating systems, services and traffic loading on links;
- Modelling background pattern-of-life;
- Operates at machine-speed to enable fast training cycles.
Features
^^^^^^^^
PrimAITE incorporates the following features:
- Highly configurable (via YAML files) to provide the means to model a variety of platform / system laydowns and adversarial attack scenarios;
- A Reinforcement Learning (RL) reward function based on (a) the ability to counter the modelled adversarial cyber-attack, and (b) the ability to ensure success;
- Provision of logging to support AI performance / effectiveness assessment;
- Uses the concept of Information Exchange Requirements (IERs) to model background pattern of life and adversarial behaviour;
- An Access Control List (ACL) function, mimicking the behaviour of a network firewall, is applied across the model, following standard ACL rule format (e.g. DENY/ALLOW, source IP address, destination IP address, protocol and port);
- Application of traffic to the links of the platform / system laydown adheres to the ACL ruleset;
- Presents both an OpenAI gym and Ray RLLib interface to the environment, allowing integration with any compliant defensive agents;
- Allows for the saving and loading of trained defensive agents;
- Stochastic adversarial agent behaviour;
- Full capture of discrete logs relating to agent training or evaluation (system state, agent actions taken, instantaneous and average reward for every step of every episode);
- Distinct control over running a training and / or evaluation session;
- NetworkX provides laydown visualisation capability.
Architecture
^^^^^^^^^^^^
PrimAITE is a Python application and is therefore Operating System agnostic. The OpenAI gym and Ray RLLib frameworks are employed to provide an interface and source for AI agents. Configuration of PrimAITE is achieved via included YAML files which support full control over the platform / system laydown being modelled, background pattern of life, adversarial (red agent) behaviour, and step and episode count. NetworkX based nodes and links host Python classes to present attributes and methods, and hence a more representative platform / system can be modelled within the simulation.
Training & Evaluation Capability
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
PrimAITE provides a training and evaluation capability to AI agents in the context of cyber-attack, via its OpenAI Gym and RLLib compliant interface. Scenarios can be constructed to reflect platform / system laydowns consisting of any configuration of nodes (e.g. PCs, servers, switches etc.) and network links between them. All nodes can be configured to model services (and their status) and the traffic loading between them over the network links. Traffic loading is broken down into a per service granularity, relating directly to a protocol (e.g. Service A would be configured as a TCP service, and TCP traffic then flows between instances of Service A under the direction of a tailored IER). Highlights of PrimAITEs training and evaluation capability are:
- The scenario is not bound to a representation of any platform, system, or technology;
- Fully configurable (network / system laydown, IERs, node pattern-of-life, ACL, number of episodes, steps per episode) and repeatable to suit the requirements of AI agents;
- Can integrate with any OpenAI Gym or RLLib compliant AI agent.
Use of PrimAITE default scenarios within ARCD is supported by a “Use Case Profile” tailored to the scenario.
AI Assessment Capability
^^^^^^^^^^^^^^^^^^^^^^^^
PrimAITE includes the capability to support in-depth assessment of cyber defence AI by outputting logs of the environment state and AI behaviour throughout both training and evaluation sessions. These logs include the following data:
- Timestamp;
- Episode and step number;
- Agent identifier;
- Observation space;
- Action taken (by defensive AI);
- Reward value.
Logs are available in CSV format and provide coverage of the above data for every step of every episode.
* The ability to model a relevant platform / system context;
* The ability to model key characteristics of a platform / system by representing connections, IP addresses, ports, traffic loading, operating systems, file system, services and processes;
* Operates at machine-speed to enable fast training cycles.
What is PrimAITE built with
@@ -36,13 +93,13 @@ Head over to the :ref:`getting-started` page to install and setup PrimAITE!
.. toctree::
:maxdepth: 8
:caption: Contents:
:hidden:
source/getting_started
source/about
source/config
source/primaite_session
source/custom_agent
source/simulation
PrimAITE API <source/_autosummary/primaite>
PrimAITE Tests <source/_autosummary/tests>
source/dependencies

View File

@@ -14,20 +14,19 @@ Pre-Requisites
In order to get **PrimAITE** installed, you will need to have a python version between 3.8 and 3.10 installed. If you don't already have it, this is how to install it:
.. tabs:: lang
.. code-block:: bash
:caption: Unix
.. code-tab:: bash
:caption: Unix
sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt install python3.10
sudo apt-get install python3-pip
sudo apt-get install python3-venv
sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt install python3.10
sudo apt-get install python3-pip
sudo apt-get install python3-venv
.. code-tab:: text
:caption: Windows (Powershell)
.. code-block:: text
:caption: Windows (Powershell)
- Manual install from: https://www.python.org/downloads/release/python-31011/
- Manual install from: https://www.python.org/downloads/release/python-31011/
**PrimAITE** is designed to be OS-agnostic, and thus should work on most variations/distros of Linux, Windows, and MacOS.
@@ -36,30 +35,30 @@ Install PrimAITE
1. Create a primaite directory in your home directory:
.. tabs:: lang
.. code-tab:: bash
:caption: Unix
mkdir ~/primaite/2.0.0
.. code-block:: bash
:caption: Unix
.. code-tab:: powershell
:caption: Windows (Powershell)
mkdir ~/primaite/2.0.0
mkdir ~\primaite\2.0.0
.. code-block:: powershell
:caption: Windows (Powershell)
mkdir ~\primaite\2.0.0
2. Navigate to the primaite directory and create a new python virtual environment (venv)
.. tabs:: lang
.. code-tab:: bash
:caption: Unix
cd ~/primaite/2.0.0
python3 -m venv .venv
.. code-block:: bash
:caption: Unix
.. code-tab:: powershell
:caption: Windows (Powershell)
cd ~/primaite/2.0.0
python3 -m venv .venv
.. code-block:: powershell
:caption: Windows (Powershell)
cd ~\primaite\2.0.0
python3 -m venv .venv
@@ -67,44 +66,41 @@ Install PrimAITE
3. Activate the venv
.. tabs:: lang
.. code-tab:: bash
:caption: Unix
.. code-block:: bash
:caption: Unix
source .venv/bin/activate
source .venv/bin/activate
.. code-tab:: powershell
:caption: Windows (Powershell)
.. code-block:: powershell
:caption: Windows (Powershell)
.\.venv\Scripts\activate
.\.venv\Scripts\activate
4. Install PrimAITE using pip from PyPi
.. tabs:: lang
.. code-tab:: bash
:caption: Unix
.. code-block:: bash
:caption: Unix
pip install primaite
pip install primaite
.. code-tab:: powershell
:caption: Windows (Powershell)
.. code-block:: powershell
:caption: Windows (Powershell)
pip install primaite
pip install primaite
5. Perform the PrimAITE setup
.. tabs:: lang
.. code-tab:: bash
:caption: Unix
.. code-block:: bash
:caption: Unix
primaite setup
primaite setup
.. code-tab:: powershell
:caption: Windows (Powershell)
.. code-block:: powershell
:caption: Windows (Powershell)
primaite setup
@@ -123,33 +119,31 @@ of your choice:
Create and activate your Python virtual environment (venv)
.. tabs:: lang
.. code-tab:: bash
:caption: Unix
.. code-block:: bash
:caption: Unix
python3 -m venv venv
source venv/bin/activate
python3 -m venv venv
source venv/bin/activate
.. code-tab:: powershell
:caption: Windows (Powershell)
.. code-block:: powershell
:caption: Windows (Powershell)
python3 -m venv venv
.\venv\Scripts\activate
python3 -m venv venv
.\venv\Scripts\activate
Install PrimAITE with the dev extra
.. tabs:: lang
.. code-tab:: bash
:caption: Unix
.. code-block:: bash
:caption: Unix
pip install -e .[dev]
pip install -e .[dev]
.. code-tab:: powershell
:caption: Windows (Powershell)
.. code-block:: powershell
:caption: Windows (Powershell)
pip install -e .[dev]
pip install -e .[dev]
To view the complete list of packages installed during PrimAITE installation, go to the dependencies page (:ref:`Dependencies`).

View File

@@ -15,31 +15,31 @@ A PrimAITE session can be ran either with the ``primaite session`` command from
Both the ``primaite session`` and :func:`primaite.main.run` take a training config and a lay down config as parameters.
.. tabs::
.. code-tab:: bash
:caption: Unix CLI
cd ~/primaite/2.0.0
source ./.venv/bin/activate
primaite session --tc ./config/my_training_config.yaml --ldc ./config/my_lay_down_config.yaml
.. code-tab:: powershell
:caption: Powershell CLI
cd ~\primaite\2.0.0
.\.venv\Scripts\activate
primaite session --tc .\config\my_training_config.yaml --ldc .\config\my_lay_down_config.yaml
.. code-tab:: python
:caption: Python
.. code-block:: bash
:caption: Unix CLI
from primaite.main import run
cd ~/primaite/2.0.0
source ./.venv/bin/activate
primaite session --tc ./config/my_training_config.yaml --ldc ./config/my_lay_down_config.yaml
training_config = <path to training config yaml file>
lay_down_config = <path to lay down config yaml file>
run(training_config, lay_down_config)
.. code-block:: powershell
:caption: Powershell CLI
cd ~\primaite\2.0.0
.\.venv\Scripts\activate
primaite session --tc .\config\my_training_config.yaml --ldc .\config\my_lay_down_config.yaml
.. code-block:: python
:caption: Python
from primaite.main import run
training_config = <path to training config yaml file>
lay_down_config = <path to lay down config yaml file>
run(training_config, lay_down_config)
When a session is ran, a session output sub-directory is created in the users app sessions directory (``~/primaite/2.0.0/sessions``).
The sub-directory is formatted as such: ``~/primaite/2.0.0/sessions/<yyyy-mm-dd>/<yyyy-mm-dd>_<hh-mm-dd>/``
@@ -51,31 +51,33 @@ For example, when running a session at 17:30:00 on 31st January 2023, the sessio
To run a PrimAITE session using legacy training or laydown config files, add the ``--legacy-tc`` and/or ``legacy-ldc`` options.
.. tabs::
.. code-tab:: bash
:caption: Unix CLI
cd ~/primaite/2.0.0
source ./.venv/bin/activate
primaite session --tc ./config/my_legacy_training_config.yaml --legacy-tc --ldc ./config/my_legacy_lay_down_config.yaml --legacy-ldc
.. code-tab:: powershell
:caption: Powershell CLI
cd ~\primaite\2.0.0
.\.venv\Scripts\activate
primaite session --tc .\config\my_legacy_training_config.yaml --legacy-tc --ldc .\config\my_legacy_lay_down_config.yaml --legacy-ldc
.. code-tab:: python
:caption: Python
.. code-block:: bash
:caption: Unix CLI
cd ~/primaite/2.0.0
source ./.venv/bin/activate
primaite session --tc ./config/my_legacy_training_config.yaml --legacy-tc --ldc ./config/my_legacy_lay_down_config.yaml --legacy-ldc
.. code-block:: powershell
:caption: Powershell CLI
cd ~\primaite\2.0.0
.\.venv\Scripts\activate
primaite session --tc .\config\my_legacy_training_config.yaml --legacy-tc --ldc .\config\my_legacy_lay_down_config.yaml --legacy-ldc
.. code-block:: python
:caption: Python
from primaite.main import run
training_config = <path to legacy training config yaml file>
lay_down_config = <path to legacy lay down config yaml file>
run(training_config, lay_down_config, legacy_training_config=True, legacy_lay_down_config=True)
from primaite.main import run
training_config = <path to legacy training config yaml file>
lay_down_config = <path to legacy lay down config yaml file>
run(training_config, lay_down_config, legacy_training_config=True, legacy_lay_down_config=True)
Outputs

View File

@@ -18,3 +18,6 @@ Contents
simulation_structure
simulation_components/network/base_hardware
simulation_components/network/transport_to_data_link_layer
simulation_components/network/router
simulation_components/network/switch
simulation_components/network/network

View File

@@ -0,0 +1,115 @@
.. only:: comment
© Crown-owned copyright 2023, Defence Science and Technology Laboratory UK
.. _about:
Network
=======
The ``Network`` class serves as the backbone of the simulation. It offers a framework to manage various network
components such as routers, switches, servers, and clients. This document provides a detailed explanation of how to
effectively use the ``Network`` class.
Example Usage
-------------
Below demonstrates how to use the Router node to connect Nodes, and block traffic using ACLs. For this demonstration,
we'll use the following Network that has a client, server, two switches, and a router.
.. code-block:: text
+------------+ +------------+ +------------+ +------------+ +------------+
| | | | | | | | | |
| client_1 +------+ switch_2 +------+ router_1 +------+ switch_1 +------+ server_1 |
| | | | | | | | | |
+------------+ +------------+ +------------+ +------------+ +------------+
1. Relevant imports
.. code-block:: python
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.base import NIC
from primaite.simulator.network.hardware.nodes.computer import Computer
from primaite.simulator.network.hardware.nodes.router import Router, ACLAction
from primaite.simulator.network.hardware.nodes.server import Server
from primaite.simulator.network.hardware.nodes.switch import Switch
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
2. Create the Network
.. code-block:: python
network = Network()
3. Create and configure the Router
.. code-block:: python
router_1 = Router(hostname="router_1", num_ports=3)
router_1.power_on()
router_1.configure_port(port=1, ip_address="192.168.1.1", subnet_mask="255.255.255.0")
router_1.configure_port(port=2, ip_address="192.168.2.1", subnet_mask="255.255.255.0")
4. Create and configure the two Switches
.. code-block:: python
switch_1 = Switch(hostname="switch_1", num_ports=6)
switch_1.power_on()
switch_2 = Switch(hostname="switch_2", num_ports=6)
switch_2.power_on()
5. Connect the Switches to the Router
.. code-block:: python
network.connect(endpoint_a=router_1.ethernet_ports[1], endpoint_b=switch_1.switch_ports[6])
router_1.enable_port(1)
network.connect(endpoint_a=router_1.ethernet_ports[2], endpoint_b=switch_2.switch_ports[6])
router_1.enable_port(2)
6. Create the Client and Server nodes.
.. code-block:: python
client_1 = Computer(
hostname="client_1",
ip_address="192.168.2.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.2.1"
)
client_1.power_on()
server_1 = Server(
hostname="server_1",
ip_address="192.168.1.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1"
)
server_1.power_on()
7. Connect the Client and Server to the relevant Switch
.. code-block:: python
network.connect(endpoint_a=switch_2.switch_ports[1], endpoint_b=client_1.ethernet_port[1])
network.connect(endpoint_a=switch_1.switch_ports[1], endpoint_b=server_1.ethernet_port[1])
8. Add ACL rules on the Router to allow ARP and ICMP traffic.
.. code-block:: python
router_1.acl.add_rule(
action=ACLAction.PERMIT,
src_port=Port.ARP,
dst_port=Port.ARP,
position=22
)
router_1.acl.add_rule(
action=ACLAction.PERMIT,
protocol=IPProtocol.ICMP,
position=23
)

View File

@@ -0,0 +1,73 @@
.. only:: comment
© Crown-owned copyright 2023, Defence Science and Technology Laboratory UK
.. _about:
Router Module
=============
The router module contains classes for simulating the functions of a network router.
Router
------
The Router class represents a multi-port network router that can receive, process, and route network packets between its ports and other Nodes
The router maintains internal state including:
- RouteTable - Routing table to lookup where to forward packets.
- AccessControlList - Access control rules to block or allow packets.
- ARP cache - MAC address lookups for connected devices.
- ICMP handler - Handles ICMP requests to router interfaces.
The router receives incoming frames on enabled ports. It processes the frame headers and applies the following logic:
1. Checks the AccessControlList if the packet is permitted. If blocked, it is dropped.
2. For permitted packets, routes the frame based on:
- ARP cache lookups for destination MAC address.
- ICMP echo requests handled directly.
- RouteTable lookup to forward packet out other ports.
3. Updates ARP cache as it learns new information about the Network.
RouteTable
----------
The RouteTable holds RouteEntry objects representing routes. It finds the best route for a destination IP using a metric and the longest prefix match algorithm.
Routes can be added and looked up based on destination IP address. The RouteTable is used by the Router when forwarding packets between other Nodes.
AccessControlList
-----------------
The AccessControlList defines Access Control Rules to block or allow packets. Packets are checked against the rules to determine if they are permitted to be forwarded.
Rules can be added to deny or permit traffic based on IP, port, and protocol. The ACL is checked by the Router when packets are received.
Packet Processing
-----------------
-The Router supports the following protocols and packet types:
ARP
^^^
- Handles both ARP requests and responses.
- Updates ARP cache.
- Proxies ARP replies for connected networks.
- Routes ARP requests.
ICMP
^^^^
- Responds to ICMP echo requests to Router interfaces.
- Routes other ICMP messages based on routes.
TCP/UDP
^^^^^^^
- Forwards packets based on routes like IP.
- Applies ACL rules based on protocol, source/destination IP address, and source/destination port numbers.
- Decrements TTL and drops expired TTL packets.

View File

@@ -64,9 +64,9 @@ Data Link Layer (Layer 2)
- **request:** ARP operation. Set to True for a request and False for a reply.
- **sender_mac_addr:** Sender's MAC address.
- **sender_ip:** Sender's IP address (IPv4 format).
- **sender_ip_address:** Sender's IP address (IPv4 format).
- **target_mac_addr:** Target's MAC address.
- **target_ip:** Target's IP address (IPv4 format).
- **target_ip_address:** Target's IP address (IPv4 format).
**EthernetHeader:** Represents the Ethernet layer of a network frame. It includes source and destination MAC addresses.
This header is used to identify the physical hardware addresses of devices on a local network.
@@ -102,8 +102,8 @@ address of 'aa:bb:cc:dd:ee:ff' to port 8080 on the host 10.0.0.10 which has a NI
# Network Layer
ip_packet = IPPacket(
src_ip="192.168.0.100",
dst_ip="10.0.0.10",
src_ip_address="192.168.0.100",
dst_ip_address="10.0.0.10",
protocol=IPProtocol.TCP
)
# Data Link Layer
@@ -128,8 +128,8 @@ This produces the following ``Frame`` (displayed in json format)
"dst_mac_addr": "11:22:33:44:55:66"
},
"ip": {
"src_ip": "192.168.0.100",
"dst_ip": "10.0.0.10",
"src_ip_address": "192.168.0.100",
"dst_ip_address": "10.0.0.10",
"protocol": "tcp",
"ttl": 64,
"precedence": 0

View File

@@ -10,7 +10,7 @@ license = {file = "LICENSE"}
requires-python = ">=3.8, <3.11"
dynamic = ["version", "readme"]
classifiers = [
"License :: MIT License",
"License :: OSI Approved :: MIT License",
"Development Status :: 5 - Production/Stable",
"Operating System :: Microsoft :: Windows",
"Operating System :: MacOS",
@@ -67,7 +67,6 @@ dev = [
"pytest-flake8==1.1.1",
"setuptools==66",
"Sphinx==6.1.3",
"sphinx-code-tabs==0.5.3",
"sphinx-copybutton==0.5.2",
"wheel==0.38.4"
]

View File

@@ -1,34 +0,0 @@
# © Crown-owned copyright 2023, Defence Science and Technology Laboratory UK
"""Contains default jupyter notebooks which demonstrate PrimAITE functionality."""
import importlib.util
import os
import subprocess
import sys
from logging import Logger
from primaite import getLogger, PRIMAITE_PATHS
_LOGGER: Logger = getLogger(__name__)
def start_jupyter_session() -> None:
"""
Starts a new Jupyter notebook session in the app notebooks directory.
Currently only works on Windows OS.
.. todo:: Figure out how to get this working for Linux and MacOS too.
"""
if importlib.util.find_spec("jupyter") is not None:
jupyter_cmd = "python3 -m jupyter lab"
if sys.platform == "win32":
jupyter_cmd = "jupyter lab"
working_dir = os.getcwd()
os.chdir(PRIMAITE_PATHS.user_notebooks_path)
subprocess.Popen(jupyter_cmd)
os.chdir(working_dir)
else:
# Jupyter is not installed
_LOGGER.error("Cannot start jupyter lab as it is not installed")

View File

@@ -1,35 +1,46 @@
# © Crown-owned copyright 2023, Defence Science and Technology Laboratory UK
import filecmp
import os
import shutil
from logging import Logger
from pathlib import Path
import pkg_resources
from primaite import getLogger, PRIMAITE_PATHS
_LOGGER: Logger = getLogger(__name__)
def should_copy_file(src: Path, dest: Path, overwrite_existing: bool) -> bool:
"""
Determine if the file should be copied.
:param src: The source file Path.
:param dest: The destination file Path.
:param overwrite_existing: A bool to toggle replacing existing edited files on or off.
:return: True if file should be copied, otherwise False.
"""
if not dest.is_file():
return True
if overwrite_existing and not filecmp.cmp(src, dest):
return True
return False
def run(overwrite_existing: bool = True) -> None:
"""
Resets the demo jupyter notebooks in the users app notebooks directory.
Resets the demo Jupyter notebooks in the user's app notebooks directory.
:param overwrite_existing: A bool to toggle replacing existing edited notebooks on or off.
"""
notebooks_package_data_root = pkg_resources.resource_filename("primaite", "notebooks/_package_data")
for subdir, dirs, files in os.walk(notebooks_package_data_root):
for file in files:
fp = os.path.join(subdir, file)
path_split = os.path.relpath(fp, notebooks_package_data_root).split(os.sep)
target_fp = PRIMAITE_PATHS.user_notebooks_path / Path(*path_split)
target_fp.parent.mkdir(exist_ok=True, parents=True)
copy_file = not target_fp.is_file()
primaite_root = Path(__file__).parent.parent
example_notebooks_user_dir = PRIMAITE_PATHS.user_notebooks_path / "example_notebooks"
example_notebooks_user_dir.mkdir(exist_ok=True, parents=True)
if overwrite_existing and not copy_file:
copy_file = (not filecmp.cmp(fp, target_fp)) and (".ipynb_checkpoints" not in str(target_fp))
for src_fp in primaite_root.glob("**/*.ipynb"):
dst_fp = example_notebooks_user_dir / src_fp.name
if copy_file:
shutil.copy2(fp, target_fp)
_LOGGER.info(f"Reset example notebook: {target_fp}")
if should_copy_file(src_fp, dst_fp, overwrite_existing):
print(dst_fp)
shutil.copy2(src_fp, dst_fp)
_LOGGER.info(f"Reset example notebook: {dst_fp}")

View File

@@ -0,0 +1,688 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "03b2013a-b7d1-47ee-b08c-8dab83833720",
"metadata": {},
"source": [
"# PrimAITE Router Simulation Demo\n",
"\n",
"This demo uses the ARCD Use Case 2 Network (seen below) to demonstrate the capabilities of the Network simulator in PrimAITE."
]
},
{
"cell_type": "raw",
"id": "c8bb5698-e746-4e90-9c2f-efe962acdfa0",
"metadata": {},
"source": [
" +------------+\n",
" | domain_ |\n",
" +------------+ controller |\n",
" | | |\n",
" | +------------+\n",
" |\n",
" |\n",
"+------------+ | +------------+\n",
"| | | | |\n",
"| client_1 +---------+ | +---------+ web_server |\n",
"| | | | | | |\n",
"+------------+ | | | +------------+\n",
" +--+---------+ +------------+ +------+--+--+\n",
" | | | | | |\n",
" | switch_2 +------+ router_1 +------+ switch_1 |\n",
" | | | | | |\n",
" +--+------+--+ +------------+ +--+---+--+--+\n",
"+------------+ | | | | | +------------+\n",
"| | | | | | | | database |\n",
"| client_2 +---------+ | | | +---------+ _server |\n",
"| | | | | | |\n",
"+------------+ | | | +------------+\n",
" | +------------+ | |\n",
" | | security | | |\n",
" +---------+ _suite +---------+ | +------------+\n",
" | | | | backup_ |\n",
" +------------+ +------------+ server |\n",
" | |\n",
" +------------+"
]
},
{
"cell_type": "markdown",
"id": "415d487c-6457-497d-85d6-99439b3541e7",
"metadata": {},
"source": [
"## The Network\n",
"First let's create our network. The network comes 'pre-packaged' with PrimAITE in the `primaite.simulator.network.networks` module.\n",
"\n",
"> You'll see a bunch of logs associated with parts of the Network that aern't an 'electronic' device on the Network and thus don't have a stsrem to log to. Soon these logs are going to be pushed to a Network Logger so we're not clogging up the PrimAITE application logs."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "de57ac8c-5b28-4847-a759-2ceaf5593329",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from primaite.simulator.network.networks import arcd_uc2_network"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a1e2e4df-67c0-4584-ab27-47e2c7c7fcd2",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network = arcd_uc2_network()"
]
},
{
"cell_type": "markdown",
"id": "fb052c56-e9ca-4093-9115-d0c440b5ff53",
"metadata": {},
"source": [
"Most of the Network components have a `.show()` function that prints a table of information about that object. We can view the Nodes and Links on the Network by calling `network.show()`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cc199741-ef2e-47f5-b2f0-e20049ccf40f",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.show()"
]
},
{
"cell_type": "markdown",
"id": "76d2b7e9-280b-4741-a8b3-a84bed219fac",
"metadata": {
"tags": []
},
"source": [
"## Nodes\n",
"\n",
"Now let's inspect some of the nodes. We can directly access a node on the Network by calling .`get_node_by_hostname`. Like Network, a Node, along with some core services like ARP, have a `.show()` method."
]
},
{
"cell_type": "markdown",
"id": "84113002-843e-4cab-b899-667b50f25f6b",
"metadata": {},
"source": [
"### Router Nodes\n",
"\n",
"First we'll inspect the Router node and some of it's core services."
]
},
{
"cell_type": "markdown",
"id": "bf63a178-eee5-4669-bf64-13aea7ecf6cb",
"metadata": {},
"source": [
"Calling `router.show()` displays the Ethernet interfaces on the Router. If you need a table in markdown format, pass `markdown=True`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e76d1854-961e-438c-b40f-77fd9c3abe38",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"router_1\").show()"
]
},
{
"cell_type": "markdown",
"id": "e000540c-687c-4254-870c-1d814603bdbf",
"metadata": {},
"source": [
"Calling `router.arp.show()` displays the Router ARP Cache."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "92de8b42-92d7-4934-9c12-50bf724c9eb2",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"router_1\").arp.show()"
]
},
{
"cell_type": "markdown",
"id": "a9ff7ee8-9482-44de-9039-b684866bdc82",
"metadata": {},
"source": [
"Calling `router.acl.show()` displays the Access Control List."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5922282a-d22b-4e55-9176-f3f3654c849f",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"router_1\").acl.show()"
]
},
{
"cell_type": "markdown",
"id": "71c87884-f793-4c9f-b004-5b0df86cf585",
"metadata": {},
"source": [
"Calling `router.router_table.show()` displays the static routes the Router provides."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "327203be-f475-4727-82a1-e992d3b70ed8",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"router_1\").route_table.show()"
]
},
{
"cell_type": "markdown",
"id": "eef561a8-3d39-4c8b-bbc8-e8b10b8ed25f",
"metadata": {},
"source": [
"Calling `router.sys_log.show()` displays the Router system log. By default, only the last 10 log entries are displayed, this can be changed by passing `last_n=<number of log entries>`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3d0aa004-b10c-445f-aaab-340e0e716c74",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"router_1\").sys_log.show(last_n=10)"
]
},
{
"cell_type": "markdown",
"id": "25630c90-c54e-4b5d-8bf4-ad1b0722e126",
"metadata": {},
"source": [
"### Switch Nodes\n",
"\n",
"Next we'll inspect the Switch node and some of it's core services."
]
},
{
"cell_type": "markdown",
"id": "4879394d-2981-40de-a229-e19b09a34e6e",
"metadata": {},
"source": [
"Calling `switch.show()` displays the Switch orts on the Switch."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e7fd439b-5442-4e9d-9e7d-86dacb77f458",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"switch_1\").show()"
]
},
{
"cell_type": "markdown",
"id": "beb8dbd6-7250-4ac9-9fa2-d2a9c0e5fd19",
"metadata": {
"tags": []
},
"source": [
"Calling `switch.arp.show()` displays the Switch ARP Cache."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d06e1310-4a77-4315-a59f-cb1b49ca2352",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"switch_1\").arp.show()"
]
},
{
"cell_type": "markdown",
"id": "fda75ac3-8123-4234-8f36-86547891d8df",
"metadata": {},
"source": [
"Calling `switch.sys_log.show()` displays the Switch system log. By default, only the last 10 log entries are displayed, this can be changed by passing `last_n=<number of log entries>`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a0d984b7-a7c1-4bbd-aa5a-9d3caecb08dc",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"switch_1\").sys_log.show()"
]
},
{
"cell_type": "markdown",
"id": "2f1d99ad-db4f-4baf-8a35-e1d95f269586",
"metadata": {},
"source": [
"### Computer/Server Nodes\n",
"\n",
"Finally, we'll inspect a Computer or Server Node and some of its core services."
]
},
{
"cell_type": "markdown",
"id": "c9e2251a-1b47-46e5-840f-7fec3e39c5aa",
"metadata": {
"tags": []
},
"source": [
"Calling `computer.show()` displays the NICs on the Computer/Server."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "656c37f6-b145-42af-9714-8d2886d0eff8",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"security_suite\").show()"
]
},
{
"cell_type": "markdown",
"id": "f1097a49-a3da-4d79-a06d-ae8af452918f",
"metadata": {},
"source": [
"Calling `computer.arp.show()` displays the Computer/Server ARP Cache."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "66b267d6-2308-486a-b9aa-cb8d3bcf0753",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"security_suite\").arp.show()"
]
},
{
"cell_type": "markdown",
"id": "0d1fcad8-5b1a-4d8b-a49f-aa54a95fcaf0",
"metadata": {},
"source": [
"Calling `switch.sys_log.show()` displays the Computer/Server system log. By default, only the last 10 log entries are displayed, this can be changed by passing `last_n=<number of log entries>`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1b5debe8-ef1b-445d-8fa9-6a45568f21f3",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"security_suite\").sys_log.show()"
]
},
{
"cell_type": "markdown",
"id": "fcfa1773-798c-4ada-9318-c3ad928217da",
"metadata": {},
"source": [
"## Basic Network Comms Check\n",
"\n",
"We can perform a good old ping to check that Nodes are able to communicate with each other."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "495b7de4-b6ce-41a6-9114-f74752ab4491",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.show(nodes=False, links=False)"
]
},
{
"cell_type": "markdown",
"id": "3e13922a-217f-4f4e-99b6-57a07613cade",
"metadata": {},
"source": [
"We'll first ping client_1's default gateway."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a38abb71-994e-49e8-8f51-e9a550e95b99",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"client_1\").ping(\"192.168.10.1\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8388e1e9-30e3-4534-8e5a-c6e9144149d2",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"client_1\").sys_log.show(15)"
]
},
{
"cell_type": "markdown",
"id": "02c76d5c-d954-49db-912d-cb9c52f46375",
"metadata": {},
"source": [
"Next, we'll ping the interface of the 192.168.1.0/24 Network on the Router (port 1)."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ff8e976a-c16b-470c-8923-325713a30d6c",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"client_1\").ping(\"192.168.1.1\")"
]
},
{
"cell_type": "markdown",
"id": "80280404-a5ab-452f-8a02-771a0d7496b1",
"metadata": {},
"source": [
"And finally, we'll ping the web server."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c4163f8d-6a72-410c-9f5c-4f881b7de45e",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"client_1\").ping(\"192.168.1.12\")"
]
},
{
"cell_type": "markdown",
"id": "1194c045-ba77-4427-be30-ed7b5b224850",
"metadata": {},
"source": [
"To confirm that the ping was received and processed by the web_server, we can view the sys log"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e79a523a-5780-45b6-8798-c434e0e522bd",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"web_server\").sys_log.show()"
]
},
{
"cell_type": "markdown",
"id": "5928f6dd-1006-45e3-99f3-8f311a875faa",
"metadata": {},
"source": [
"## Advanced Network Usage\n",
"\n",
"We can now use the Network to perform some more advaced things."
]
},
{
"cell_type": "markdown",
"id": "5e023ef3-7d18-4006-96ee-042a06a481fc",
"metadata": {},
"source": [
"Let's attempt to prevent client_2 from being able to ping the web server. First, we'll confirm that it can ping the server first..."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "603cf913-e261-49da-a7dd-85e1bb6dec56",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"client_2\").ping(\"192.168.1.12\")"
]
},
{
"cell_type": "markdown",
"id": "5cf962a4-20e6-44ae-9748-7fc5267ae111",
"metadata": {},
"source": [
"If we look at the client_2 sys log we can see that the four ICMP echo requests were sent and four ICMP each replies were received:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e047de00-3de4-4823-b26a-2c8d64c7a663",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"client_2\").sys_log.show()"
]
},
{
"cell_type": "markdown",
"id": "bdc4741d-6e3e-4aec-a69c-c2e9653bd02c",
"metadata": {},
"source": [
"Now we'll add an ACL to block ICMP from 192.168.10.22"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6db355ae-b99a-441b-a2c4-4ffe78f46bff",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from primaite.simulator.network.transmission.network_layer import IPProtocol\n",
"from primaite.simulator.network.transmission.transport_layer import Port\n",
"from primaite.simulator.network.hardware.nodes.router import ACLAction\n",
"network.get_node_by_hostname(\"router_1\").acl.add_rule(\n",
" action=ACLAction.DENY,\n",
" protocol=IPProtocol.ICMP,\n",
" src_ip_address=\"192.168.10.22\",\n",
" position=1\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a345e000-8842-4827-af96-adc0fbe390fb",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"router_1\").acl.show()"
]
},
{
"cell_type": "markdown",
"id": "3a5bfd9f-04cb-493e-a86c-cd268563a262",
"metadata": {},
"source": [
"Now we attempt (and fail) to ping the web server"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a4f4ff31-590f-40fb-b13d-efaa8c2720b6",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"client_2\").ping(\"192.168.1.12\")"
]
},
{
"cell_type": "markdown",
"id": "83e56497-097b-45cb-964e-b15c72547b38",
"metadata": {},
"source": [
"We can check that the ping was actually sent by client_2 by viewing the sys log"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f62b8a4e-fd3b-4059-b108-3d4a0b18f2a0",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"client_2\").sys_log.show()"
]
},
{
"cell_type": "markdown",
"id": "c7040311-a879-4620-86a0-55d0774156e5",
"metadata": {},
"source": [
"We can check the router sys log to see why the traffic was blocked"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7e53d776-99da-4d2c-a2a7-bd7ce27bff4c",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"router_1\").sys_log.show()"
]
},
{
"cell_type": "markdown",
"id": "aba0bc7d-da57-477b-b34a-3688b5aab2c6",
"metadata": {},
"source": [
"Now a final check to ensure that client_1 can still ping the web_server."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d542734b-7582-4af7-8254-bda3de50d091",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"client_1\").ping(\"192.168.1.12\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d78e9fe3-02c6-4792-944f-5622e26e0412",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"network.get_node_by_hostname(\"client_1\").sys_log.show()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.11"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@@ -4,7 +4,7 @@ from abc import ABC, abstractmethod
from typing import Callable, Dict, List, Optional, Union
from uuid import uuid4
from pydantic import BaseModel, ConfigDict, Extra
from pydantic import BaseModel, ConfigDict
from primaite import getLogger
@@ -122,7 +122,7 @@ class ActionManager(BaseModel):
class SimComponent(BaseModel):
"""Extension of pydantic BaseModel with additional methods that must be defined by all classes in the simulator."""
model_config = ConfigDict(arbitrary_types_allowed=True, extra=Extra.allow)
model_config = ConfigDict(arbitrary_types_allowed=True, extra="allow")
"""Configure pydantic to allow arbitrary types and to let the instance have attributes not present in model."""
uuid: str

View File

@@ -1,5 +1,5 @@
from random import choice
from typing import Dict, Optional, Union
from typing import Dict, Optional
from primaite import getLogger
from primaite.simulator.core import SimComponent
@@ -211,7 +211,7 @@ class FileSystem(SimComponent):
if file is not None:
return file
def get_folder_by_name(self, folder_name: str) -> Union[FileSystemFolder, None]:
def get_folder_by_name(self, folder_name: str) -> Optional[FileSystemFolder]:
"""
Returns a the first folder with a matching name.

View File

@@ -1,22 +1,46 @@
from typing import Any, Dict, Union
from typing import Any, Dict, List, Optional, Union
import matplotlib.pyplot as plt
import networkx as nx
from networkx import MultiGraph
from prettytable import MARKDOWN, PrettyTable
from primaite import getLogger
from primaite.simulator.core import Action, ActionManager, AllowAllValidator, SimComponent
from primaite.simulator.network.hardware.base import Link, NIC, Node, SwitchPort
from primaite.simulator.network.hardware.nodes.computer import Computer
from primaite.simulator.network.hardware.nodes.router import Router
from primaite.simulator.network.hardware.nodes.server import Server
from primaite.simulator.network.hardware.nodes.switch import Switch
_LOGGER = getLogger(__name__)
class Network(SimComponent):
"""Top level container object representing the physical network."""
"""
Top level container object representing the physical network.
This class manages nodes, links, and other network components. It also
offers methods for rendering the network topology and gathering states.
:ivar Dict[str, Node] nodes: Dictionary mapping node UUIDs to Node instances.
:ivar Dict[str, Link] links: Dictionary mapping link UUIDs to Link instances.
"""
nodes: Dict[str, Node] = {}
links: Dict[str, Link] = {}
def __init__(self, **kwargs):
"""Initialise the network."""
"""
Initialise the network.
Constructs the network and sets up its initial state including
the action manager and an empty MultiGraph for topology representation.
"""
super().__init__(**kwargs)
self._nx_graph = MultiGraph()
def _init_action_manager(self) -> ActionManager:
am = super()._init_action_manager()
@@ -29,14 +53,110 @@ class Network(SimComponent):
)
return am
@property
def routers(self) -> List[Router]:
"""The Routers in the Network."""
return [node for node in self.nodes.values() if isinstance(node, Router)]
@property
def switches(self) -> List[Switch]:
"""The Switches in the Network."""
return [node for node in self.nodes.values() if isinstance(node, Switch)]
@property
def computers(self) -> List[Computer]:
"""The Computers in the Network."""
return [node for node in self.nodes.values() if isinstance(node, Computer) and not isinstance(node, Server)]
@property
def servers(self) -> List[Server]:
"""The Servers in the Network."""
return [node for node in self.nodes.values() if isinstance(node, Server)]
def show(self, nodes: bool = True, ip_addresses: bool = True, links: bool = True, markdown: bool = False):
"""
Print tables describing the Network.
Generate and print PrettyTable instances that show details about nodes,
IP addresses, and links in the network. Output can be in Markdown format.
:param nodes: Include node details in the output. Defaults to True.
:param ip_addresses: Include IP address details in the output. Defaults to True.
:param links: Include link details in the output. Defaults to True.
:param markdown: Use Markdown style in table output. Defaults to False.
"""
nodes_type_map = {
"Router": self.routers,
"Switch": self.switches,
"Server": self.servers,
"Computer": self.computers,
}
if nodes:
table = PrettyTable(["Node", "Type", "Operating State"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = "Nodes"
for node_type, nodes in nodes_type_map.items():
for node in nodes:
table.add_row([node.hostname, node_type, node.operating_state.name])
print(table)
if ip_addresses:
table = PrettyTable(["Node", "Port", "IP Address", "Subnet Mask", "Default Gateway"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = "IP Addresses"
for nodes in nodes_type_map.values():
for node in nodes:
for i, port in node.ethernet_port.items():
table.add_row([node.hostname, i, port.ip_address, port.subnet_mask, node.default_gateway])
print(table)
if links:
table = PrettyTable(["Endpoint A", "Endpoint B", "is Up", "Bandwidth (MBits)", "Current Load"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = "Links"
links = list(self.links.values())
for nodes in nodes_type_map.values():
for node in nodes:
for link in links[::-1]:
if node in [link.endpoint_a.parent, link.endpoint_b.parent]:
table.add_row(
[
link.endpoint_a.parent.hostname,
link.endpoint_b.parent.hostname,
link.is_up,
link.bandwidth,
link.current_load_percent,
]
)
links.remove(link)
print(table)
def clear_links(self):
"""Clear all the links in the network by resetting their component state for the episode."""
for link in self.links.values():
link.reset_component_for_episode()
def draw(self, seed: int = 123):
"""
Draw the Network using NetworkX and matplotlib.pyplot.
:param seed: An integer seed for reproducible layouts. Default is 123.
"""
pos = nx.spring_layout(self._nx_graph, seed=seed)
nx.draw(self._nx_graph, pos, with_labels=True)
plt.show()
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
Produce a dictionary describing the current state of the Network.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
:return: A dictionary capturing the current state of the Network and its child objects.
"""
state = super().describe_state()
state.update(
@@ -51,20 +171,37 @@ class Network(SimComponent):
"""
Add an existing node to the network.
:param node: Node instance that the network should keep track of.
:type node: Node
.. note:: If the node is already present in the network, a warning is logged.
:param node: Node instance that should be kept track of by the network.
"""
if node in self:
_LOGGER.warning(f"Can't add node {node.uuid}. It is already in the network.")
return
self.nodes[node.uuid] = node
node.parent = self
self._nx_graph.add_node(node.hostname)
_LOGGER.info(f"Added node {node.uuid} to Network {self.uuid}")
def get_node_by_hostname(self, hostname: str) -> Optional[Node]:
"""
Get a Node from the Network by its hostname.
.. note:: Assumes hostnames on the network are unique.
:param hostname: The Node hostname.
:return: The Node if it exists in the network.
"""
for node in self.nodes.values():
if node.hostname == hostname:
return node
def remove_node(self, node: Node) -> None:
"""
Remove a node from the network.
.. note:: If the node is not found in the network, a warning is logged.
:param node: Node instance that is currently part of the network that should be removed.
:type node: Node
"""
@@ -76,26 +213,29 @@ class Network(SimComponent):
_LOGGER.info(f"Removed node {node.uuid} from network {self.uuid}")
def connect(self, endpoint_a: Union[NIC, SwitchPort], endpoint_b: Union[NIC, SwitchPort], **kwargs) -> None:
"""Connect two nodes on the network by creating a link between an NIC/SwitchPort of each one.
:param endpoint_a: The endpoint to which to connect the link on the first node
:type endpoint_a: Union[NIC, SwitchPort]
:param endpoint_b: The endpoint to which to connct the link on the second node
:type endpoint_b: Union[NIC, SwitchPort]
:raises RuntimeError: _description_
"""
node_a = endpoint_a.parent
node_b = endpoint_b.parent
Connect two endpoints on the network by creating a link between their NICs/SwitchPorts.
.. note:: If the nodes owning the endpoints are not already in the network, they are automatically added.
:param endpoint_a: The first endpoint to connect.
:type endpoint_a: Union[NIC, SwitchPort]
:param endpoint_b: The second endpoint to connect.
:type endpoint_b: Union[NIC, SwitchPort]
:raises RuntimeError: If any validation or runtime checks fail.
"""
node_a: Node = endpoint_a.parent
node_b: Node = endpoint_b.parent
if node_a not in self:
self.add_node(node_a)
if node_b not in self:
self.add_node(node_b)
if node_a is node_b:
_LOGGER.warn(f"Cannot link endpoint {endpoint_a} to {endpoint_b} because they belong to the same node.")
_LOGGER.warning(f"Cannot link endpoint {endpoint_a} to {endpoint_b} because they belong to the same node.")
return
link = Link(endpoint_a=endpoint_a, endpoint_b=endpoint_b, **kwargs)
self.links[link.uuid] = link
self._nx_graph.add_edge(endpoint_a.parent.hostname, endpoint_b.parent.hostname)
link.parent = self
_LOGGER.info(f"Added link {link.uuid} to connect {endpoint_a} and {endpoint_b}")

View File

@@ -6,7 +6,7 @@ from enum import Enum
from ipaddress import IPv4Address, IPv4Network
from typing import Any, Dict, List, Optional, Tuple, Union
from prettytable import PrettyTable
from prettytable import MARKDOWN, PrettyTable
from primaite import getLogger
from primaite.exceptions import NetworkError
@@ -77,12 +77,10 @@ class NIC(SimComponent):
ip_address: IPv4Address
"The IP address assigned to the NIC for communication on an IP-based network."
subnet_mask: str
subnet_mask: IPv4Address
"The subnet mask assigned to the NIC."
gateway: IPv4Address
"The default gateway IP address for forwarding network traffic to other networks. Randomly generated upon creation."
mac_address: str
"The MAC address of the NIC. Defaults to a randomly set MAC address."
"The MAC address of the NIC. Defaults to a randomly set MAC address. Randomly generated upon creation."
speed: int = 100
"The speed of the NIC in Mbps. Default is 100 Mbps."
mtu: int = 1500
@@ -111,16 +109,10 @@ class NIC(SimComponent):
"""
if not isinstance(kwargs["ip_address"], IPv4Address):
kwargs["ip_address"] = IPv4Address(kwargs["ip_address"])
if not isinstance(kwargs["gateway"], IPv4Address):
kwargs["gateway"] = IPv4Address(kwargs["gateway"])
if "mac_address" not in kwargs:
kwargs["mac_address"] = generate_mac_address()
super().__init__(**kwargs)
if self.ip_address == self.gateway:
msg = f"NIC ip address {self.ip_address} cannot be the same as the gateway {self.gateway}"
_LOGGER.error(msg)
raise ValueError(msg)
if self.ip_network.network_address == self.ip_address:
msg = (
f"Failed to set IP address {self.ip_address} and subnet mask {self.subnet_mask} as it is a "
@@ -173,6 +165,9 @@ class NIC(SimComponent):
if self.connected_node.operating_state != NodeOperatingState.ON:
self.connected_node.sys_log.error(f"NIC {self} cannot be enabled as the endpoint is not turned on")
return
if not self.connected_link:
_LOGGER.error(f"NIC {self} cannot be enabled as it is not connected to a Link")
return
self.enabled = True
self.connected_node.sys_log.info(f"NIC {self} enabled")
@@ -210,6 +205,7 @@ class NIC(SimComponent):
# TODO: Inform the Node that a link has been connected
self.connected_link = link
self.enable()
_LOGGER.info(f"NIC {self} connected to Link {link}")
def disconnect_link(self):
@@ -260,14 +256,15 @@ class NIC(SimComponent):
The Frame is passed to the Node.
:param frame: The network frame being received.
:type frame: :class:`~primaite.simulator.network.osi_layers.Frame`
"""
if self.enabled:
frame.decrement_ttl()
frame.set_received_timestamp()
self.pcap.capture(frame)
self.connected_node.receive_frame(frame=frame, from_nic=self)
return True
# If this destination or is broadcast
if frame.ethernet.dst_mac_addr == self.mac_address or frame.ethernet.dst_mac_addr == "ff:ff:ff:ff:ff:ff":
self.connected_node.receive_frame(frame=frame, from_nic=self)
return True
return False
def __str__(self) -> str:
@@ -291,7 +288,7 @@ class SwitchPort(SimComponent):
"The speed of the SwitchPort in Mbps. Default is 100 Mbps."
mtu: int = 1500
"The Maximum Transmission Unit (MTU) of the SwitchPort in Bytes. Default is 1500 B"
connected_node: Optional[Switch] = None
connected_node: Optional[Node] = None
"The Node to which the SwitchPort is connected."
connected_link: Optional[Link] = None
"The Link to which the SwitchPort is connected."
@@ -561,17 +558,43 @@ class ARPCache:
self.arp: Dict[IPv4Address, ARPEntry] = {}
self.nics: Dict[str, "NIC"] = {}
def _add_arp_cache_entry(self, ip_address: IPv4Address, mac_address: str, nic: NIC):
def show(self, markdown: bool = False):
"""Prints a table of ARC Cache."""
table = PrettyTable(["IP Address", "MAC Address", "Via"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.sys_log.hostname} ARP Cache"
for ip, arp in self.arp.items():
table.add_row(
[
str(ip),
arp.mac_address,
self.nics[arp.nic_uuid].mac_address,
]
)
print(table)
def add_arp_cache_entry(self, ip_address: IPv4Address, mac_address: str, nic: NIC, override: bool = False):
"""
Add an ARP entry to the cache.
If an entry for the given IP address already exists, the entry is only updated if the `override` parameter is
set to True.
:param ip_address: The IP address to be added to the cache.
:param mac_address: The MAC address associated with the IP address.
:param nic: The NIC through which the NIC with the IP address is reachable.
:param override: If True, an existing entry for the IP address will be overridden. Default is False.
"""
self.sys_log.info(f"Adding ARP cache entry for {mac_address}/{ip_address} via NIC {nic}")
arp_entry = ARPEntry(mac_address=mac_address, nic_uuid=nic.uuid)
self.arp[ip_address] = arp_entry
for _nic in self.nics.values():
if _nic.ip_address == ip_address:
return
if override or not self.arp.get(ip_address):
self.sys_log.info(f"Adding ARP cache entry for {mac_address}/{ip_address} via NIC {nic}")
arp_entry = ARPEntry(mac_address=mac_address, nic_uuid=nic.uuid)
self.arp[ip_address] = arp_entry
def _remove_arp_cache_entry(self, ip_address: IPv4Address):
"""
@@ -601,6 +624,7 @@ class ARPCache:
:return: The NIC associated with the IP address, or None if not found.
"""
arp_entry = self.arp.get(ip_address)
if arp_entry:
return self.nics[arp_entry.nic_uuid]
@@ -624,17 +648,42 @@ class ARPCache:
# Network Layer
ip_packet = IPPacket(
src_ip=nic.ip_address,
dst_ip=target_ip_address,
src_ip_address=nic.ip_address,
dst_ip_address=target_ip_address,
)
# Data Link Layer
ethernet_header = EthernetHeader(src_mac_addr=nic.mac_address, dst_mac_addr="ff:ff:ff:ff:ff:ff")
arp_packet = ARPPacket(
sender_ip=nic.ip_address, sender_mac_addr=nic.mac_address, target_ip=target_ip_address
sender_ip_address=nic.ip_address,
sender_mac_addr=nic.mac_address,
target_ip_address=target_ip_address,
)
frame = Frame(ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, arp=arp_packet)
nic.send_frame(frame)
def send_arp_reply(self, arp_reply: ARPPacket, from_nic: NIC):
"""
Send an ARP reply back through the NIC it came from.
:param arp_reply: The ARP reply to send.
:param from_nic: The NIC to send the ARP reply from.
"""
self.sys_log.info(
f"Sending ARP reply from {arp_reply.sender_mac_addr}/{arp_reply.sender_ip_address} "
f"to {arp_reply.target_ip_address}/{arp_reply.target_mac_addr} "
)
tcp_header = TCPHeader(src_port=Port.ARP, dst_port=Port.ARP)
ip_packet = IPPacket(
src_ip_address=arp_reply.sender_ip_address,
dst_ip_address=arp_reply.target_ip_address,
)
ethernet_header = EthernetHeader(src_mac_addr=arp_reply.sender_mac_addr, dst_mac_addr=arp_reply.target_mac_addr)
frame = Frame(ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, arp=arp_reply)
from_nic.send_frame(frame)
def process_arp_packet(self, from_nic: NIC, arp_packet: ARPPacket):
"""
Process a received ARP packet, handling both ARP requests and responses.
@@ -648,45 +697,34 @@ class ARPCache:
# ARP Reply
if not arp_packet.request:
self.sys_log.info(
f"Received ARP response for {arp_packet.sender_ip} from {arp_packet.sender_mac_addr} via NIC {from_nic}"
f"Received ARP response for {arp_packet.sender_ip_address} "
f"from {arp_packet.sender_mac_addr} via NIC {from_nic}"
)
self._add_arp_cache_entry(
ip_address=arp_packet.sender_ip, mac_address=arp_packet.sender_mac_addr, nic=from_nic
self.add_arp_cache_entry(
ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic
)
return
# ARP Request
self.sys_log.info(
f"Received ARP request for {arp_packet.target_ip} from "
f"{arp_packet.sender_mac_addr}/{arp_packet.sender_ip} "
f"Received ARP request for {arp_packet.target_ip_address} from "
f"{arp_packet.sender_mac_addr}/{arp_packet.sender_ip_address} "
)
# Unmatched ARP Request
if arp_packet.target_ip != from_nic.ip_address:
self.sys_log.info(f"Ignoring ARP request for {arp_packet.target_ip}")
if arp_packet.target_ip_address != from_nic.ip_address:
self.sys_log.info(f"Ignoring ARP request for {arp_packet.target_ip_address}")
return
# Matched ARP request
self._add_arp_cache_entry(ip_address=arp_packet.sender_ip, mac_address=arp_packet.sender_mac_addr, nic=from_nic)
self.add_arp_cache_entry(
ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic
)
arp_packet = arp_packet.generate_reply(from_nic.mac_address)
self.sys_log.info(
f"Sending ARP reply from {arp_packet.sender_mac_addr}/{arp_packet.sender_ip} "
f"to {arp_packet.target_ip}/{arp_packet.target_mac_addr} "
)
self.send_arp_reply(arp_packet, from_nic)
tcp_header = TCPHeader(src_port=Port.ARP, dst_port=Port.ARP)
# Network Layer
ip_packet = IPPacket(
src_ip=arp_packet.sender_ip,
dst_ip=arp_packet.target_ip,
)
# Data Link Layer
ethernet_header = EthernetHeader(
src_mac_addr=arp_packet.sender_mac_addr, dst_mac_addr=arp_packet.target_mac_addr
)
frame = Frame(ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, arp=arp_packet)
from_nic.send_frame(frame)
def __contains__(self, item: Any) -> bool:
return item in self.arp
class ICMP:
@@ -705,21 +743,30 @@ class ICMP:
"""
self.sys_log: SysLog = sys_log
self.arp: ARPCache = arp_cache
self.request_replies = {}
def process_icmp(self, frame: Frame):
def process_icmp(self, frame: Frame, from_nic: NIC, is_reattempt: bool = False):
"""
Process an ICMP packet, including handling echo requests and replies.
:param frame: The Frame containing the ICMP packet to process.
"""
if frame.icmp.icmp_type == ICMPType.ECHO_REQUEST:
self.sys_log.info(f"Received echo request from {frame.ip.src_ip}")
target_mac_address = self.arp.get_arp_cache_mac_address(frame.ip.src_ip)
src_nic = self.arp.get_arp_cache_nic(frame.ip.src_ip)
if not is_reattempt:
self.sys_log.info(f"Received echo request from {frame.ip.src_ip_address}")
target_mac_address = self.arp.get_arp_cache_mac_address(frame.ip.src_ip_address)
src_nic = self.arp.get_arp_cache_nic(frame.ip.src_ip_address)
if not src_nic:
self.arp.send_arp_request(frame.ip.src_ip_address)
self.process_icmp(frame=frame, from_nic=from_nic, is_reattempt=True)
return
tcp_header = TCPHeader(src_port=Port.ARP, dst_port=Port.ARP)
# Network Layer
ip_packet = IPPacket(src_ip=src_nic.ip_address, dst_ip=frame.ip.src_ip, protocol=IPProtocol.ICMP)
ip_packet = IPPacket(
src_ip_address=src_nic.ip_address, dst_ip_address=frame.ip.src_ip_address, protocol=IPProtocol.ICMP
)
# Data Link Layer
ethernet_header = EthernetHeader(src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address)
icmp_reply_packet = ICMPPacket(
@@ -728,14 +775,28 @@ class ICMP:
identifier=frame.icmp.identifier,
sequence=frame.icmp.sequence + 1,
)
frame = Frame(ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, icmp=icmp_reply_packet)
self.sys_log.info(f"Sending echo reply to {frame.ip.dst_ip}")
payload = secrets.token_urlsafe(int(32 / 1.3)) # Standard ICMP 32 bytes size
frame = Frame(
ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, icmp=icmp_reply_packet, payload=payload
)
self.sys_log.info(f"Sending echo reply to {frame.ip.dst_ip_address}")
src_nic.send_frame(frame)
elif frame.icmp.icmp_type == ICMPType.ECHO_REPLY:
self.sys_log.info(f"Received echo reply from {frame.ip.src_ip}")
time = frame.transmission_duration()
time_str = f"{time}ms" if time > 0 else "<1ms"
self.sys_log.info(
f"Reply from {frame.ip.src_ip_address}: "
f"bytes={len(frame.payload)}, "
f"time={time_str}, "
f"TTL={frame.ip.ttl}"
)
if not self.request_replies.get(frame.icmp.identifier):
self.request_replies[frame.icmp.identifier] = 0
self.request_replies[frame.icmp.identifier] += 1
def ping(
self, target_ip_address: IPv4Address, sequence: int = 0, identifier: Optional[int] = None
self, target_ip_address: IPv4Address, sequence: int = 0, identifier: Optional[int] = None, pings: int = 4
) -> Tuple[int, Union[int, None]]:
"""
Send an ICMP echo request (ping) to a target IP address and manage the sequence and identifier.
@@ -747,13 +808,21 @@ class ICMP:
was not found in the ARP cache.
"""
nic = self.arp.get_arp_cache_nic(target_ip_address)
# TODO: Eventually this ARP request needs to be done elsewhere. It's not the resonsibility of the
# TODO: Eventually this ARP request needs to be done elsewhere. It's not the responsibility of the
# ping function to handle ARP lookups
# Already tried once and cannot get ARP entry, stop trying
if sequence == -1:
if not nic:
return 4, None
else:
sequence = 0
# No existing ARP entry
if not nic:
self.sys_log.info(f"No entry in ARP cache for {target_ip_address}")
self.arp.send_arp_request(target_ip_address)
return 0, None
return -1, None
# ARP entry exists
sequence += 1
@@ -763,15 +832,15 @@ class ICMP:
# Network Layer
ip_packet = IPPacket(
src_ip=nic.ip_address,
dst_ip=target_ip_address,
src_ip_address=nic.ip_address,
dst_ip_address=target_ip_address,
protocol=IPProtocol.ICMP,
)
# Data Link Layer
ethernet_header = EthernetHeader(src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address)
icmp_packet = ICMPPacket(identifier=identifier, sequence=sequence)
frame = Frame(ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, icmp=icmp_packet)
self.sys_log.info(f"Sending echo request to {target_ip_address}")
payload = secrets.token_urlsafe(int(32 / 1.3)) # Standard ICMP 32 bytes size
frame = Frame(ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, icmp=icmp_packet, payload=payload)
nic.send_frame(frame)
return sequence, icmp_packet.identifier
@@ -802,10 +871,14 @@ class Node(SimComponent):
hostname: str
"The node hostname on the network."
default_gateway: Optional[IPv4Address] = None
"The default gateway IP address for forwarding network traffic to other networks."
operating_state: NodeOperatingState = NodeOperatingState.OFF
"The hardware state of the node."
nics: Dict[str, NIC] = {}
"The NICs on the node."
ethernet_port: Dict[int, NIC] = {}
"The NICs on the node by port id."
accounts: Dict[str, Account] = {}
"All accounts on the node."
@@ -833,9 +906,12 @@ class Node(SimComponent):
This method initializes the ARP cache, ICMP handler, session manager, and software manager if they are not
provided.
"""
if kwargs.get("default_gateway"):
if not isinstance(kwargs["default_gateway"], IPv4Address):
kwargs["default_gateway"] = IPv4Address(kwargs["default_gateway"])
if not kwargs.get("sys_log"):
kwargs["sys_log"] = SysLog(kwargs["hostname"])
if not kwargs.get("arp_cache"):
if not kwargs.get("arp"):
kwargs["arp"] = ARPCache(sys_log=kwargs.get("sys_log"))
if not kwargs.get("icmp"):
kwargs["icmp"] = ICMP(sys_log=kwargs.get("sys_log"), arp_cache=kwargs.get("arp"))
@@ -874,18 +950,19 @@ class Node(SimComponent):
)
return state
def show(self):
"""Prints a table of the NICs on the Node.."""
from prettytable import PrettyTable
table = PrettyTable(["MAC Address", "Address", "Default Gateway", "Speed", "Status"])
for nic in self.nics.values():
def show(self, markdown: bool = False):
"""Prints a table of the NICs on the Node."""
table = PrettyTable(["Port", "MAC Address", "Address", "Speed", "Status"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.hostname} Network Interface Cards"
for port, nic in self.ethernet_port.items():
table.add_row(
[
port,
nic.mac_address,
f"{nic.ip_address}/{nic.ip_network.prefixlen}",
nic.gateway,
nic.speed,
"Enabled" if nic.enabled else "Disabled",
]
@@ -898,7 +975,8 @@ class Node(SimComponent):
self.operating_state = NodeOperatingState.ON
self.sys_log.info("Turned on")
for nic in self.nics.values():
nic.enable()
if nic.connected_link:
nic.enable()
def power_off(self):
"""Power off the Node, disabling its NICs if it is in the ON state."""
@@ -917,6 +995,7 @@ class Node(SimComponent):
"""
if nic.uuid not in self.nics:
self.nics[nic.uuid] = nic
self.ethernet_port[len(self.nics)] = nic
nic.connected_node = self
nic.parent = self
self.sys_log.info(f"Connected NIC {nic}")
@@ -938,6 +1017,10 @@ class Node(SimComponent):
if isinstance(nic, str):
nic = self.nics.get(nic)
if nic or nic.uuid in self.nics:
for port, _nic in self.ethernet_port.items():
if nic == _nic:
self.ethernet_port.pop(port)
break
self.nics.pop(nic.uuid)
nic.parent = None
nic.disable()
@@ -958,13 +1041,27 @@ class Node(SimComponent):
"""
if not isinstance(target_ip_address, IPv4Address):
target_ip_address = IPv4Address(target_ip_address)
if target_ip_address.is_loopback:
self.sys_log.info("Pinging loopback address")
return any(nic.enabled for nic in self.nics.values())
if self.operating_state == NodeOperatingState.ON:
self.sys_log.info(f"Attempting to ping {target_ip_address}")
self.sys_log.info(f"Pinging {target_ip_address}:")
sequence, identifier = 0, None
while sequence < pings:
sequence, identifier = self.icmp.ping(target_ip_address, sequence, identifier)
return True
self.sys_log.info("Ping failed as the node is turned off")
sequence, identifier = self.icmp.ping(target_ip_address, sequence, identifier, pings)
request_replies = self.icmp.request_replies.get(identifier)
passed = request_replies == pings
if request_replies:
self.icmp.request_replies.pop(identifier)
else:
request_replies = 0
self.sys_log.info(
f"Ping statistics for {target_ip_address}: "
f"Packets: Sent = {pings}, "
f"Received = {request_replies}, "
f"Lost = {pings-request_replies} ({(pings-request_replies)/pings*100}% loss)"
)
return passed
return False
def send_frame(self, frame: Frame):
@@ -973,7 +1070,7 @@ class Node(SimComponent):
:param frame: The Frame to be sent.
"""
nic: NIC = self._get_arp_cache_nic(frame.ip.dst_ip)
nic: NIC = self._get_arp_cache_nic(frame.ip.dst_ip_address)
nic.send_frame(frame)
def receive_frame(self, frame: Frame, from_nic: NIC):
@@ -986,13 +1083,18 @@ class Node(SimComponent):
:param frame: The Frame being received.
:param from_nic: The NIC that received the frame.
"""
if frame.ip:
if frame.ip.src_ip_address in self.arp:
self.arp.add_arp_cache_entry(
ip_address=frame.ip.src_ip_address, mac_address=frame.ethernet.src_mac_addr, nic=from_nic
)
if frame.ip.protocol == IPProtocol.TCP:
if frame.tcp.src_port == Port.ARP:
self.arp.process_arp_packet(from_nic=from_nic, arp_packet=frame.arp)
elif frame.ip.protocol == IPProtocol.UDP:
pass
elif frame.ip.protocol == IPProtocol.ICMP:
self.icmp.process_icmp(frame=frame)
self.icmp.process_icmp(frame=frame, from_nic=from_nic)
def install_service(self, service: Service) -> None:
"""

View File

@@ -0,0 +1,38 @@
from primaite.simulator.network.hardware.base import NIC, Node
class Computer(Node):
"""
A basic Computer class.
Example:
>>> pc_a = Computer(
hostname="pc_a",
ip_address="192.168.1.10",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1"
)
>>> pc_a.power_on()
Instances of computer come 'pre-packaged' with the following:
* Core Functionality:
* ARP
* ICMP
* Packet Capture
* Sys Log
* Services:
* DNS Client
* FTP Client
* LDAP Client
* NTP Client
* Applications:
* Email Client
* Web Browser
* Processes:
* Placeholder
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.connect_nic(NIC(ip_address=kwargs["ip_address"], subnet_mask=kwargs["subnet_mask"]))

View File

@@ -0,0 +1,759 @@
from __future__ import annotations
import secrets
from enum import Enum
from ipaddress import IPv4Address, IPv4Network
from typing import Dict, List, Optional, Tuple, Union
from prettytable import MARKDOWN, PrettyTable
from primaite.simulator.core import SimComponent
from primaite.simulator.network.hardware.base import ARPCache, ICMP, NIC, Node
from primaite.simulator.network.transmission.data_link_layer import EthernetHeader, Frame
from primaite.simulator.network.transmission.network_layer import ICMPPacket, ICMPType, IPPacket, IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port, TCPHeader
from primaite.simulator.system.core.sys_log import SysLog
class ACLAction(Enum):
"""Enum for defining the ACL action types."""
DENY = 0
PERMIT = 1
class ACLRule(SimComponent):
"""
Represents an Access Control List (ACL) rule.
:ivar ACLAction action: Action to be performed (Permit/Deny). Default is DENY.
:ivar Optional[IPProtocol] protocol: Network protocol. Default is None.
:ivar Optional[IPv4Address] src_ip_address: Source IP address. Default is None.
:ivar Optional[Port] src_port: Source port number. Default is None.
:ivar Optional[IPv4Address] dst_ip_address: Destination IP address. Default is None.
:ivar Optional[Port] dst_port: Destination port number. Default is None.
"""
action: ACLAction = ACLAction.DENY
protocol: Optional[IPProtocol] = None
src_ip_address: Optional[IPv4Address] = None
src_port: Optional[Port] = None
dst_ip_address: Optional[IPv4Address] = None
dst_port: Optional[Port] = None
def __str__(self) -> str:
rule_strings = []
for key, value in self.model_dump(exclude={"uuid", "action_manager"}).items():
if value is None:
value = "ANY"
if isinstance(value, Enum):
rule_strings.append(f"{key}={value.name}")
else:
rule_strings.append(f"{key}={value}")
return ", ".join(rule_strings)
def describe_state(self) -> Dict:
"""
Describes the current state of the ACLRule.
:return: A dictionary representing the current state.
"""
pass
class AccessControlList(SimComponent):
"""
Manages a list of ACLRules to filter network traffic.
:ivar SysLog sys_log: System logging instance.
:ivar ACLAction implicit_action: Default action for rules.
:ivar ACLRule implicit_rule: Implicit ACL rule, created based on implicit_action.
:ivar int max_acl_rules: Maximum number of ACL rules that can be added. Default is 25.
:ivar List[Optional[ACLRule]] _acl: A list containing the ACL rules.
"""
sys_log: SysLog
implicit_action: ACLAction
implicit_rule: ACLRule
max_acl_rules: int = 25
_acl: List[Optional[ACLRule]] = [None] * 24
def __init__(self, **kwargs) -> None:
if not kwargs.get("implicit_action"):
kwargs["implicit_action"] = ACLAction.DENY
kwargs["implicit_rule"] = ACLRule(action=kwargs["implicit_action"])
super().__init__(**kwargs)
self._acl = [None] * (self.max_acl_rules - 1)
def describe_state(self) -> Dict:
"""
Describes the current state of the AccessControlList.
:return: A dictionary representing the current state.
"""
pass
@property
def acl(self) -> List[Optional[ACLRule]]:
"""
Get the list of ACL rules.
:return: The list of ACL rules.
"""
return self._acl
def add_rule(
self,
action: ACLAction,
protocol: Optional[IPProtocol] = None,
src_ip_address: Optional[Union[str, IPv4Address]] = None,
src_port: Optional[Port] = None,
dst_ip_address: Optional[Union[str, IPv4Address]] = None,
dst_port: Optional[Port] = None,
position: int = 0,
) -> None:
"""
Add a new ACL rule.
:param ACLAction action: Action to be performed (Permit/Deny).
:param Optional[IPProtocol] protocol: Network protocol.
:param Optional[Union[str, IPv4Address]] src_ip_address: Source IP address.
:param Optional[Port] src_port: Source port number.
:param Optional[Union[str, IPv4Address]] dst_ip_address: Destination IP address.
:param Optional[Port] dst_port: Destination port number.
:param int position: Position in the ACL list to insert the rule.
:raises ValueError: When the position is out of bounds.
"""
if isinstance(src_ip_address, str):
src_ip_address = IPv4Address(src_ip_address)
if isinstance(dst_ip_address, str):
dst_ip_address = IPv4Address(dst_ip_address)
if 0 <= position < self.max_acl_rules:
if self._acl[position]:
self.sys_log.info(f"Overwriting ACL rule at position {position}")
self._acl[position] = ACLRule(
action=action,
src_ip_address=src_ip_address,
dst_ip_address=dst_ip_address,
protocol=protocol,
src_port=src_port,
dst_port=dst_port,
)
else:
raise ValueError(f"Cannot add ACL rule, position {position} is out of bounds.")
def remove_rule(self, position: int) -> None:
"""
Remove an ACL rule from a specific position.
:param int position: The position of the rule to be removed.
:raises ValueError: When the position is out of bounds.
"""
if 0 <= position < self.max_acl_rules - 1:
rule = self._acl[position] # noqa
self._acl[position] = None
del rule
else:
raise ValueError(f"Cannot remove ACL rule, position {position} is out of bounds.")
def is_permitted(
self,
protocol: IPProtocol,
src_ip_address: Union[str, IPv4Address],
src_port: Optional[Port],
dst_ip_address: Union[str, IPv4Address],
dst_port: Optional[Port],
) -> Tuple[bool, Optional[Union[str, ACLRule]]]:
"""
Check if a packet with the given properties is permitted through the ACL.
:param protocol: The protocol of the packet.
:param src_ip_address: Source IP address of the packet. Accepts string and IPv4Address.
:param src_port: Source port of the packet. Optional.
:param dst_ip_address: Destination IP address of the packet. Accepts string and IPv4Address.
:param dst_port: Destination port of the packet. Optional.
:return: A tuple with a boolean indicating if the packet is permitted and an optional rule or implicit action
string.
"""
if not isinstance(src_ip_address, IPv4Address):
src_ip_address = IPv4Address(src_ip_address)
if not isinstance(dst_ip_address, IPv4Address):
dst_ip_address = IPv4Address(dst_ip_address)
for rule in self._acl:
if not rule:
continue
if (
(rule.src_ip_address == src_ip_address or rule.src_ip_address is None)
and (rule.dst_ip_address == dst_ip_address or rule.dst_ip_address is None)
and (rule.protocol == protocol or rule.protocol is None)
and (rule.src_port == src_port or rule.src_port is None)
and (rule.dst_port == dst_port or rule.dst_port is None)
):
return rule.action == ACLAction.PERMIT, rule
return self.implicit_action == ACLAction.PERMIT, f"Implicit {self.implicit_action.name}"
def get_relevant_rules(
self,
protocol: IPProtocol,
src_ip_address: Union[str, IPv4Address],
src_port: Port,
dst_ip_address: Union[str, IPv4Address],
dst_port: Port,
) -> List[ACLRule]:
"""
Get the list of relevant rules for a packet with given properties.
:param protocol: The protocol of the packet.
:param src_ip_address: Source IP address of the packet. Accepts string and IPv4Address.
:param src_port: Source port of the packet.
:param dst_ip_address: Destination IP address of the packet. Accepts string and IPv4Address.
:param dst_port: Destination port of the packet.
:return: A list of relevant ACLRules.
"""
if not isinstance(src_ip_address, IPv4Address):
src_ip_address = IPv4Address(src_ip_address)
if not isinstance(dst_ip_address, IPv4Address):
dst_ip_address = IPv4Address(dst_ip_address)
relevant_rules = []
for rule in self._acl:
if rule is None:
continue
if (
(rule.src_ip_address == src_ip_address or rule.src_ip_address is None)
or (rule.dst_ip_address == dst_ip_address or rule.dst_ip_address is None)
or (rule.protocol == protocol or rule.protocol is None)
or (rule.src_port == src_port or rule.src_port is None)
or (rule.dst_port == dst_port or rule.dst_port is None)
):
relevant_rules.append(rule)
return relevant_rules
def show(self, markdown: bool = False):
"""
Display the current ACL rules as a table.
:param markdown: Whether to display the table in Markdown format. Defaults to False.
"""
table = PrettyTable(["Index", "Action", "Protocol", "Src IP", "Src Port", "Dst IP", "Dst Port"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.sys_log.hostname} Access Control List"
for index, rule in enumerate(self.acl + [self.implicit_rule]):
if rule:
table.add_row(
[
index,
rule.action.name if rule.action else "ANY",
rule.protocol.name if rule.protocol else "ANY",
rule.src_ip_address if rule.src_ip_address else "ANY",
f"{rule.src_port.value} ({rule.src_port.name})" if rule.src_port else "ANY",
rule.dst_ip_address if rule.dst_ip_address else "ANY",
f"{rule.dst_port.value} ({rule.dst_port.name})" if rule.dst_port else "ANY",
]
)
print(table)
class RouteEntry(SimComponent):
"""
Represents a single entry in a routing table.
Attributes:
address (IPv4Address): The destination IP address or network address.
subnet_mask (IPv4Address): The subnet mask for the network.
next_hop_ip_address (IPv4Address): The next hop IP address to which packets should be forwarded.
metric (int): The cost metric for this route. Default is 0.0.
Example:
>>> entry = RouteEntry(
... IPv4Address("192.168.1.0"),
... IPv4Address("255.255.255.0"),
... IPv4Address("192.168.2.1"),
... metric=5
... )
"""
address: IPv4Address
"The destination IP address or network address."
subnet_mask: IPv4Address
"The subnet mask for the network."
next_hop_ip_address: IPv4Address
"The next hop IP address to which packets should be forwarded."
metric: float = 0.0
"The cost metric for this route. Default is 0.0."
def __init__(self, **kwargs):
for key in {"address", "subnet_mask", "next_hop_ip_address"}:
if not isinstance(kwargs[key], IPv4Address):
kwargs[key] = IPv4Address(kwargs[key])
super().__init__(**kwargs)
def describe_state(self) -> Dict:
"""
Describes the current state of the RouteEntry.
:return: A dictionary representing the current state.
"""
pass
class RouteTable(SimComponent):
"""
Represents a routing table holding multiple route entries.
:ivar List[RouteEntry] routes: A list of RouteEntry objects.
Example:
>>> rt = RouteTable()
>>> rt.add_route(
... RouteEntry(
... IPv4Address("192.168.1.0"),
... IPv4Address("255.255.255.0"),
... IPv4Address("192.168.2.1"),
... metric=5
... )
... )
>>> best_route = rt.find_best_route(IPv4Address("192.168.1.34"))
"""
routes: List[RouteEntry] = []
sys_log: SysLog
def describe_state(self) -> Dict:
"""
Describes the current state of the RouteTable.
:return: A dictionary representing the current state.
"""
pass
def add_route(
self,
address: Union[IPv4Address, str],
subnet_mask: Union[IPv4Address, str],
next_hop_ip_address: Union[IPv4Address, str],
metric: float = 0.0,
):
"""
Add a route to the routing table.
:param address: The destination address of the route.
:param subnet_mask: The subnet mask of the route.
:param next_hop_ip_address: The next hop IP for the route.
:param metric: The metric of the route, default is 0.0.
"""
for key in {address, subnet_mask, next_hop_ip_address}:
if not isinstance(key, IPv4Address):
key = IPv4Address(key)
route = RouteEntry(
address=address, subnet_mask=subnet_mask, next_hop_ip_address=next_hop_ip_address, metric=metric
)
self.routes.append(route)
def find_best_route(self, destination_ip: Union[str, IPv4Address]) -> Optional[RouteEntry]:
"""
Find the best route for a given destination IP.
This method uses the Longest Prefix Match algorithm and considers metrics to find the best route.
:param destination_ip: The destination IP to find the route for.
:return: The best matching RouteEntry, or None if no route matches.
"""
if not isinstance(destination_ip, IPv4Address):
destination_ip = IPv4Address(destination_ip)
best_route = None
longest_prefix = -1
lowest_metric = float("inf") # Initialise at infinity as any other number we compare to it will be smaller
for route in self.routes:
route_network = IPv4Network(f"{route.address}/{route.subnet_mask}", strict=False)
prefix_len = route_network.prefixlen
if destination_ip in route_network:
if prefix_len > longest_prefix or (prefix_len == longest_prefix and route.metric < lowest_metric):
best_route = route
longest_prefix = prefix_len
lowest_metric = route.metric
return best_route
def show(self, markdown: bool = False):
"""
Display the current routing table as a table.
:param markdown: Whether to display the table in Markdown format. Defaults to False.
"""
table = PrettyTable(["Index", "Address", "Next Hop", "Metric"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.sys_log.hostname} Route Table"
for index, route in enumerate(self.routes):
network = IPv4Network(f"{route.address}/{route.subnet_mask}")
table.add_row([index, f"{route.address}/{network.prefixlen}", route.next_hop_ip_address, route.metric])
print(table)
class RouterARPCache(ARPCache):
"""
Inherits from ARPCache and adds router-specific ARP packet processing.
:ivar SysLog sys_log: A system log for logging messages.
:ivar Router router: The router to which this ARP cache belongs.
"""
def __init__(self, sys_log: SysLog, router: Router):
super().__init__(sys_log)
self.router: Router = router
def process_arp_packet(self, from_nic: NIC, frame: Frame):
"""
Overridden method to process a received ARP packet in a router-specific way.
:param from_nic: The NIC that received the ARP packet.
:param frame: The original ARP frame.
"""
arp_packet = frame.arp
# ARP Reply
if not arp_packet.request:
for nic in self.router.nics.values():
if arp_packet.target_ip_address == nic.ip_address:
# reply to the Router specifically
self.sys_log.info(
f"Received ARP response for {arp_packet.sender_ip_address} "
f"from {arp_packet.sender_mac_addr} via NIC {from_nic}"
)
self.add_arp_cache_entry(
ip_address=arp_packet.sender_ip_address,
mac_address=arp_packet.sender_mac_addr,
nic=from_nic,
)
return
# Reply for a connected requested
nic = self.get_arp_cache_nic(arp_packet.target_ip_address)
if nic:
self.sys_log.info(
f"Forwarding arp reply for {arp_packet.target_ip_address}, from {arp_packet.sender_ip_address}"
)
arp_packet.sender_mac_addr = nic.mac_address
frame.decrement_ttl()
nic.send_frame(frame)
# ARP Request
self.sys_log.info(
f"Received ARP request for {arp_packet.target_ip_address} from "
f"{arp_packet.sender_mac_addr}/{arp_packet.sender_ip_address} "
)
# Matched ARP request
self.add_arp_cache_entry(
ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic
)
arp_packet = arp_packet.generate_reply(from_nic.mac_address)
self.send_arp_reply(arp_packet, from_nic)
# If the target IP matches one of the router's NICs
for nic in self.nics.values():
if nic.enabled and nic.ip_address == arp_packet.target_ip_address:
arp_reply = arp_packet.generate_reply(from_nic.mac_address)
self.send_arp_reply(arp_reply, from_nic)
return
class RouterICMP(ICMP):
"""
A class to represent a router's Internet Control Message Protocol (ICMP) handler.
:param sys_log: System log for logging network events and errors.
:type sys_log: SysLog
:param arp_cache: The ARP cache for resolving MAC addresses.
:type arp_cache: ARPCache
:param router: The router to which this ICMP handler belongs.
:type router: Router
"""
router: Router
def __init__(self, sys_log: SysLog, arp_cache: ARPCache, router: Router):
super().__init__(sys_log, arp_cache)
self.router = router
def process_icmp(self, frame: Frame, from_nic: NIC, is_reattempt: bool = False):
"""
Process incoming ICMP frames based on ICMP type.
:param frame: The incoming frame to process.
:param from_nic: The network interface where the frame is coming from.
:param is_reattempt: Flag to indicate if the process is a reattempt.
"""
if frame.icmp.icmp_type == ICMPType.ECHO_REQUEST:
# determine if request is for router interface or whether it needs to be routed
for nic in self.router.nics.values():
if nic.ip_address == frame.ip.dst_ip_address:
if nic.enabled:
# reply to the request
if not is_reattempt:
self.sys_log.info(f"Received echo request from {frame.ip.src_ip_address}")
target_mac_address = self.arp.get_arp_cache_mac_address(frame.ip.src_ip_address)
src_nic = self.arp.get_arp_cache_nic(frame.ip.src_ip_address)
tcp_header = TCPHeader(src_port=Port.ARP, dst_port=Port.ARP)
# Network Layer
ip_packet = IPPacket(
src_ip_address=nic.ip_address,
dst_ip_address=frame.ip.src_ip_address,
protocol=IPProtocol.ICMP,
)
# Data Link Layer
ethernet_header = EthernetHeader(
src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address
)
icmp_reply_packet = ICMPPacket(
icmp_type=ICMPType.ECHO_REPLY,
icmp_code=0,
identifier=frame.icmp.identifier,
sequence=frame.icmp.sequence + 1,
)
payload = secrets.token_urlsafe(int(32 / 1.3)) # Standard ICMP 32 bytes size
frame = Frame(
ethernet=ethernet_header,
ip=ip_packet,
tcp=tcp_header,
icmp=icmp_reply_packet,
payload=payload,
)
self.sys_log.info(f"Sending echo reply to {frame.ip.dst_ip_address}")
src_nic.send_frame(frame)
return
# Route the frame
self.router.route_frame(frame, from_nic)
elif frame.icmp.icmp_type == ICMPType.ECHO_REPLY:
for nic in self.router.nics.values():
if nic.ip_address == frame.ip.dst_ip_address:
if nic.enabled:
time = frame.transmission_duration()
time_str = f"{time}ms" if time > 0 else "<1ms"
self.sys_log.info(
f"Reply from {frame.ip.src_ip_address}: "
f"bytes={len(frame.payload)}, "
f"time={time_str}, "
f"TTL={frame.ip.ttl}"
)
if not self.request_replies.get(frame.icmp.identifier):
self.request_replies[frame.icmp.identifier] = 0
self.request_replies[frame.icmp.identifier] += 1
return
# Route the frame
self.router.route_frame(frame, from_nic)
class Router(Node):
"""
A class to represent a network router node.
:ivar str hostname: The name of the router node.
:ivar int num_ports: The number of ports in the router.
:ivar dict kwargs: Optional keyword arguments for SysLog, ACL, RouteTable, RouterARPCache, RouterICMP.
"""
num_ports: int
ethernet_ports: Dict[int, NIC] = {}
acl: AccessControlList
route_table: RouteTable
arp: RouterARPCache
icmp: RouterICMP
def __init__(self, hostname: str, num_ports: int = 5, **kwargs):
if not kwargs.get("sys_log"):
kwargs["sys_log"] = SysLog(hostname)
if not kwargs.get("acl"):
kwargs["acl"] = AccessControlList(sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY)
if not kwargs.get("route_table"):
kwargs["route_table"] = RouteTable(sys_log=kwargs["sys_log"])
if not kwargs.get("arp"):
kwargs["arp"] = RouterARPCache(sys_log=kwargs.get("sys_log"), router=self)
if not kwargs.get("icmp"):
kwargs["icmp"] = RouterICMP(sys_log=kwargs.get("sys_log"), arp_cache=kwargs.get("arp"), router=self)
super().__init__(hostname=hostname, num_ports=num_ports, **kwargs)
for i in range(1, self.num_ports + 1):
nic = NIC(ip_address="127.0.0.1", subnet_mask="255.0.0.0", gateway="0.0.0.0")
self.connect_nic(nic)
self.ethernet_ports[i] = nic
self.arp.nics = self.nics
self.icmp.arp = self.arp
def _get_port_of_nic(self, target_nic: NIC) -> Optional[int]:
"""
Retrieve the port number for a given NIC.
:param target_nic: Target network interface.
:return: The port number if NIC is found, otherwise None.
"""
for port, nic in self.ethernet_ports.items():
if nic == target_nic:
return port
def describe_state(self) -> Dict:
"""
Describes the current state of the Router.
:return: A dictionary representing the current state.
"""
pass
def route_frame(self, frame: Frame, from_nic: NIC, re_attempt: bool = False) -> None:
"""
Route a given frame from a source NIC to its destination.
:param frame: The frame to be routed.
:param from_nic: The source network interface.
:param re_attempt: Flag to indicate if the routing is a reattempt.
"""
# Check if src ip is on network of one of the NICs
nic = self.arp.get_arp_cache_nic(frame.ip.dst_ip_address)
target_mac = self.arp.get_arp_cache_mac_address(frame.ip.dst_ip_address)
if re_attempt and not nic:
self.sys_log.info(f"Destination {frame.ip.dst_ip_address} is unreachable")
return
if not nic:
self.arp.send_arp_request(frame.ip.dst_ip_address)
return self.route_frame(frame=frame, from_nic=from_nic, re_attempt=True)
if not nic.enabled:
# TODO: Add sys_log here
return
if frame.ip.dst_ip_address in nic.ip_network:
from_port = self._get_port_of_nic(from_nic)
to_port = self._get_port_of_nic(nic)
self.sys_log.info(f"Routing frame to internally from port {from_port} to port {to_port}")
frame.decrement_ttl()
frame.ethernet.src_mac_addr = nic.mac_address
frame.ethernet.dst_mac_addr = target_mac
nic.send_frame(frame)
return
else:
pass
# TODO: Deal with routing from route tables
def receive_frame(self, frame: Frame, from_nic: NIC):
"""
Receive a frame from a NIC and processes it based on its protocol.
:param frame: The incoming frame.
:param from_nic: The network interface where the frame is coming from.
"""
route_frame = False
protocol = frame.ip.protocol
src_ip_address = frame.ip.src_ip_address
dst_ip_address = frame.ip.dst_ip_address
src_port = None
dst_port = None
if frame.ip.protocol == IPProtocol.TCP:
src_port = frame.tcp.src_port
dst_port = frame.tcp.dst_port
elif frame.ip.protocol == IPProtocol.UDP:
src_port = frame.udp.src_port
dst_port = frame.udp.dst_port
# Check if it's permitted
permitted, rule = self.acl.is_permitted(
protocol=protocol,
src_ip_address=src_ip_address,
src_port=src_port,
dst_ip_address=dst_ip_address,
dst_port=dst_port,
)
if not permitted:
at_port = self._get_port_of_nic(from_nic)
self.sys_log.info(f"Frame blocked at port {at_port} by rule {rule}")
return
if not self.arp.get_arp_cache_nic(src_ip_address):
self.arp.add_arp_cache_entry(src_ip_address, frame.ethernet.src_mac_addr, from_nic)
if frame.ip.protocol == IPProtocol.ICMP:
self.icmp.process_icmp(frame=frame, from_nic=from_nic)
else:
if src_port == Port.ARP:
self.arp.process_arp_packet(from_nic=from_nic, frame=frame)
else:
# All other traffic
route_frame = True
if route_frame:
self.route_frame(frame, from_nic)
def configure_port(self, port: int, ip_address: Union[IPv4Address, str], subnet_mask: Union[IPv4Address, str]):
"""
Configure the IP settings of a given port.
:param port: The port to configure.
:param ip_address: The IP address to set.
:param subnet_mask: The subnet mask to set.
"""
if not isinstance(ip_address, IPv4Address):
ip_address = IPv4Address(ip_address)
if not isinstance(subnet_mask, IPv4Address):
subnet_mask = IPv4Address(subnet_mask)
nic = self.ethernet_ports[port]
nic.ip_address = ip_address
nic.subnet_mask = subnet_mask
self.sys_log.info(f"Configured port {port} with ip_address={ip_address}/{nic.ip_network.prefixlen}")
def enable_port(self, port: int):
"""
Enable a given port on the router.
:param port: The port to enable.
"""
nic = self.ethernet_ports.get(port)
if nic:
nic.enable()
def disable_port(self, port: int):
"""
Disable a given port on the router.
:param port: The port to disable.
"""
nic = self.ethernet_ports.get(port)
if nic:
nic.disable()
def show(self, markdown: bool = False):
"""
Prints the state of the Ethernet interfaces on the Router.
:param markdown: Flag to indicate if the output should be in markdown format.
"""
"""Prints a table of the NICs on the Node."""
table = PrettyTable(["Port", "MAC Address", "Address", "Speed", "Status"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.hostname} Ethernet Interfaces"
for port, nic in self.ethernet_ports.items():
table.add_row(
[
port,
nic.mac_address,
f"{nic.ip_address}/{nic.ip_network.prefixlen}",
nic.speed,
"Enabled" if nic.enabled else "Disabled",
]
)
print(table)

View File

@@ -0,0 +1,34 @@
from primaite.simulator.network.hardware.nodes.computer import Computer
class Server(Computer):
"""
A basic Server class.
Example:
>>> server_a = Server(
hostname="server_a",
ip_address="192.168.1.10",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1"
)
>>> server_a.power_on()
Instances of Server come 'pre-packaged' with the following:
* Core Functionality:
* ARP
* ICMP
* Packet Capture
* Sys Log
* Services:
* DNS Client
* FTP Client
* LDAP Client
* NTP Client
* Applications:
* Email Client
* Web Browser
* Processes:
* Placeholder
"""

View File

@@ -0,0 +1,121 @@
from typing import Dict
from prettytable import MARKDOWN, PrettyTable
from primaite import getLogger
from primaite.exceptions import NetworkError
from primaite.links.link import Link
from primaite.simulator.network.hardware.base import Node, SwitchPort
from primaite.simulator.network.transmission.data_link_layer import Frame
_LOGGER = getLogger(__name__)
class Switch(Node):
"""
A class representing a Layer 2 network switch.
:ivar num_ports: The number of ports on the switch. Default is 24.
"""
num_ports: int = 24
"The number of ports on the switch."
switch_ports: Dict[int, SwitchPort] = {}
"The SwitchPorts on the switch."
mac_address_table: Dict[str, SwitchPort] = {}
"A MAC address table mapping destination MAC addresses to corresponding SwitchPorts."
def __init__(self, **kwargs):
super().__init__(**kwargs)
if not self.switch_ports:
self.switch_ports = {i: SwitchPort() for i in range(1, self.num_ports + 1)}
for port_num, port in self.switch_ports.items():
port.connected_node = self
port.parent = self
port.port_num = port_num
def show(self, markdown: bool = False):
"""
Prints a table of the SwitchPorts on the Switch.
:param markdown: If True, outputs the table in markdown format. Default is False.
"""
table = PrettyTable(["Port", "MAC Address", "Speed", "Status"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.hostname} Switch Ports"
for port_num, port in self.switch_ports.items():
table.add_row([port_num, port.mac_address, port.speed, "Enabled" if port.enabled else "Disabled"])
print(table)
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
:return: Current state of this object and child objects.
"""
return {
"uuid": self.uuid,
"num_ports": self.num_ports, # redundant?
"ports": {port_num: port.describe_state() for port_num, port in self.switch_ports.items()},
"mac_address_table": {mac: port for mac, port in self.mac_address_table.items()},
}
def _add_mac_table_entry(self, mac_address: str, switch_port: SwitchPort):
"""
Private method to add an entry to the MAC address table.
:param mac_address: MAC address to be added.
:param switch_port: Corresponding SwitchPort object.
"""
mac_table_port = self.mac_address_table.get(mac_address)
if not mac_table_port:
self.mac_address_table[mac_address] = switch_port
self.sys_log.info(f"Added MAC table entry: Port {switch_port.port_num} -> {mac_address}")
else:
if mac_table_port != switch_port:
self.mac_address_table.pop(mac_address)
self.sys_log.info(f"Removed MAC table entry: Port {mac_table_port.port_num} -> {mac_address}")
self._add_mac_table_entry(mac_address, switch_port)
def forward_frame(self, frame: Frame, incoming_port: SwitchPort):
"""
Forward a frame to the appropriate port based on the destination MAC address.
:param frame: The Frame to be forwarded.
:param incoming_port: The port number from which the frame was received.
"""
src_mac = frame.ethernet.src_mac_addr
dst_mac = frame.ethernet.dst_mac_addr
self._add_mac_table_entry(src_mac, incoming_port)
outgoing_port = self.mac_address_table.get(dst_mac)
if outgoing_port or dst_mac != "ff:ff:ff:ff:ff:ff":
outgoing_port.send_frame(frame)
else:
# If the destination MAC is not in the table, flood to all ports except incoming
for port in self.switch_ports.values():
if port != incoming_port:
port.send_frame(frame)
def disconnect_link_from_port(self, link: Link, port_number: int):
"""
Disconnect a given link from the specified port number on the switch.
:param link: The Link object to be disconnected.
:param port_number: The port number on the switch from where the link should be disconnected.
:raise NetworkError: When an invalid port number is provided or the link does not match the connection.
"""
port = self.switch_ports.get(port_number)
if port is None:
msg = f"Invalid port number {port_number} on the switch"
_LOGGER.error(msg)
raise NetworkError(msg)
if port.connected_link != link:
msg = f"The link does not match the connection at port number {port_number}"
_LOGGER.error(msg)
raise NetworkError(msg)
port.disconnect_link()

View File

@@ -0,0 +1,186 @@
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.base import NIC
from primaite.simulator.network.hardware.nodes.computer import Computer
from primaite.simulator.network.hardware.nodes.router import ACLAction, Router
from primaite.simulator.network.hardware.nodes.server import Server
from primaite.simulator.network.hardware.nodes.switch import Switch
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
def client_server_routed() -> Network:
"""
A basic Client/Server Network routed between subnets.
+------------+ +------------+ +------------+ +------------+ +------------+
| | | | | | | | | |
| client_1 +------+ switch_2 +------+ router_1 +------+ switch_1 +------+ server_1 |
| | | | | | | | | |
+------------+ +------------+ +------------+ +------------+ +------------+
IP Table:
"""
network = Network()
# Router 1
router_1 = Router(hostname="router_1", num_ports=3)
router_1.power_on()
router_1.configure_port(port=1, ip_address="192.168.1.1", subnet_mask="255.255.255.0")
router_1.configure_port(port=2, ip_address="192.168.2.1", subnet_mask="255.255.255.0")
# Switch 1
switch_1 = Switch(hostname="switch_1", num_ports=6)
switch_1.power_on()
network.connect(endpoint_a=router_1.ethernet_ports[1], endpoint_b=switch_1.switch_ports[6])
router_1.enable_port(1)
# Switch 2
switch_2 = Switch(hostname="switch_2", num_ports=6)
switch_2.power_on()
network.connect(endpoint_a=router_1.ethernet_ports[2], endpoint_b=switch_2.switch_ports[6])
router_1.enable_port(2)
# Client 1
client_1 = Computer(
hostname="client_1", ip_address="192.168.2.2", subnet_mask="255.255.255.0", default_gateway="192.168.2.1"
)
client_1.power_on()
network.connect(endpoint_b=client_1.ethernet_port[1], endpoint_a=switch_2.switch_ports[1])
# Server 1
server_1 = Server(
hostname="server_1", ip_address="192.168.1.2", subnet_mask="255.255.255.0", default_gateway="192.168.1.1"
)
server_1.power_on()
network.connect(endpoint_b=server_1.ethernet_port[1], endpoint_a=switch_1.switch_ports[1])
router_1.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.ARP, dst_port=Port.ARP, position=22)
router_1.acl.add_rule(action=ACLAction.PERMIT, protocol=IPProtocol.ICMP, position=23)
return network
def arcd_uc2_network() -> Network:
"""
Models the ARCD Use Case 2 Network.
+------------+
| domain_ |
+------------+ controller |
| | |
| +------------+
|
|
+------------+ | +------------+
| | | | |
| client_1 +---------+ | +---------+ web_server |
| | | | | | |
+------------+ | | | +------------+
+--+---------+ +------------+ +------+--+--+
| | | | | |
| switch_2 +------+ router_1 +------+ switch_1 |
| | | | | |
+--+------+--+ +------------+ +--+---+--+--+
+------------+ | | | | | +------------+
| | | | | | | | database |
| client_2 +---------+ | | | +---------+ _server |
| | | | | | |
+------------+ | | | +------------+
| +------------+ | |
| | security | | |
+---------+ _suite +---------+ | +------------+
| | | | backup_ |
+------------+ +------------+ server |
| |
+------------+
"""
network = Network()
# Router 1
router_1 = Router(hostname="router_1", num_ports=5)
router_1.power_on()
router_1.configure_port(port=1, ip_address="192.168.1.1", subnet_mask="255.255.255.0")
router_1.configure_port(port=2, ip_address="192.168.10.1", subnet_mask="255.255.255.0")
# Switch 1
switch_1 = Switch(hostname="switch_1", num_ports=8)
switch_1.power_on()
network.connect(endpoint_a=router_1.ethernet_ports[1], endpoint_b=switch_1.switch_ports[8])
router_1.enable_port(1)
# Switch 2
switch_2 = Switch(hostname="switch_2", num_ports=8)
switch_2.power_on()
network.connect(endpoint_a=router_1.ethernet_ports[2], endpoint_b=switch_2.switch_ports[8])
router_1.enable_port(2)
# Client 1
client_1 = Computer(
hostname="client_1", ip_address="192.168.10.21", subnet_mask="255.255.255.0", default_gateway="192.168.10.1"
)
client_1.power_on()
network.connect(endpoint_b=client_1.ethernet_port[1], endpoint_a=switch_2.switch_ports[1])
# Client 2
client_2 = Computer(
hostname="client_2", ip_address="192.168.10.22", subnet_mask="255.255.255.0", default_gateway="192.168.10.1"
)
client_2.power_on()
network.connect(endpoint_b=client_2.ethernet_port[1], endpoint_a=switch_2.switch_ports[2])
# Domain Controller
domain_controller = Server(
hostname="domain_controller",
ip_address="192.168.1.10",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
)
domain_controller.power_on()
network.connect(endpoint_b=domain_controller.ethernet_port[1], endpoint_a=switch_1.switch_ports[1])
# Web Server
web_server = Server(
hostname="web_server", ip_address="192.168.1.12", subnet_mask="255.255.255.0", default_gateway="192.168.1.1"
)
web_server.power_on()
network.connect(endpoint_b=web_server.ethernet_port[1], endpoint_a=switch_1.switch_ports[2])
# Database Server
database_server = Server(
hostname="database_server",
ip_address="192.168.1.14",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
)
database_server.power_on()
network.connect(endpoint_b=database_server.ethernet_port[1], endpoint_a=switch_1.switch_ports[3])
# Backup Server
backup_server = Server(
hostname="backup_server", ip_address="192.168.1.16", subnet_mask="255.255.255.0", default_gateway="192.168.1.1"
)
backup_server.power_on()
network.connect(endpoint_b=backup_server.ethernet_port[1], endpoint_a=switch_1.switch_ports[4])
# Security Suite
security_suite = Server(
hostname="security_suite",
ip_address="192.168.1.110",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
)
security_suite.power_on()
network.connect(endpoint_b=security_suite.ethernet_port[1], endpoint_a=switch_1.switch_ports[7])
security_suite.connect_nic(NIC(ip_address="192.168.10.110", subnet_mask="255.255.255.0"))
network.connect(endpoint_b=security_suite.ethernet_port[2], endpoint_a=switch_2.switch_ports[7])
router_1.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.ARP, dst_port=Port.ARP, position=22)
router_1.acl.add_rule(action=ACLAction.PERMIT, protocol=IPProtocol.ICMP, position=23)
return network

View File

@@ -24,21 +24,21 @@ class ARPPacket(BaseModel):
:param request: ARP operation. True if a request, False if a reply.
:param sender_mac_addr: Sender MAC address.
:param sender_ip: Sender IP address.
:param sender_ip_address: Sender IP address.
:param target_mac_addr: Target MAC address.
:param target_ip: Target IP address.
:param target_ip_address: Target IP address.
:Example:
>>> arp_request = ARPPacket(
... sender_mac_addr="aa:bb:cc:dd:ee:ff",
... sender_ip=IPv4Address("192.168.0.1"),
... target_ip=IPv4Address("192.168.0.2")
... sender_ip_address=IPv4Address("192.168.0.1"),
... target_ip_address=IPv4Address("192.168.0.2")
... )
>>> arp_response = ARPPacket(
... sender_mac_addr="aa:bb:cc:dd:ee:ff",
... sender_ip=IPv4Address("192.168.0.1"),
... target_ip=IPv4Address("192.168.0.2")
... sender_ip_address=IPv4Address("192.168.0.1"),
... target_ip_address=IPv4Address("192.168.0.2")
... )
"""
@@ -46,11 +46,11 @@ class ARPPacket(BaseModel):
"ARP operation. True if a request, False if a reply."
sender_mac_addr: str
"Sender MAC address."
sender_ip: IPv4Address
sender_ip_address: IPv4Address
"Sender IP address."
target_mac_addr: Optional[str] = None
"Target MAC address."
target_ip: IPv4Address
target_ip_address: IPv4Address
"Target IP address."
def generate_reply(self, mac_address: str) -> ARPPacket:
@@ -62,8 +62,8 @@ class ARPPacket(BaseModel):
"""
return ARPPacket(
request=False,
sender_ip=self.target_ip,
sender_ip_address=self.target_ip_address,
sender_mac_addr=mac_address,
target_ip=self.sender_ip,
target_ip_address=self.sender_ip_address,
target_mac_addr=self.sender_mac_addr,
)

View File

@@ -52,8 +52,8 @@ class Frame(BaseModel):
... dst_mac_addr='11:22:33:44:55:66'
... ),
... ip=IPPacket(
... src_ip=IPv4Address('192.168.0.1'),
... dst_ip=IPv4Address('10.0.0.1'),
... src_ip_address=IPv4Address('192.168.0.1'),
... dst_ip_address=IPv4Address('10.0.0.1'),
... ),
... tcp=TCPHeader(
... src_port=8080,
@@ -124,6 +124,11 @@ class Frame(BaseModel):
if not self.received_timestamp:
self.received_timestamp = datetime.now()
def transmission_duration(self) -> int:
"""The transmission duration in milliseconds."""
delta = self.received_timestamp - self.sent_timestamp
return int(delta.microseconds / 1000)
@property
def size(self) -> float: # noqa - Keep it as MBits as this is how they're expressed
"""The size of the Frame in Bytes."""

View File

@@ -162,8 +162,8 @@ class IPPacket(BaseModel):
"""
Represents the IP layer of a network frame.
:param src_ip: Source IP address.
:param dst_ip: Destination IP address.
:param src_ip_address: Source IP address.
:param dst_ip_address: Destination IP address.
:param protocol: The IP protocol (default is TCP).
:param ttl: Time to Live (TTL) for the packet.
:param precedence: Precedence level for Quality of Service (QoS).
@@ -172,17 +172,17 @@ class IPPacket(BaseModel):
>>> from ipaddress import IPv4Address
>>> ip_packet = IPPacket(
... src_ip=IPv4Address('192.168.0.1'),
... dst_ip=IPv4Address('10.0.0.1'),
... src_ip_address=IPv4Address('192.168.0.1'),
... dst_ip_address=IPv4Address('10.0.0.1'),
... protocol=IPProtocol.TCP,
... ttl=64,
... precedence=Precedence.CRITICAL
... )
"""
src_ip: IPv4Address
src_ip_address: IPv4Address
"Source IP address."
dst_ip: IPv4Address
dst_ip_address: IPv4Address
"Destination IP address."
protocol: IPProtocol = IPProtocol.TCP
"IPProtocol."
@@ -192,8 +192,8 @@ class IPPacket(BaseModel):
"Precedence level for Quality of Service (default is Precedence.ROUTINE)."
def __init__(self, **kwargs):
if not isinstance(kwargs["src_ip"], IPv4Address):
kwargs["src_ip"] = IPv4Address(kwargs["src_ip"])
if not isinstance(kwargs["dst_ip"], IPv4Address):
kwargs["dst_ip"] = IPv4Address(kwargs["dst_ip"])
if not isinstance(kwargs["src_ip_address"], IPv4Address):
kwargs["src_ip_address"] = IPv4Address(kwargs["src_ip_address"])
if not isinstance(kwargs["dst_ip_address"], IPv4Address):
kwargs["dst_ip_address"] = IPv4Address(kwargs["dst_ip_address"])
super().__init__(**kwargs)

View File

@@ -22,16 +22,16 @@ class Session(SimComponent):
source and destination IPs and ports.
:param protocol: The IP protocol used in the session.
:param src_ip: The source IP address.
:param dst_ip: The destination IP address.
:param src_ip_address: The source IP address.
:param dst_ip_address: The destination IP address.
:param src_port: The source port number (optional).
:param dst_port: The destination port number (optional).
:param connected: A flag indicating whether the session is connected.
"""
protocol: IPProtocol
src_ip: IPv4Address
dst_ip: IPv4Address
src_ip_address: IPv4Address
dst_ip_address: IPv4Address
src_port: Optional[Port]
dst_port: Optional[Port]
connected: bool = False
@@ -46,8 +46,14 @@ class Session(SimComponent):
:param session_key: Tuple containing the session details.
:return: A Session instance.
"""
protocol, src_ip, dst_ip, src_port, dst_port = session_key
return Session(protocol=protocol, src_ip=src_ip, dst_ip=dst_ip, src_port=src_port, dst_port=dst_port)
protocol, src_ip_address, dst_ip_address, src_port, dst_port = session_key
return Session(
protocol=protocol,
src_ip_address=src_ip_address,
dst_ip_address=dst_ip_address,
src_port=src_port,
dst_port=dst_port,
)
def describe_state(self) -> Dict:
"""
@@ -108,8 +114,8 @@ class SessionManager:
:return: A tuple containing the session key.
"""
protocol = frame.ip.protocol
src_ip = frame.ip.src_ip
dst_ip = frame.ip.dst_ip
src_ip_address = frame.ip.src_ip_address
dst_ip_address = frame.ip.dst_ip_address
if protocol == IPProtocol.TCP:
if from_source:
src_port = frame.tcp.src_port
@@ -127,7 +133,7 @@ class SessionManager:
else:
src_port = None
dst_port = None
return protocol, src_ip, dst_ip, src_port, dst_port
return protocol, src_ip_address, dst_ip_address, src_port, dst_port
def receive_payload_from_software_manager(self, payload: Any, session_id: Optional[int] = None):
"""

View File

@@ -1,6 +1,8 @@
import logging
from pathlib import Path
from prettytable import MARKDOWN, PrettyTable
from primaite.simulator import TEMP_SIM_OUTPUT
@@ -43,7 +45,7 @@ class SysLog:
file_handler = logging.FileHandler(filename=log_path)
file_handler.setLevel(logging.DEBUG)
log_format = "%(asctime)s %(levelname)s: %(message)s"
log_format = "%(asctime)s::%(levelname)s::%(message)s"
file_handler.setFormatter(logging.Formatter(log_format))
self.logger = logging.getLogger(f"{self.hostname}_sys_log")
@@ -52,6 +54,27 @@ class SysLog:
self.logger.addFilter(_NotJSONFilter())
def show(self, last_n: int = 10, markdown: bool = False):
"""
Print the Node Sys Log as a table.
Generate and print PrettyTable instance that shows the Nodes Sys Log, with columns Timestamp, Level,
and Massage.
:param markdown: Use Markdown style in table output. Defaults to False.
"""
table = PrettyTable(["Timestamp", "Level", "Message"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.hostname} Sys Log"
if self._get_log_path().exists():
with open(self._get_log_path()) as file:
lines = file.readlines()
for line in lines[-last_n:]:
table.add_row(line.strip().split("::"))
print(table)
def _get_log_path(self) -> Path:
"""
Constructs the path for the log file based on the hostname.

View File

@@ -1,16 +1,15 @@
from primaite.simulator.network.hardware.base import Link, NIC, Node, Switch
from primaite.simulator.network.hardware.base import Link, NIC, Node
def test_node_to_node_ping():
"""Tests two Nodes are able to ping each other."""
# TODO Add actual checks. Manual check performed for now.
node_a = Node(hostname="node_a")
nic_a = NIC(ip_address="192.168.0.10", subnet_mask="255.255.255.0", gateway="192.168.0.1")
nic_a = NIC(ip_address="192.168.0.10", subnet_mask="255.255.255.0")
node_a.connect_nic(nic_a)
node_a.power_on()
node_b = Node(hostname="node_b")
nic_b = NIC(ip_address="192.168.0.11", subnet_mask="255.255.255.0", gateway="192.168.0.1")
nic_b = NIC(ip_address="192.168.0.11", subnet_mask="255.255.255.0")
node_b.connect_nic(nic_b)
node_b.power_on()
@@ -21,21 +20,20 @@ def test_node_to_node_ping():
def test_multi_nic():
"""Tests that Nodes with multiple NICs can ping each other and the data go across the correct links."""
# TODO Add actual checks. Manual check performed for now.
node_a = Node(hostname="node_a")
nic_a = NIC(ip_address="192.168.0.10", subnet_mask="255.255.255.0", gateway="192.168.0.1")
nic_a = NIC(ip_address="192.168.0.10", subnet_mask="255.255.255.0")
node_a.connect_nic(nic_a)
node_a.power_on()
node_b = Node(hostname="node_b")
nic_b1 = NIC(ip_address="192.168.0.11", subnet_mask="255.255.255.0", gateway="192.168.0.1")
nic_b2 = NIC(ip_address="10.0.0.12", subnet_mask="255.0.0.0", gateway="10.0.0.1")
nic_b1 = NIC(ip_address="192.168.0.11", subnet_mask="255.255.255.0")
nic_b2 = NIC(ip_address="10.0.0.12", subnet_mask="255.0.0.0")
node_b.connect_nic(nic_b1)
node_b.connect_nic(nic_b2)
node_b.power_on()
node_c = Node(hostname="node_c")
nic_c = NIC(ip_address="10.0.0.13", subnet_mask="255.0.0.0", gateway="10.0.0.1")
nic_c = NIC(ip_address="10.0.0.13", subnet_mask="255.0.0.0")
node_c.connect_nic(nic_c)
node_c.power_on()
@@ -45,42 +43,4 @@ def test_multi_nic():
node_a.ping("192.168.0.11")
node_c.ping("10.0.0.12")
def test_switched_network():
"""Tests a larges network of Nodes and Switches with one node pinging another."""
# TODO Add actual checks. Manual check performed for now.
pc_a = Node(hostname="pc_a")
nic_a = NIC(ip_address="192.168.0.10", subnet_mask="255.255.255.0", gateway="192.168.0.1")
pc_a.connect_nic(nic_a)
pc_a.power_on()
pc_b = Node(hostname="pc_b")
nic_b = NIC(ip_address="192.168.0.11", subnet_mask="255.255.255.0", gateway="192.168.0.1")
pc_b.connect_nic(nic_b)
pc_b.power_on()
pc_c = Node(hostname="pc_c")
nic_c = NIC(ip_address="192.168.0.12", subnet_mask="255.255.255.0", gateway="192.168.0.1")
pc_c.connect_nic(nic_c)
pc_c.power_on()
pc_d = Node(hostname="pc_d")
nic_d = NIC(ip_address="192.168.0.13", subnet_mask="255.255.255.0", gateway="192.168.0.1")
pc_d.connect_nic(nic_d)
pc_d.power_on()
switch_1 = Switch(hostname="switch_1", num_ports=6)
switch_1.power_on()
switch_2 = Switch(hostname="switch_2", num_ports=6)
switch_2.power_on()
link_nic_a_switch_1 = Link(endpoint_a=nic_a, endpoint_b=switch_1.switch_ports[1])
link_nic_b_switch_1 = Link(endpoint_a=nic_b, endpoint_b=switch_1.switch_ports[2])
link_nic_c_switch_2 = Link(endpoint_a=nic_c, endpoint_b=switch_2.switch_ports[1])
link_nic_d_switch_2 = Link(endpoint_a=nic_d, endpoint_b=switch_2.switch_ports[2])
link_switch_1_switch_2 = Link(endpoint_a=switch_1.switch_ports[6], endpoint_b=switch_2.switch_ports[6])
pc_a.ping("192.168.0.13")
assert node_c.ping("10.0.0.12")

View File

@@ -4,18 +4,17 @@ from primaite.simulator.network.hardware.base import Link, NIC, Node
def test_link_up():
"""Tests Nodes, NICs, and Links can all be connected and be in an enabled/up state."""
node_a = Node(hostname="node_a")
nic_a = NIC(ip_address="192.168.0.10", subnet_mask="255.255.255.0", gateway="192.168.0.1")
nic_a = NIC(ip_address="192.168.0.10", subnet_mask="255.255.255.0")
node_a.connect_nic(nic_a)
node_a.power_on()
assert nic_a.enabled
node_b = Node(hostname="node_b")
nic_b = NIC(ip_address="192.168.0.11", subnet_mask="255.255.255.0", gateway="192.168.0.1")
nic_b = NIC(ip_address="192.168.0.11", subnet_mask="255.255.255.0")
node_b.connect_nic(nic_b)
node_b.power_on()
assert nic_b.enabled
link = Link(endpoint_a=nic_a, endpoint_b=nic_b)
assert nic_a.enabled
assert nic_b.enabled
assert link.is_up

View File

@@ -6,9 +6,5 @@ from primaite.simulator.network.hardware.base import Link, NIC
def test_link_fails_with_same_nic():
"""Tests Link creation fails with endpoint_a and endpoint_b are the same NIC."""
with pytest.raises(ValueError):
nic_a = NIC(
ip_address="192.168.1.2",
subnet_mask="255.255.255.0",
gateway="192.168.0.1",
)
nic_a = NIC(ip_address="192.168.1.2", subnet_mask="255.255.255.0")
Link(endpoint_a=nic_a, endpoint_b=nic_a)

View File

@@ -0,0 +1,55 @@
from typing import Tuple
import pytest
from primaite.simulator.network.hardware.base import Link, NIC, Node
from primaite.simulator.network.hardware.nodes.router import ACLAction, Router
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
@pytest.fixture(scope="function")
def pc_a_pc_b_router_1() -> Tuple[Node, Node, Router]:
pc_a = Node(hostname="pc_a", default_gateway="192.168.0.1")
nic_a = NIC(ip_address="192.168.0.10", subnet_mask="255.255.255.0")
pc_a.connect_nic(nic_a)
pc_a.power_on()
pc_b = Node(hostname="pc_b", default_gateway="192.168.1.1")
nic_b = NIC(ip_address="192.168.1.10", subnet_mask="255.255.255.0")
pc_b.connect_nic(nic_b)
pc_b.power_on()
router_1 = Router(hostname="router_1")
router_1.power_on()
router_1.configure_port(1, "192.168.0.1", "255.255.255.0")
router_1.configure_port(2, "192.168.1.1", "255.255.255.0")
Link(endpoint_a=nic_a, endpoint_b=router_1.ethernet_ports[1])
Link(endpoint_a=nic_b, endpoint_b=router_1.ethernet_ports[2])
router_1.enable_port(1)
router_1.enable_port(2)
router_1.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.ARP, dst_port=Port.ARP, position=22)
router_1.acl.add_rule(action=ACLAction.PERMIT, protocol=IPProtocol.ICMP, position=23)
return pc_a, pc_b, router_1
def test_ping_default_gateway(pc_a_pc_b_router_1):
pc_a, pc_b, router_1 = pc_a_pc_b_router_1
assert pc_a.ping(pc_a.default_gateway)
def test_ping_other_router_port(pc_a_pc_b_router_1):
pc_a, pc_b, router_1 = pc_a_pc_b_router_1
assert pc_a.ping(pc_b.default_gateway)
def test_host_on_other_subnet(pc_a_pc_b_router_1):
pc_a, pc_b, router_1 = pc_a_pc_b_router_1
assert pc_a.ping("192.168.1.10")

View File

@@ -0,0 +1,25 @@
from primaite.simulator.network.hardware.base import Link
from primaite.simulator.network.hardware.nodes.computer import Computer
from primaite.simulator.network.hardware.nodes.server import Server
from primaite.simulator.network.hardware.nodes.switch import Switch
def test_switched_network():
"""Tests a node can ping another node via the switch."""
client_1 = Computer(
hostname="client_1", ip_address="192.168.1.10", subnet_mask="255.255.255.0", default_gateway="192.168.1.0"
)
client_1.power_on()
server_1 = Server(
hostname=" server_1", ip_address="192.168.1.11", subnet_mask="255.255.255.0", default_gateway="192.168.1.11"
)
server_1.power_on()
switch_1 = Switch(hostname="switch_1", num_ports=6)
switch_1.power_on()
Link(endpoint_a=client_1.ethernet_port[1], endpoint_b=switch_1.switch_ports[1])
Link(endpoint_a=server_1.ethernet_port[1], endpoint_b=switch_1.switch_ports[2])
assert client_1.ping("192.168.1.11")

View File

@@ -0,0 +1,111 @@
from ipaddress import IPv4Address
from primaite.simulator.network.hardware.nodes.router import ACLAction, Router
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
def test_add_rule():
router = Router("Router")
acl = router.acl
acl.add_rule(
action=ACLAction.PERMIT,
protocol=IPProtocol.TCP,
src_ip_address=IPv4Address("192.168.1.1"),
src_port=Port(8080),
dst_ip_address=IPv4Address("192.168.1.2"),
dst_port=Port(80),
position=1,
)
assert acl.acl[1].action == ACLAction.PERMIT
assert acl.acl[1].protocol == IPProtocol.TCP
assert acl.acl[1].src_ip_address == IPv4Address("192.168.1.1")
assert acl.acl[1].src_port == Port(8080)
assert acl.acl[1].dst_ip_address == IPv4Address("192.168.1.2")
assert acl.acl[1].dst_port == Port(80)
def test_remove_rule():
router = Router("Router")
acl = router.acl
acl.add_rule(
action=ACLAction.PERMIT,
protocol=IPProtocol.TCP,
src_ip_address=IPv4Address("192.168.1.1"),
src_port=Port(8080),
dst_ip_address=IPv4Address("192.168.1.2"),
dst_port=Port(80),
position=1,
)
acl.remove_rule(1)
assert not acl.acl[1]
def test_rules():
router = Router("Router")
acl = router.acl
acl.add_rule(
action=ACLAction.PERMIT,
protocol=IPProtocol.TCP,
src_ip_address=IPv4Address("192.168.1.1"),
src_port=Port(8080),
dst_ip_address=IPv4Address("192.168.1.2"),
dst_port=Port(80),
position=1,
)
acl.add_rule(
action=ACLAction.DENY,
protocol=IPProtocol.TCP,
src_ip_address=IPv4Address("192.168.1.3"),
src_port=Port(8080),
dst_ip_address=IPv4Address("192.168.1.4"),
dst_port=Port(80),
position=2,
)
is_permitted, rule = acl.is_permitted(
protocol=IPProtocol.TCP,
src_ip_address=IPv4Address("192.168.1.1"),
src_port=Port(8080),
dst_ip_address=IPv4Address("192.168.1.2"),
dst_port=Port(80),
)
assert is_permitted
is_permitted, rule = acl.is_permitted(
protocol=IPProtocol.TCP,
src_ip_address=IPv4Address("192.168.1.3"),
src_port=Port(8080),
dst_ip_address=IPv4Address("192.168.1.4"),
dst_port=Port(80),
)
assert not is_permitted
def test_default_rule():
router = Router("Router")
acl = router.acl
acl.add_rule(
action=ACLAction.PERMIT,
protocol=IPProtocol.TCP,
src_ip_address=IPv4Address("192.168.1.1"),
src_port=Port(8080),
dst_ip_address=IPv4Address("192.168.1.2"),
dst_port=Port(80),
position=1,
)
acl.add_rule(
action=ACLAction.DENY,
protocol=IPProtocol.TCP,
src_ip_address=IPv4Address("192.168.1.3"),
src_port=Port(8080),
dst_ip_address=IPv4Address("192.168.1.4"),
dst_port=Port(80),
position=2,
)
is_permitted, rule = acl.is_permitted(
protocol=IPProtocol.UDP,
src_ip_address=IPv4Address("192.168.1.5"),
src_port=Port(8080),
dst_ip_address=IPv4Address("192.168.1.12"),
dst_port=Port(80),
)
assert not is_permitted

View File

@@ -32,10 +32,8 @@ def test_nic_ip_address_type_conversion():
nic = NIC(
ip_address="192.168.1.2",
subnet_mask="255.255.255.0",
gateway="192.168.0.1",
)
assert isinstance(nic.ip_address, IPv4Address)
assert isinstance(nic.gateway, IPv4Address)
def test_nic_deserialize():
@@ -43,7 +41,6 @@ def test_nic_deserialize():
nic = NIC(
ip_address="192.168.1.2",
subnet_mask="255.255.255.0",
gateway="192.168.0.1",
)
nic_json = nic.model_dump_json()
@@ -51,21 +48,10 @@ def test_nic_deserialize():
assert nic_json == deserialized_nic.model_dump_json()
def test_nic_ip_address_as_gateway_fails():
"""Tests NIC creation fails if ip address is the same as the gateway."""
with pytest.raises(ValueError):
NIC(
ip_address="192.168.0.1",
subnet_mask="255.255.255.0",
gateway="192.168.0.1",
)
def test_nic_ip_address_as_network_address_fails():
"""Tests NIC creation fails if ip address and subnet mask are a network address."""
with pytest.raises(ValueError):
NIC(
ip_address="192.168.0.0",
subnet_mask="255.255.255.0",
gateway="192.168.0.1",
)

View File

@@ -10,7 +10,7 @@ def test_frame_minimal_instantiation():
"""Tests that the minimum frame (TCP SYN) using default values."""
frame = Frame(
ethernet=EthernetHeader(src_mac_addr="aa:bb:cc:dd:ee:ff", dst_mac_addr="11:22:33:44:55:66"),
ip=IPPacket(src_ip="192.168.0.10", dst_ip="192.168.0.20"),
ip=IPPacket(src_ip_address="192.168.0.10", dst_ip_address="192.168.0.20"),
tcp=TCPHeader(
src_port=8080,
dst_port=80,
@@ -38,7 +38,7 @@ def test_frame_creation_fails_tcp_without_header():
with pytest.raises(ValueError):
Frame(
ethernet=EthernetHeader(src_mac_addr="aa:bb:cc:dd:ee:ff", dst_mac_addr="11:22:33:44:55:66"),
ip=IPPacket(src_ip="192.168.0.10", dst_ip="192.168.0.20", protocol=IPProtocol.TCP),
ip=IPPacket(src_ip_address="192.168.0.10", dst_ip_address="192.168.0.20", protocol=IPProtocol.TCP),
)
@@ -47,7 +47,7 @@ def test_frame_creation_fails_udp_without_header():
with pytest.raises(ValueError):
Frame(
ethernet=EthernetHeader(src_mac_addr="aa:bb:cc:dd:ee:ff", dst_mac_addr="11:22:33:44:55:66"),
ip=IPPacket(src_ip="192.168.0.10", dst_ip="192.168.0.20", protocol=IPProtocol.UDP),
ip=IPPacket(src_ip_address="192.168.0.10", dst_ip_address="192.168.0.20", protocol=IPProtocol.UDP),
)
@@ -56,7 +56,7 @@ def test_frame_creation_fails_tcp_with_udp_header():
with pytest.raises(ValueError):
Frame(
ethernet=EthernetHeader(src_mac_addr="aa:bb:cc:dd:ee:ff", dst_mac_addr="11:22:33:44:55:66"),
ip=IPPacket(src_ip="192.168.0.10", dst_ip="192.168.0.20", protocol=IPProtocol.TCP),
ip=IPPacket(src_ip_address="192.168.0.10", dst_ip_address="192.168.0.20", protocol=IPProtocol.TCP),
udp=UDPHeader(src_port=8080, dst_port=80),
)
@@ -66,7 +66,7 @@ def test_frame_creation_fails_udp_with_tcp_header():
with pytest.raises(ValueError):
Frame(
ethernet=EthernetHeader(src_mac_addr="aa:bb:cc:dd:ee:ff", dst_mac_addr="11:22:33:44:55:66"),
ip=IPPacket(src_ip="192.168.0.10", dst_ip="192.168.0.20", protocol=IPProtocol.UDP),
ip=IPPacket(src_ip_address="192.168.0.10", dst_ip_address="192.168.0.20", protocol=IPProtocol.UDP),
udp=TCPHeader(src_port=8080, dst_port=80),
)
@@ -75,7 +75,7 @@ def test_icmp_frame_creation():
"""Tests Frame creation for ICMP."""
frame = Frame(
ethernet=EthernetHeader(src_mac_addr="aa:bb:cc:dd:ee:ff", dst_mac_addr="11:22:33:44:55:66"),
ip=IPPacket(src_ip="192.168.0.10", dst_ip="192.168.0.20", protocol=IPProtocol.ICMP),
ip=IPPacket(src_ip_address="192.168.0.10", dst_ip_address="192.168.0.20", protocol=IPProtocol.ICMP),
icmp=ICMPPacket(),
)
assert frame
@@ -86,5 +86,5 @@ def test_icmp_frame_creation_fails_without_icmp_header():
with pytest.raises(ValueError):
Frame(
ethernet=EthernetHeader(src_mac_addr="aa:bb:cc:dd:ee:ff", dst_mac_addr="11:22:33:44:55:66"),
ip=IPPacket(src_ip="192.168.0.10", dst_ip="192.168.0.20", protocol=IPProtocol.ICMP),
ip=IPPacket(src_ip_address="192.168.0.10", dst_ip_address="192.168.0.20", protocol=IPProtocol.ICMP),
)