#1800 - Added more docstrings and rst docs.

- Extended the .show functionality to enable markdown format too.
This commit is contained in:
Chris McCarthy
2023-09-01 16:58:21 +01:00
parent 89ad22aceb
commit 5111affeeb
12 changed files with 753 additions and 108 deletions

View File

@@ -18,3 +18,6 @@ Contents
simulation_structure simulation_structure
simulation_components/network/base_hardware simulation_components/network/base_hardware
simulation_components/network/transport_to_data_link_layer simulation_components/network/transport_to_data_link_layer
simulation_components/network/router
simulation_components/network/switch
simulation_components/network/network

View File

@@ -0,0 +1,114 @@
.. only:: comment
© Crown-owned copyright 2023, Defence Science and Technology Laboratory UK
.. _about:
Network
=======
The ``Network`` class serves as the backbone of the simulation. It offers a framework to manage various network
components such as routers, switches, servers, and clients. This document provides a detailed explanation of how to
effectively use the ``Network`` class.
Example Usage
-------------
Below demonstrates how to use the Router node to connect Nodes, and block traffic using ACLs. For this demonstration,
we'll use the following Network that has a client, server, two switches, and a router.
.. code-block:: text
+------------+ +------------+ +------------+ +------------+ +------------+
| | | | | | | | | |
| client_1 +------+ switch_2 +------+ router_1 +------+ switch_1 +------+ server_1 |
| | | | | | | | | |
+------------+ +------------+ +------------+ +------------+ +------------+
1. Relevant imports
.. code-block:: python
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.base import Switch, NIC
from primaite.simulator.network.hardware.nodes.computer import Computer
from primaite.simulator.network.hardware.nodes.router import Router, ACLAction
from primaite.simulator.network.hardware.nodes.server import Server
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
2. Create the Network
.. code-block:: python
network = Network()
3. Create and configure the Router
.. code-block:: python
router_1 = Router(hostname="router_1", num_ports=3)
router_1.power_on()
router_1.configure_port(port=1, ip_address="192.168.1.1", subnet_mask="255.255.255.0")
router_1.configure_port(port=2, ip_address="192.168.2.1", subnet_mask="255.255.255.0")
4. Create and configure the two Switches
.. code-block:: python
switch_1 = Switch(hostname="switch_1", num_ports=6)
switch_1.power_on()
switch_2 = Switch(hostname="switch_2", num_ports=6)
switch_2.power_on()
5. Connect the Switches to the Router
.. code-block:: python
network.connect(endpoint_a=router_1.ethernet_ports[1], endpoint_b=switch_1.switch_ports[6])
router_1.enable_port(1)
network.connect(endpoint_a=router_1.ethernet_ports[2], endpoint_b=switch_2.switch_ports[6])
router_1.enable_port(2)
6. Create the Client and Server nodes.
.. code-block:: python
client_1 = Computer(
hostname="client_1",
ip_address="192.168.2.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.2.1"
)
client_1.power_on()
server_1 = Server(
hostname="server_1",
ip_address="192.168.1.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1"
)
server_1.power_on()
7. Connect the Client and Server to the relevant Switch
.. code-block:: python
network.connect(endpoint_a=switch_2.switch_ports[1], endpoint_b=client_1.ethernet_port[1])
network.connect(endpoint_a=switch_1.switch_ports[1], endpoint_b=server_1.ethernet_port[1])
8. Add ACL rules on the Router to allow ARP and ICMP traffic.
.. code-block:: python
router_1.acl.add_rule(
action=ACLAction.PERMIT,
src_port=Port.ARP,
dst_port=Port.ARP,
position=22
)
router_1.acl.add_rule(
action=ACLAction.PERMIT,
protocol=IPProtocol.ICMP,
position=23
)

View File

@@ -0,0 +1,73 @@
.. only:: comment
© Crown-owned copyright 2023, Defence Science and Technology Laboratory UK
.. _about:
Router Module
=============
The router module contains classes for simulating the functions of a network router.
Router
------
The Router class represents a multi-port network router that can receive, process, and route network packets between its ports and other Nodes
The router maintains internal state including:
- RouteTable - Routing table to lookup where to forward packets.
- AccessControlList - Access control rules to block or allow packets.
- ARP cache - MAC address lookups for connected devices.
- ICMP handler - Handles ICMP requests to router interfaces.
The router receives incoming frames on enabled ports. It processes the frame headers and applies the following logic:
1. Checks the AccessControlList if the packet is permitted. If blocked, it is dropped.
2. For permitted packets, routes the frame based on:
- ARP cache lookups for destination MAC address.
- ICMP echo requests handled directly.
- RouteTable lookup to forward packet out other ports.
3. Updates ARP cache as it learns new information about the Network.
RouteTable
----------
The RouteTable holds RouteEntry objects representing routes. It finds the best route for a destination IP using a metric and the longest prefix match algorithm.
Routes can be added and looked up based on destination IP address. The RouteTable is used by the Router when forwarding packets between other Nodes.
AccessControlList
-----------------
The AccessControlList defines Access Control Rules to block or allow packets. Packets are checked against the rules to determine if they are permitted to be forwarded.
Rules can be added to deny or permit traffic based on IP, port, and protocol. The ACL is checked by the Router when packets are received.
Packet Processing
-----------------
-The Router supports the following protocols and packet types:
ARP
^^^
- Handles both ARP requests and responses.
- Updates ARP cache.
- Proxies ARP replies for connected networks.
- Routes ARP requests.
ICMP
^^^^
- Responds to ICMP echo requests to Router interfaces.
- Routes other ICMP messages based on routes.
TCP/UDP
^^^^^^^
- Forwards packets based on routes like IP.
- Applies ACL rules based on protocol, source/destination IP address, and source/destination port numbers.
- Decrements TTL and drops expired TTL packets.

View File

@@ -0,0 +1,8 @@
.. only:: comment
© Crown-owned copyright 2023, Defence Science and Technology Laboratory UK
.. _about:
Switch
======

View File

@@ -1,20 +1,41 @@
from typing import Any, Dict, Union, Optional from typing import Any, Dict, Union, Optional, List
import matplotlib.pyplot as plt
import networkx as nx
from networkx import MultiGraph
from prettytable import PrettyTable, MARKDOWN
from primaite import getLogger from primaite import getLogger
from primaite.simulator.core import Action, ActionManager, AllowAllValidator, SimComponent from primaite.simulator.core import Action, ActionManager, AllowAllValidator, SimComponent
from primaite.simulator.network.hardware.base import Link, NIC, Node, SwitchPort from primaite.simulator.network.hardware.base import Link, NIC, Node, SwitchPort, Switch
from primaite.simulator.network.hardware.nodes.computer import Computer
from primaite.simulator.network.hardware.nodes.router import Router
from primaite.simulator.network.hardware.nodes.server import Server
_LOGGER = getLogger(__name__) _LOGGER = getLogger(__name__)
class Network(SimComponent): class Network(SimComponent):
"""Top level container object representing the physical network.""" """
Top level container object representing the physical network.
This class manages nodes, links, and other network components. It also
offers methods for rendering the network topology and gathering states.
:ivar Dict[str, Node] nodes: Dictionary mapping node UUIDs to Node instances.
:ivar Dict[str, Link] links: Dictionary mapping link UUIDs to Link instances.
"""
nodes: Dict[str, Node] = {} nodes: Dict[str, Node] = {}
links: Dict[str, Link] = {} links: Dict[str, Link] = {}
def __init__(self, **kwargs): def __init__(self, **kwargs):
"""Initialise the network.""" """"
Initialise the network.
Constructs the network and sets up its initial state including
the action manager and an empty MultiGraph for topology representation.
"""
super().__init__(**kwargs) super().__init__(**kwargs)
self.action_manager = ActionManager() self.action_manager = ActionManager()
@@ -25,15 +46,112 @@ class Network(SimComponent):
validator=AllowAllValidator(), validator=AllowAllValidator(),
), ),
) )
self._nx_graph = MultiGraph()
@property
def routers(self) -> List[Router]:
"""The Routers in the Network."""
return [node for node in self.nodes.values() if isinstance(node, Router)]
@property
def switches(self) -> List[Switch]:
"""The Switches in the Network."""
return [node for node in self.nodes.values() if isinstance(node, Switch)]
@property
def computers(self) -> List[Computer]:
"""The Computers in the Network."""
return [node for node in self.nodes.values() if isinstance(node, Computer) and not isinstance(node, Server)]
@property
def servers(self) -> List[Server]:
"""The Servers in the Network."""
return [node for node in self.nodes.values() if isinstance(node, Server)]
def show(self, nodes: bool = True, ip_addresses: bool = True, links: bool = True, markdown: bool = False):
"""
Print tables describing the Network.
Generate and print PrettyTable instances that show details about nodes,
IP addresses, and links in the network. Output can be in Markdown format.
:param nodes: Include node details in the output. Defaults to True.
:param ip_addresses: Include IP address details in the output. Defaults to True.
:param links: Include link details in the output. Defaults to True.
:param markdown: Use Markdown style in table output. Defaults to False.
"""
nodes_type_map = {
"Router": self.routers,
"Switch": self.switches,
"Server": self.servers,
"Computer": self.computers
}
if nodes:
table = PrettyTable(["Node", "Type", "Operating State"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"Nodes"
for node_type, nodes in nodes_type_map.items():
for node in nodes:
table.add_row([node.hostname, node_type, node.operating_state.name])
print(table)
if ip_addresses:
table = PrettyTable(["Node", "Port", "IP Address", "Subnet Mask", "Default Gateway"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"IP Addresses"
for nodes in nodes_type_map.values():
for node in nodes:
for i, port in node.ethernet_port.items():
table.add_row([node.hostname, i, port.ip_address, port.subnet_mask, node.default_gateway])
print(table)
if links:
table = PrettyTable(["Endpoint A", "Endpoint B", "is Up", "Bandwidth (MBits)", "Current Load"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"Links"
links = list(self.links.values())
for nodes in nodes_type_map.values():
for node in nodes:
for link in links[::-1]:
if node in [link.endpoint_a.parent, link.endpoint_b.parent]:
table.add_row(
[
link.endpoint_a.parent.hostname,
link.endpoint_b.parent.hostname,
link.is_up,
link.bandwidth,
link.current_load_percent
]
)
links.remove(link)
print(table)
def clear_links(self):
"""Clear all the links in the network by resetting their component state for the episode."""
for link in self.links.values():
link.reset_component_for_episode()
def draw(self, seed: int = 123):
"""
Draw the Network using NetworkX and matplotlib.pyplot.
:param seed: An integer seed for reproducible layouts. Default is 123.
"""
pos = nx.spring_layout(self._nx_graph, seed=seed)
nx.draw(self._nx_graph, pos, with_labels=True)
plt.show()
def describe_state(self) -> Dict: def describe_state(self) -> Dict:
""" """
Produce a dictionary describing the current state of this object. Produce a dictionary describing the current state of the Network.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation. :return: A dictionary capturing the current state of the Network and its child objects.
:return: Current state of this object and child objects.
:rtype: Dict
""" """
state = super().describe_state() state = super().describe_state()
state.update( state.update(
@@ -48,14 +166,16 @@ class Network(SimComponent):
""" """
Add an existing node to the network. Add an existing node to the network.
:param node: Node instance that the network should keep track of. .. note:: If the node is already present in the network, a warning is logged.
:type node: Node
:param node: Node instance that should be kept track of by the network.
""" """
if node in self: if node in self:
_LOGGER.warning(f"Can't add node {node.uuid}. It is already in the network.") _LOGGER.warning(f"Can't add node {node.uuid}. It is already in the network.")
return return
self.nodes[node.uuid] = node self.nodes[node.uuid] = node
node.parent = self node.parent = self
self._nx_graph.add_node(node.hostname)
_LOGGER.info(f"Added node {node.uuid} to Network {self.uuid}") _LOGGER.info(f"Added node {node.uuid} to Network {self.uuid}")
def get_node_by_hostname(self, hostname: str) -> Optional[Node]: def get_node_by_hostname(self, hostname: str) -> Optional[Node]:
@@ -75,6 +195,8 @@ class Network(SimComponent):
""" """
Remove a node from the network. Remove a node from the network.
.. note:: If the node is not found in the network, a warning is logged.
:param node: Node instance that is currently part of the network that should be removed. :param node: Node instance that is currently part of the network that should be removed.
:type node: Node :type node: Node
""" """
@@ -85,18 +207,22 @@ class Network(SimComponent):
node.parent = None node.parent = None
_LOGGER.info(f"Removed node {node.uuid} from network {self.uuid}") _LOGGER.info(f"Removed node {node.uuid} from network {self.uuid}")
def connect(self, endpoint_a: Union[Node, NIC, SwitchPort], endpoint_b: Union[Node, NIC, SwitchPort], **kwargs) -> \ def connect(
None: self, endpoint_a: Union[NIC, SwitchPort], endpoint_b: Union[NIC, SwitchPort], **kwargs
"""Connect two nodes on the network by creating a link between an NIC/SwitchPort of each one. ) -> None:
:param endpoint_a: The endpoint to which to connect the link on the first node
:type endpoint_a: Union[NIC, SwitchPort]
:param endpoint_b: The endpoint to which to connct the link on the second node
:type endpoint_b: Union[NIC, SwitchPort]
:raises RuntimeError: _description_
""" """
node_a: Node = endpoint_a.parent if not isinstance(endpoint_a, Node) else endpoint_a Connect two endpoints on the network by creating a link between their NICs/SwitchPorts.
node_b: Node = endpoint_b.parent if not isinstance(endpoint_b, Node) else endpoint_b
.. note:: If the nodes owning the endpoints are not already in the network, they are automatically added.
:param endpoint_a: The first endpoint to connect.
:type endpoint_a: Union[NIC, SwitchPort]
:param endpoint_b: The second endpoint to connect.
:type endpoint_b: Union[NIC, SwitchPort]
:raises RuntimeError: If any validation or runtime checks fail.
"""
node_a: Node = endpoint_a.parent
node_b: Node = endpoint_b.parent
if node_a not in self: if node_a not in self:
self.add_node(node_a) self.add_node(node_a)
if node_b not in self: if node_b not in self:
@@ -104,12 +230,9 @@ class Network(SimComponent):
if node_a is node_b: if node_a is node_b:
_LOGGER.warning(f"Cannot link endpoint {endpoint_a} to {endpoint_b} because they belong to the same node.") _LOGGER.warning(f"Cannot link endpoint {endpoint_a} to {endpoint_b} because they belong to the same node.")
return return
if isinstance(endpoint_a, Node) and len(endpoint_a.nics) == 1:
endpoint_a = list(endpoint_a.nics.values())[0]
if isinstance(endpoint_b, Node) and len(endpoint_b.nics) == 1:
endpoint_b = list(endpoint_b.nics.values())[0]
link = Link(endpoint_a=endpoint_a, endpoint_b=endpoint_b, **kwargs) link = Link(endpoint_a=endpoint_a, endpoint_b=endpoint_b, **kwargs)
self.links[link.uuid] = link self.links[link.uuid] = link
self._nx_graph.add_edge(endpoint_a.parent.hostname, endpoint_b.parent.hostname)
link.parent = self link.parent = self
_LOGGER.info(f"Added link {link.uuid} to connect {endpoint_a} and {endpoint_b}") _LOGGER.info(f"Added link {link.uuid} to connect {endpoint_a} and {endpoint_b}")

View File

@@ -1,12 +1,13 @@
from __future__ import annotations from __future__ import annotations
import random
import re import re
import secrets import secrets
from enum import Enum from enum import Enum
from ipaddress import IPv4Address, IPv4Network from ipaddress import IPv4Address, IPv4Network
from typing import Dict, List, Optional, Tuple, Union from typing import Dict, List, Optional, Tuple, Union
from prettytable import PrettyTable from prettytable import PrettyTable, MARKDOWN
from primaite import getLogger from primaite import getLogger
from primaite.exceptions import NetworkError from primaite.exceptions import NetworkError
@@ -256,7 +257,6 @@ class NIC(SimComponent):
The Frame is passed to the Node. The Frame is passed to the Node.
:param frame: The network frame being received. :param frame: The network frame being received.
:type frame: :class:`~primaite.simulator.network.osi_layers.Frame`
""" """
if self.enabled: if self.enabled:
frame.decrement_ttl() frame.decrement_ttl()
@@ -266,9 +266,6 @@ class NIC(SimComponent):
if frame.ethernet.dst_mac_addr == self.mac_address or frame.ethernet.dst_mac_addr == "ff:ff:ff:ff:ff:ff": if frame.ethernet.dst_mac_addr == self.mac_address or frame.ethernet.dst_mac_addr == "ff:ff:ff:ff:ff:ff":
self.connected_node.receive_frame(frame=frame, from_nic=self) self.connected_node.receive_frame(frame=frame, from_nic=self)
return True return True
else:
self.connected_node.sys_log.info("Dropping frame not for me")
print(frame)
return False return False
def __str__(self) -> str: def __str__(self) -> str:
@@ -562,9 +559,12 @@ class ARPCache:
self.arp: Dict[IPv4Address, ARPEntry] = {} self.arp: Dict[IPv4Address, ARPEntry] = {}
self.nics: Dict[str, "NIC"] = {} self.nics: Dict[str, "NIC"] = {}
def show(self): def show(self, markdown: bool = False):
"""Prints a table of ARC Cache.""" """Prints a table of ARC Cache."""
table = PrettyTable(["IP Address", "MAC Address", "Via"]) table = PrettyTable(["IP Address", "MAC Address", "Via"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.sys_log.hostname} ARP Cache" table.title = f"{self.sys_log.hostname} ARP Cache"
for ip, arp in self.arp.items(): for ip, arp in self.arp.items():
table.add_row( table.add_row(
@@ -765,12 +765,22 @@ class ICMP:
identifier=frame.icmp.identifier, identifier=frame.icmp.identifier,
sequence=frame.icmp.sequence + 1, sequence=frame.icmp.sequence + 1,
) )
frame = Frame(ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, icmp=icmp_reply_packet) payload = secrets.token_urlsafe(int(32/1.3)) # Standard ICMP 32 bytes size
frame = Frame(
ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, icmp=icmp_reply_packet, payload=payload
)
self.sys_log.info(f"Sending echo reply to {frame.ip.dst_ip}") self.sys_log.info(f"Sending echo reply to {frame.ip.dst_ip}")
src_nic.send_frame(frame) src_nic.send_frame(frame)
elif frame.icmp.icmp_type == ICMPType.ECHO_REPLY: elif frame.icmp.icmp_type == ICMPType.ECHO_REPLY:
self.sys_log.info(f"Received echo reply from {frame.ip.src_ip}") time = frame.transmission_duration()
time_str = f"{time}ms" if time > 0 else "<1ms"
self.sys_log.info(
f"Reply from {frame.ip.src_ip}: "
f"bytes={len(frame.payload)}, "
f"time={time_str}, "
f"TTL={frame.ip.ttl}"
)
if not self.request_replies.get(frame.icmp.identifier): if not self.request_replies.get(frame.icmp.identifier):
self.request_replies[frame.icmp.identifier] = 0 self.request_replies[frame.icmp.identifier] = 0
self.request_replies[frame.icmp.identifier] += 1 self.request_replies[frame.icmp.identifier] += 1
@@ -819,8 +829,8 @@ class ICMP:
# Data Link Layer # Data Link Layer
ethernet_header = EthernetHeader(src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address) ethernet_header = EthernetHeader(src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address)
icmp_packet = ICMPPacket(identifier=identifier, sequence=sequence) icmp_packet = ICMPPacket(identifier=identifier, sequence=sequence)
frame = Frame(ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, icmp=icmp_packet) payload = secrets.token_urlsafe(int(32/1.3)) # Standard ICMP 32 bytes size
self.sys_log.info(f"Sending echo request to {target_ip_address}") frame = Frame(ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, icmp=icmp_packet, payload=payload)
nic.send_frame(frame) nic.send_frame(frame)
return sequence, icmp_packet.identifier return sequence, icmp_packet.identifier
@@ -857,6 +867,8 @@ class Node(SimComponent):
"The hardware state of the node." "The hardware state of the node."
nics: Dict[str, NIC] = {} nics: Dict[str, NIC] = {}
"The NICs on the node." "The NICs on the node."
ethernet_port: Dict[int, NIC] = {}
"The NICs on the node by port id."
accounts: Dict[str, Account] = {} accounts: Dict[str, Account] = {}
"All accounts on the node." "All accounts on the node."
@@ -928,13 +940,17 @@ class Node(SimComponent):
) )
return state return state
def show(self): def show(self, markdown: bool = False):
"""Prints a table of the NICs on the Node.""" """Prints a table of the NICs on the Node."""
table = PrettyTable(["MAC Address", "Address", "Speed", "Status"]) table = PrettyTable(["Port", "MAC Address", "Address", "Speed", "Status"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.hostname} Network Interface Cards" table.title = f"{self.hostname} Network Interface Cards"
for nic in self.nics.values(): for port, nic in self.ethernet_port.items():
table.add_row( table.add_row(
[ [
port,
nic.mac_address, nic.mac_address,
f"{nic.ip_address}/{nic.ip_network.prefixlen}", f"{nic.ip_address}/{nic.ip_network.prefixlen}",
nic.speed, nic.speed,
@@ -969,6 +985,7 @@ class Node(SimComponent):
""" """
if nic.uuid not in self.nics: if nic.uuid not in self.nics:
self.nics[nic.uuid] = nic self.nics[nic.uuid] = nic
self.ethernet_port[len(self.nics)] = nic
nic.connected_node = self nic.connected_node = self
nic.parent = self nic.parent = self
self.sys_log.info(f"Connected NIC {nic}") self.sys_log.info(f"Connected NIC {nic}")
@@ -990,6 +1007,10 @@ class Node(SimComponent):
if isinstance(nic, str): if isinstance(nic, str):
nic = self.nics.get(nic) nic = self.nics.get(nic)
if nic or nic.uuid in self.nics: if nic or nic.uuid in self.nics:
for port, _nic in self.ethernet_port.items():
if nic == _nic:
self.ethernet_port.pop(port)
break
self.nics.pop(nic.uuid) self.nics.pop(nic.uuid)
nic.parent = None nic.parent = None
nic.disable() nic.disable()
@@ -1014,7 +1035,7 @@ class Node(SimComponent):
self.sys_log.info("Pinging loopback address") self.sys_log.info("Pinging loopback address")
return any(nic.enabled for nic in self.nics.values()) return any(nic.enabled for nic in self.nics.values())
if self.operating_state == NodeOperatingState.ON: if self.operating_state == NodeOperatingState.ON:
self.sys_log.info(f"Attempting to ping {target_ip_address}") self.sys_log.info(f"Pinging {target_ip_address}:")
sequence, identifier = 0, None sequence, identifier = 0, None
while sequence < pings: while sequence < pings:
sequence, identifier = self.icmp.ping(target_ip_address, sequence, identifier, pings) sequence, identifier = self.icmp.ping(target_ip_address, sequence, identifier, pings)
@@ -1022,8 +1043,14 @@ class Node(SimComponent):
passed = request_replies == pings passed = request_replies == pings
if request_replies: if request_replies:
self.icmp.request_replies.pop(identifier) self.icmp.request_replies.pop(identifier)
else:
request_replies = 0
self.sys_log.info(
f"Ping statistics for {target_ip_address}: "
f"Packets: Sent = {pings}, "
f"Received = {request_replies}, "
f"Lost = {pings-request_replies} ({(pings-request_replies)/pings*100}% loss)")
return passed return passed
self.sys_log.info("Ping failed as the node is turned off")
return False return False
def send_frame(self, frame: Frame): def send_frame(self, frame: Frame):
@@ -1078,9 +1105,12 @@ class Switch(Node):
port.parent = self port.parent = self
port.port_num = port_num port.port_num = port_num
def show(self): def show(self, markdown: bool = False):
"""Prints a table of the SwitchPorts on the Switch.""" """Prints a table of the SwitchPorts on the Switch."""
table = PrettyTable(["Port", "MAC Address", "Speed", "Status"]) table = PrettyTable(["Port", "MAC Address", "Speed", "Status"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.hostname} Switch Ports" table.title = f"{self.hostname} Switch Ports"
for port_num, port in self.switch_ports.items(): for port_num, port in self.switch_ports.items():
table.add_row([port_num, port.mac_address, port.speed, "Enabled" if port.enabled else "Disabled"]) table.add_row([port_num, port.mac_address, port.speed, "Enabled" if port.enabled else "Disabled"])

View File

@@ -5,7 +5,7 @@ from primaite.simulator.network.hardware.base import Node, NIC
class Computer(Node): class Computer(Node):
""" """
A basic computer class. A basic Computer class.
Example: Example:
>>> pc_a = Computer( >>> pc_a = Computer(
@@ -19,20 +19,20 @@ class Computer(Node):
Instances of computer come 'pre-packaged' with the following: Instances of computer come 'pre-packaged' with the following:
* Core Functionality: * Core Functionality:
* ARP. * ARP
* ICMP. * ICMP
* Packet Capture. * Packet Capture
* Sys Log. * Sys Log
* Services: * Services:
* DNS Client. * DNS Client
* FTP Client. * FTP Client
* LDAP Client. * LDAP Client
* NTP Client. * NTP Client
* Applications: * Applications:
* Email Client. * Email Client
* Web Browser. * Web Browser
* Processes: * Processes:
* Placeholder. * Placeholder
""" """
def __init__(self, **kwargs): def __init__(self, **kwargs):

View File

@@ -1,10 +1,11 @@
from __future__ import annotations from __future__ import annotations
import secrets
from enum import Enum from enum import Enum
from ipaddress import IPv4Address, IPv4Network from ipaddress import IPv4Address, IPv4Network
from typing import Dict, List, Optional, Tuple, Union from typing import Dict, List, Optional, Tuple, Union
from prettytable import PrettyTable from prettytable import PrettyTable, MARKDOWN
from primaite.simulator.core import SimComponent from primaite.simulator.core import SimComponent
from primaite.simulator.network.hardware.base import ARPCache, ICMP, NIC, Node from primaite.simulator.network.hardware.base import ARPCache, ICMP, NIC, Node
@@ -22,8 +23,16 @@ class ACLAction(Enum):
class ACLRule(SimComponent): class ACLRule(SimComponent):
def describe_state(self) -> Dict: """
pass Represents an Access Control List (ACL) rule.
:ivar ACLAction action: Action to be performed (Permit/Deny). Default is DENY.
:ivar Optional[IPProtocol] protocol: Network protocol. Default is None.
:ivar Optional[IPv4Address] src_ip: Source IP address. Default is None.
:ivar Optional[Port] src_port: Source port number. Default is None.
:ivar Optional[IPv4Address] dst_ip: Destination IP address. Default is None.
:ivar Optional[Port] dst_port: Destination port number. Default is None.
"""
action: ACLAction = ACLAction.DENY action: ACLAction = ACLAction.DENY
protocol: Optional[IPProtocol] = None protocol: Optional[IPProtocol] = None
@@ -43,8 +52,25 @@ class ACLRule(SimComponent):
rule_strings.append(f"{key}={value}") rule_strings.append(f"{key}={value}")
return ", ".join(rule_strings) return ", ".join(rule_strings)
def describe_state(self) -> Dict:
"""
Describes the current state of the ACLRule.
:return: A dictionary representing the current state.
"""
pass
class AccessControlList(SimComponent): class AccessControlList(SimComponent):
"""
Manages a list of ACLRules to filter network traffic.
:ivar SysLog sys_log: System logging instance.
:ivar ACLAction implicit_action: Default action for rules.
:ivar ACLRule implicit_rule: Implicit ACL rule, created based on implicit_action.
:ivar int max_acl_rules: Maximum number of ACL rules that can be added. Default is 25.
:ivar List[Optional[ACLRule]] _acl: A list containing the ACL rules.
"""
sys_log: SysLog sys_log: SysLog
implicit_action: ACLAction implicit_action: ACLAction
implicit_rule: ACLRule implicit_rule: ACLRule
@@ -62,10 +88,20 @@ class AccessControlList(SimComponent):
super().__init__(**kwargs) super().__init__(**kwargs)
def describe_state(self) -> Dict: def describe_state(self) -> Dict:
"""
Describes the current state of the AccessControlList.
:return: A dictionary representing the current state.
"""
pass pass
@property @property
def acl(self) -> List[Optional[ACLRule]]: def acl(self) -> List[Optional[ACLRule]]:
"""
Get the list of ACL rules.
:return: The list of ACL rules.
"""
return self._acl return self._acl
def add_rule( def add_rule(
@@ -78,6 +114,18 @@ class AccessControlList(SimComponent):
dst_port: Optional[Port] = None, dst_port: Optional[Port] = None,
position: int = 0, position: int = 0,
) -> None: ) -> None:
"""
Add a new ACL rule.
:param ACLAction action: Action to be performed (Permit/Deny).
:param Optional[IPProtocol] protocol: Network protocol.
:param Optional[Union[str, IPv4Address]] src_ip: Source IP address.
:param Optional[Port] src_port: Source port number.
:param Optional[Union[str, IPv4Address]] dst_ip: Destination IP address.
:param Optional[Port] dst_port: Destination port number.
:param int position: Position in the ACL list to insert the rule.
:raises ValueError: When the position is out of bounds.
"""
if isinstance(src_ip, str): if isinstance(src_ip, str):
src_ip = IPv4Address(src_ip) src_ip = IPv4Address(src_ip)
if isinstance(dst_ip, str): if isinstance(dst_ip, str):
@@ -90,6 +138,12 @@ class AccessControlList(SimComponent):
raise ValueError(f"Position {position} is out of bounds.") raise ValueError(f"Position {position} is out of bounds.")
def remove_rule(self, position: int) -> None: def remove_rule(self, position: int) -> None:
"""
Remove an ACL rule from a specific position.
:param int position: The position of the rule to be removed.
:raises ValueError: When the position is out of bounds.
"""
if 0 <= position < self.max_acl_rules: if 0 <= position < self.max_acl_rules:
self._acl[position] = None self._acl[position] = None
else: else:
@@ -103,6 +157,17 @@ class AccessControlList(SimComponent):
dst_ip: Union[str, IPv4Address], dst_ip: Union[str, IPv4Address],
dst_port: Optional[Port], dst_port: Optional[Port],
) -> Tuple[bool, Optional[Union[str, ACLRule]]]: ) -> Tuple[bool, Optional[Union[str, ACLRule]]]:
"""
Check if a packet with the given properties is permitted through the ACL.
:param protocol: The protocol of the packet.
:param src_ip: Source IP address of the packet. Accepts string and IPv4Address.
:param src_port: Source port of the packet. Optional.
:param dst_ip: Destination IP address of the packet. Accepts string and IPv4Address.
:param dst_port: Destination port of the packet. Optional.
:return: A tuple with a boolean indicating if the packet is permitted and an optional rule or implicit action
string.
"""
if not isinstance(src_ip, IPv4Address): if not isinstance(src_ip, IPv4Address):
src_ip = IPv4Address(src_ip) src_ip = IPv4Address(src_ip)
if not isinstance(dst_ip, IPv4Address): if not isinstance(dst_ip, IPv4Address):
@@ -130,6 +195,16 @@ class AccessControlList(SimComponent):
dst_ip: Union[str, IPv4Address], dst_ip: Union[str, IPv4Address],
dst_port: Port, dst_port: Port,
) -> List[ACLRule]: ) -> List[ACLRule]:
"""
Get the list of relevant rules for a packet with given properties.
:param protocol: The protocol of the packet.
:param src_ip: Source IP address of the packet. Accepts string and IPv4Address.
:param src_port: Source port of the packet.
:param dst_ip: Destination IP address of the packet. Accepts string and IPv4Address.
:param dst_port: Destination port of the packet.
:return: A list of relevant ACLRules.
"""
if not isinstance(src_ip, IPv4Address): if not isinstance(src_ip, IPv4Address):
src_ip = IPv4Address(src_ip) src_ip = IPv4Address(src_ip)
if not isinstance(dst_ip, IPv4Address): if not isinstance(dst_ip, IPv4Address):
@@ -150,17 +225,16 @@ class AccessControlList(SimComponent):
return relevant_rules return relevant_rules
def show(self): def show(self, markdown: bool = False):
"""Prints a table of the routes in the RouteTable.""" """
Display the current ACL rules as a table.
:param markdown: Whether to display the table in Markdown format. Defaults to False.
""" """
action: ACLAction
protocol: Optional[IPProtocol]
src_ip: Optional[IPv4Address]
src_port: Optional[Port]
dst_ip: Optional[IPv4Address]
dst_port: Optional[Port]
"""
table = PrettyTable(["Index", "Action", "Protocol", "Src IP", "Src Port", "Dst IP", "Dst Port"]) table = PrettyTable(["Index", "Action", "Protocol", "Src IP", "Src Port", "Dst IP", "Dst Port"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.sys_log.hostname} Access Control List" table.title = f"{self.sys_log.hostname} Access Control List"
for index, rule in enumerate(self.acl + [self.implicit_rule]): for index, rule in enumerate(self.acl + [self.implicit_rule]):
if rule: if rule:
@@ -213,6 +287,11 @@ class RouteEntry(SimComponent):
super().__init__(**kwargs) super().__init__(**kwargs)
def describe_state(self) -> Dict: def describe_state(self) -> Dict:
"""
Describes the current state of the RouteEntry.
:return: A dictionary representing the current state.
"""
pass pass
@@ -220,12 +299,7 @@ class RouteTable(SimComponent):
""" """
Represents a routing table holding multiple route entries. Represents a routing table holding multiple route entries.
Attributes: :ivar List[RouteEntry] routes: A list of RouteEntry objects.
routes (List[RouteEntry]): A list of RouteEntry objects.
Methods:
add_route: Add a route to the routing table.
find_best_route: Find the best route for a given destination IP.
Example: Example:
>>> rt = RouteTable() >>> rt = RouteTable()
@@ -244,6 +318,11 @@ class RouteTable(SimComponent):
sys_log: SysLog sys_log: SysLog
def describe_state(self) -> Dict: def describe_state(self) -> Dict:
"""
Describes the current state of the RouteTable.
:return: A dictionary representing the current state.
"""
pass pass
def add_route( def add_route(
@@ -253,9 +332,13 @@ class RouteTable(SimComponent):
next_hop: Union[IPv4Address, str], next_hop: Union[IPv4Address, str],
metric: float = 0.0, metric: float = 0.0,
): ):
"""Add a route to the routing table. """
Add a route to the routing table.
:param route: A RouteEntry object representing the route. :param address: The destination address of the route.
:param subnet_mask: The subnet mask of the route.
:param next_hop: The next hop IP for the route.
:param metric: The metric of the route, default is 0.0.
""" """
for key in {address, subnet_mask, next_hop}: for key in {address, subnet_mask, next_hop}:
if not isinstance(key, IPv4Address): if not isinstance(key, IPv4Address):
@@ -267,10 +350,10 @@ class RouteTable(SimComponent):
""" """
Find the best route for a given destination IP. Find the best route for a given destination IP.
:param destination_ip: The destination IPv4Address to find the route for. This method uses the Longest Prefix Match algorithm and considers metrics to find the best route.
:return: The best matching RouteEntry, or None if no route matches.
The algorithm uses Longest Prefix Match and considers metrics to find the best route. :param destination_ip: The destination IP to find the route for.
:return: The best matching RouteEntry, or None if no route matches.
""" """
if not isinstance(destination_ip, IPv4Address): if not isinstance(destination_ip, IPv4Address):
destination_ip = IPv4Address(destination_ip) destination_ip = IPv4Address(destination_ip)
@@ -290,9 +373,16 @@ class RouteTable(SimComponent):
return best_route return best_route
def show(self): def show(self, markdown: bool = False):
"""Prints a table of the routes in the RouteTable.""" """
Display the current routing table as a table.
:param markdown: Whether to display the table in Markdown format. Defaults to False.
"""
table = PrettyTable(["Index", "Address", "Next Hop", "Metric"]) table = PrettyTable(["Index", "Address", "Next Hop", "Metric"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.sys_log.hostname} Route Table" table.title = f"{self.sys_log.hostname} Route Table"
for index, route in enumerate(self.routes): for index, route in enumerate(self.routes):
network = IPv4Network(f"{route.address}/{route.subnet_mask}") network = IPv4Network(f"{route.address}/{route.subnet_mask}")
@@ -301,6 +391,12 @@ class RouteTable(SimComponent):
class RouterARPCache(ARPCache): class RouterARPCache(ARPCache):
"""
Inherits from ARPCache and adds router-specific ARP packet processing.
:ivar SysLog sys_log: A system log for logging messages.
:ivar Router router: The router to which this ARP cache belongs.
"""
def __init__(self, sys_log: SysLog, router: Router): def __init__(self, sys_log: SysLog, router: Router):
super().__init__(sys_log) super().__init__(sys_log)
self.router: Router = router self.router: Router = router
@@ -310,7 +406,7 @@ class RouterARPCache(ARPCache):
Overridden method to process a received ARP packet in a router-specific way. Overridden method to process a received ARP packet in a router-specific way.
:param from_nic: The NIC that received the ARP packet. :param from_nic: The NIC that received the ARP packet.
:param frame: The original arp frame. :param frame: The original ARP frame.
""" """
arp_packet = frame.arp arp_packet = frame.arp
@@ -356,6 +452,16 @@ class RouterARPCache(ARPCache):
class RouterICMP(ICMP): class RouterICMP(ICMP):
"""
A class to represent a router's Internet Control Message Protocol (ICMP) handler.
:param sys_log: System log for logging network events and errors.
:type sys_log: SysLog
:param arp_cache: The ARP cache for resolving MAC addresses.
:type arp_cache: ARPCache
:param router: The router to which this ICMP handler belongs.
:type router: Router
"""
router: Router router: Router
def __init__(self, sys_log: SysLog, arp_cache: ARPCache, router: Router): def __init__(self, sys_log: SysLog, arp_cache: ARPCache, router: Router):
@@ -363,6 +469,13 @@ class RouterICMP(ICMP):
self.router = router self.router = router
def process_icmp(self, frame: Frame, from_nic: NIC, is_reattempt: bool = False): def process_icmp(self, frame: Frame, from_nic: NIC, is_reattempt: bool = False):
"""
Process incoming ICMP frames based on ICMP type.
:param frame: The incoming frame to process.
:param from_nic: The network interface where the frame is coming from.
:param is_reattempt: Flag to indicate if the process is a reattempt.
"""
if frame.icmp.icmp_type == ICMPType.ECHO_REQUEST: if frame.icmp.icmp_type == ICMPType.ECHO_REQUEST:
# determine if request is for router interface or whether it needs to be routed # determine if request is for router interface or whether it needs to be routed
@@ -386,7 +499,10 @@ class RouterICMP(ICMP):
identifier=frame.icmp.identifier, identifier=frame.icmp.identifier,
sequence=frame.icmp.sequence + 1, sequence=frame.icmp.sequence + 1,
) )
frame = Frame(ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, icmp=icmp_reply_packet) payload = secrets.token_urlsafe(int(32/1.3)) # Standard ICMP 32 bytes size
frame = Frame(
ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, icmp=icmp_reply_packet, payload=payload
)
self.sys_log.info(f"Sending echo reply to {frame.ip.dst_ip}") self.sys_log.info(f"Sending echo reply to {frame.ip.dst_ip}")
src_nic.send_frame(frame) src_nic.send_frame(frame)
@@ -399,7 +515,14 @@ class RouterICMP(ICMP):
for nic in self.router.nics.values(): for nic in self.router.nics.values():
if nic.ip_address == frame.ip.dst_ip: if nic.ip_address == frame.ip.dst_ip:
if nic.enabled: if nic.enabled:
self.sys_log.info(f"Received echo reply from {frame.ip.src_ip}") time = frame.transmission_duration()
time_str = f"{time}ms" if time > 0 else "<1ms"
self.sys_log.info(
f"Reply from {frame.ip.src_ip}: "
f"bytes={len(frame.payload)}, "
f"time={time_str}, "
f"TTL={frame.ip.ttl}"
)
if not self.request_replies.get(frame.icmp.identifier): if not self.request_replies.get(frame.icmp.identifier):
self.request_replies[frame.icmp.identifier] = 0 self.request_replies[frame.icmp.identifier] = 0
self.request_replies[frame.icmp.identifier] += 1 self.request_replies[frame.icmp.identifier] += 1
@@ -410,6 +533,13 @@ class RouterICMP(ICMP):
class Router(Node): class Router(Node):
"""
A class to represent a network router node.
:ivar str hostname: The name of the router node.
:ivar int num_ports: The number of ports in the router.
:ivar dict kwargs: Optional keyword arguments for SysLog, ACL, RouteTable, RouterARPCache, RouterICMP.
"""
num_ports: int num_ports: int
ethernet_ports: Dict[int, NIC] = {} ethernet_ports: Dict[int, NIC] = {}
acl: AccessControlList acl: AccessControlList
@@ -438,14 +568,32 @@ class Router(Node):
self.icmp.arp = self.arp self.icmp.arp = self.arp
def _get_port_of_nic(self, target_nic: NIC) -> Optional[int]: def _get_port_of_nic(self, target_nic: NIC) -> Optional[int]:
"""
Retrieve the port number for a given NIC.
:param target_nic: Target network interface.
:return: The port number if NIC is found, otherwise None.
"""
for port, nic in self.ethernet_ports.items(): for port, nic in self.ethernet_ports.items():
if nic == target_nic: if nic == target_nic:
return port return port
def describe_state(self) -> Dict: def describe_state(self) -> Dict:
"""
Describes the current state of the Router.
:return: A dictionary representing the current state.
"""
pass pass
def route_frame(self, frame: Frame, from_nic: NIC, re_attempt: bool = False) -> None: def route_frame(self, frame: Frame, from_nic: NIC, re_attempt: bool = False) -> None:
"""
Route a given frame from a source NIC to its destination.
:param frame: The frame to be routed.
:param from_nic: The source network interface.
:param re_attempt: Flag to indicate if the routing is a reattempt.
"""
# Check if src ip is on network of one of the NICs # Check if src ip is on network of one of the NICs
nic = self.arp.get_arp_cache_nic(frame.ip.dst_ip) nic = self.arp.get_arp_cache_nic(frame.ip.dst_ip)
target_mac = self.arp.get_arp_cache_mac_address(frame.ip.dst_ip) target_mac = self.arp.get_arp_cache_mac_address(frame.ip.dst_ip)
@@ -477,13 +625,10 @@ class Router(Node):
def receive_frame(self, frame: Frame, from_nic: NIC): def receive_frame(self, frame: Frame, from_nic: NIC):
""" """
Receive a Frame from the connected NIC and process it. Receive a frame from a NIC and processes it based on its protocol.
Depending on the protocol, the frame is passed to the appropriate handler such as ARP or ICMP, or up to the :param frame: The incoming frame.
SessionManager if no code manager exists. :param from_nic: The network interface where the frame is coming from.
:param frame: The Frame being received.
:param from_nic: The NIC that received the frame.
""" """
route_frame = False route_frame = False
protocol = frame.ip.protocol protocol = frame.ip.protocol
@@ -520,6 +665,13 @@ class Router(Node):
self.route_frame(frame, from_nic) self.route_frame(frame, from_nic)
def configure_port(self, port: int, ip_address: Union[IPv4Address, str], subnet_mask: Union[IPv4Address, str]): def configure_port(self, port: int, ip_address: Union[IPv4Address, str], subnet_mask: Union[IPv4Address, str]):
"""
Configure the IP settings of a given port.
:param port: The port to configure.
:param ip_address: The IP address to set.
:param subnet_mask: The subnet mask to set.
"""
if not isinstance(ip_address, IPv4Address): if not isinstance(ip_address, IPv4Address):
ip_address = IPv4Address(ip_address) ip_address = IPv4Address(ip_address)
if not isinstance(subnet_mask, IPv4Address): if not isinstance(subnet_mask, IPv4Address):
@@ -530,18 +682,36 @@ class Router(Node):
self.sys_log.info(f"Configured port {port} with ip_address={ip_address}/{nic.ip_network.prefixlen}") self.sys_log.info(f"Configured port {port} with ip_address={ip_address}/{nic.ip_network.prefixlen}")
def enable_port(self, port: int): def enable_port(self, port: int):
"""
Enable a given port on the router.
:param port: The port to enable.
"""
nic = self.ethernet_ports.get(port) nic = self.ethernet_ports.get(port)
if nic: if nic:
nic.enable() nic.enable()
def disable_port(self, port: int): def disable_port(self, port: int):
"""
Disable a given port on the router.
:param port: The port to disable.
"""
nic = self.ethernet_ports.get(port) nic = self.ethernet_ports.get(port)
if nic: if nic:
nic.disable() nic.disable()
def show(self): def show(self, markdown: bool = False):
"""
Prints the state of the Ethernet interfaces on the Router.
:param markdown: Flag to indicate if the output should be in markdown format.
"""
"""Prints a table of the NICs on the Node.""" """Prints a table of the NICs on the Node."""
table = PrettyTable(["Port", "MAC Address", "Address", "Speed", "Status"]) table = PrettyTable(["Port", "MAC Address", "Address", "Speed", "Status"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.hostname} Ethernet Interfaces" table.title = f"{self.hostname} Ethernet Interfaces"
for port, nic in self.ethernet_ports.items(): for port, nic in self.ethernet_ports.items():
table.add_row( table.add_row(

View File

@@ -0,0 +1,37 @@
from ipaddress import IPv4Address
from primaite.simulator.network.hardware.base import Node, NIC
from primaite.simulator.network.hardware.nodes.computer import Computer
class Server(Computer):
"""
A basic Server class.
Example:
>>> server_a = Server(
hostname="server_a",
ip_address="192.168.1.10",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1"
)
>>> server_a.power_on()
Instances of Server come 'pre-packaged' with the following:
* Core Functionality:
* ARP
* ICMP
* Packet Capture
* Sys Log
* Services:
* DNS Client
* FTP Client
* LDAP Client
* NTP Client
* Applications:
* Email Client
* Web Browser
* Processes:
* Placeholder
"""

View File

@@ -2,10 +2,80 @@ from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.base import Switch, NIC from primaite.simulator.network.hardware.base import Switch, NIC
from primaite.simulator.network.hardware.nodes.computer import Computer from primaite.simulator.network.hardware.nodes.computer import Computer
from primaite.simulator.network.hardware.nodes.router import Router, ACLAction from primaite.simulator.network.hardware.nodes.router import Router, ACLAction
from primaite.simulator.network.hardware.nodes.server import Server
from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port from primaite.simulator.network.transmission.transport_layer import Port
def client_server_routed() -> Network:
"""
A basic Client/Server Network routed between subnets.
+------------+ +------------+ +------------+ +------------+ +------------+
| | | | | | | | | |
| client_1 +------+ switch_2 +------+ router_1 +------+ switch_1 +------+ server_1 |
| | | | | | | | | |
+------------+ +------------+ +------------+ +------------+ +------------+
IP Table:
"""
network = Network()
# Router 1
router_1 = Router(hostname="router_1", num_ports=3)
router_1.power_on()
router_1.configure_port(port=1, ip_address="192.168.1.1", subnet_mask="255.255.255.0")
router_1.configure_port(port=2, ip_address="192.168.2.1", subnet_mask="255.255.255.0")
# Switch 1
switch_1 = Switch(hostname="switch_1", num_ports=6)
switch_1.power_on()
network.connect(endpoint_a=router_1.ethernet_ports[1], endpoint_b=switch_1.switch_ports[6])
router_1.enable_port(1)
# Switch 2
switch_2 = Switch(hostname="switch_2", num_ports=6)
switch_2.power_on()
network.connect(endpoint_a=router_1.ethernet_ports[2], endpoint_b=switch_2.switch_ports[6])
router_1.enable_port(2)
# Client 1
client_1 = Computer(
hostname="client_1",
ip_address="192.168.2.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.2.1"
)
client_1.power_on()
network.connect(endpoint_b=client_1.ethernet_port[1], endpoint_a=switch_2.switch_ports[1])
# Server 1
server_1 = Server(
hostname="server_1",
ip_address="192.168.1.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1"
)
server_1.power_on()
network.connect(endpoint_b=server_1.ethernet_port[1], endpoint_a=switch_1.switch_ports[1])
router_1.acl.add_rule(
action=ACLAction.PERMIT,
src_port=Port.ARP,
dst_port=Port.ARP,
position=22
)
router_1.acl.add_rule(
action=ACLAction.PERMIT,
protocol=IPProtocol.ICMP,
position=23
)
return network
def arcd_uc2_network() -> Network: def arcd_uc2_network() -> Network:
""" """
Models the ARCD Use Case 2 Network. Models the ARCD Use Case 2 Network.
@@ -40,9 +110,7 @@ def arcd_uc2_network() -> Network:
| | | |
+------------+ +------------+
Example:
>>> network = arcd_uc2_network()
>>> network.get_node_by_hostname("client_1").ping("192.168.1.10")
""" """
network = Network() network = Network()
@@ -73,7 +141,7 @@ def arcd_uc2_network() -> Network:
default_gateway="192.168.10.1" default_gateway="192.168.10.1"
) )
client_1.power_on() client_1.power_on()
network.connect(endpoint_a=client_1, endpoint_b=switch_2.switch_ports[1]) network.connect(endpoint_b=client_1.ethernet_port[1], endpoint_a=switch_2.switch_ports[1])
# Client 2 # Client 2
client_2 = Computer( client_2 = Computer(
@@ -83,60 +151,59 @@ def arcd_uc2_network() -> Network:
default_gateway="192.168.10.1" default_gateway="192.168.10.1"
) )
client_2.power_on() client_2.power_on()
network.connect(endpoint_a=client_2, endpoint_b=switch_2.switch_ports[2]) network.connect(endpoint_b=client_2.ethernet_port[1], endpoint_a=switch_2.switch_ports[2])
# Domain Controller # Domain Controller
domain_controller = Computer( domain_controller = Server(
hostname="domain_controller", hostname="domain_controller",
ip_address="192.168.1.10", ip_address="192.168.1.10",
subnet_mask="255.255.255.0", subnet_mask="255.255.255.0",
default_gateway="192.168.1.1" default_gateway="192.168.1.1"
) )
domain_controller.power_on() domain_controller.power_on()
network.connect(endpoint_a=domain_controller, endpoint_b=switch_1.switch_ports[1]) network.connect(endpoint_b=domain_controller.ethernet_port[1], endpoint_a=switch_1.switch_ports[1])
# Web Server # Web Server
web_server = Computer( web_server = Server(
hostname="web_server", hostname="web_server",
ip_address="192.168.1.12", ip_address="192.168.1.12",
subnet_mask="255.255.255.0", subnet_mask="255.255.255.0",
default_gateway="192.168.1.1" default_gateway="192.168.1.1"
) )
web_server.power_on() web_server.power_on()
network.connect(endpoint_a=web_server, endpoint_b=switch_1.switch_ports[2]) network.connect(endpoint_b=web_server.ethernet_port[1], endpoint_a=switch_1.switch_ports[2])
# Database Server # Database Server
database_server = Computer( database_server = Server(
hostname="database_server", hostname="database_server",
ip_address="192.168.1.14", ip_address="192.168.1.14",
subnet_mask="255.255.255.0", subnet_mask="255.255.255.0",
default_gateway="192.168.1.1" default_gateway="192.168.1.1"
) )
database_server.power_on() database_server.power_on()
network.connect(endpoint_a=database_server, endpoint_b=switch_1.switch_ports[3]) network.connect(endpoint_b=database_server.ethernet_port[1], endpoint_a=switch_1.switch_ports[3])
# Backup Server # Backup Server
backup_server = Computer( backup_server = Server(
hostname="backup_server", hostname="backup_server",
ip_address="192.168.1.16", ip_address="192.168.1.16",
subnet_mask="255.255.255.0", subnet_mask="255.255.255.0",
default_gateway="192.168.1.1" default_gateway="192.168.1.1"
) )
backup_server.power_on() backup_server.power_on()
network.connect(endpoint_a=backup_server, endpoint_b=switch_1.switch_ports[4]) network.connect(endpoint_b=backup_server.ethernet_port[1], endpoint_a=switch_1.switch_ports[4])
# Security Suite # Security Suite
security_suite = Computer( security_suite = Server(
hostname="security_suite", hostname="security_suite",
ip_address="192.168.1.110", ip_address="192.168.1.110",
subnet_mask="255.255.255.0", subnet_mask="255.255.255.0",
default_gateway="192.168.1.1" default_gateway="192.168.1.1"
) )
security_suite.power_on() security_suite.power_on()
network.connect(endpoint_a=security_suite, endpoint_b=switch_1.switch_ports[7]) network.connect(endpoint_b=security_suite.ethernet_port[1], endpoint_a=switch_1.switch_ports[7])
security_suite_external_nic = NIC(ip_address="192.168.10.110", subnet_mask="255.255.255.0") security_suite.connect_nic(NIC(ip_address="192.168.10.110", subnet_mask="255.255.255.0"))
security_suite.connect_nic(security_suite_external_nic) network.connect(endpoint_b=security_suite.ethernet_port[2], endpoint_a=switch_2.switch_ports[7])
network.connect(endpoint_a=security_suite_external_nic, endpoint_b=switch_2.switch_ports[7])
router_1.acl.add_rule( router_1.acl.add_rule(
action=ACLAction.PERMIT, action=ACLAction.PERMIT,

View File

@@ -124,6 +124,11 @@ class Frame(BaseModel):
if not self.received_timestamp: if not self.received_timestamp:
self.received_timestamp = datetime.now() self.received_timestamp = datetime.now()
def transmission_duration(self) -> int:
"""The transmission duration in milliseconds."""
delta = self.received_timestamp - self.sent_timestamp
return int(delta.microseconds / 1000)
@property @property
def size(self) -> float: # noqa - Keep it as MBits as this is how they're expressed def size(self) -> float: # noqa - Keep it as MBits as this is how they're expressed
"""The size of the Frame in Bytes.""" """The size of the Frame in Bytes."""

View File

@@ -1,6 +1,8 @@
import logging import logging
from pathlib import Path from pathlib import Path
from prettytable import PrettyTable, MARKDOWN
from primaite.simulator import TEMP_SIM_OUTPUT from primaite.simulator import TEMP_SIM_OUTPUT
@@ -43,7 +45,7 @@ class SysLog:
file_handler = logging.FileHandler(filename=log_path) file_handler = logging.FileHandler(filename=log_path)
file_handler.setLevel(logging.DEBUG) file_handler.setLevel(logging.DEBUG)
log_format = "%(asctime)s %(levelname)s: %(message)s" log_format = "%(asctime)s::%(levelname)s::%(message)s"
file_handler.setFormatter(logging.Formatter(log_format)) file_handler.setFormatter(logging.Formatter(log_format))
self.logger = logging.getLogger(f"{self.hostname}_sys_log") self.logger = logging.getLogger(f"{self.hostname}_sys_log")
@@ -52,6 +54,19 @@ class SysLog:
self.logger.addFilter(_NotJSONFilter()) self.logger.addFilter(_NotJSONFilter())
def show(self, last_n: int = 10, markdown: bool = False):
table = PrettyTable(["Timestamp", "Level", "Message"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.hostname} Sys Log"
if self._get_log_path().exists():
with open(self._get_log_path()) as file:
lines = file.readlines()
for line in lines[-last_n:]:
table.add_row(line.strip().split("::"))
print(table)
def _get_log_path(self) -> Path: def _get_log_path(self) -> Path:
""" """
Constructs the path for the log file based on the hostname. Constructs the path for the log file based on the hostname.