#1706 - Tidies up the sysLog ARPCache, and ICMP classes and integrated them into the Node. Tidied up the base implementation of SoftwareManager and SessionManager. Tidies up the public API for Services and Applications. Added the SwitchPort and Switch classes. Added a basic test in test_frame_transmission.py that tests sending a frame from one node to another across a multi-switch network.

This commit is contained in:
Chris McCarthy
2023-08-07 19:33:52 +01:00
parent 04f1cb0dc6
commit 139d739732
15 changed files with 846 additions and 216 deletions

View File

@@ -0,0 +1,68 @@
import logging
from pathlib import Path
from typing import Optional
from primaite.simulator import TEMP_SIM_OUTPUT
class _JSONFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
"""Filter logs that start and end with '{' and '}' (JSON-like messages)."""
return record.getMessage().startswith("{") and record.getMessage().endswith("}")
class PacketCapture:
"""
Represents a PacketCapture component on a Node in the simulation environment.
PacketCapture is a service that logs Frames as json strings; It's Wireshark for PrimAITE.
The PCAPs are logged to: <simulation output directory>/<hostname>/<hostname>_<ip address>_pcap.log
"""
def __init__(self, hostname: str, ip_address: Optional[str] = None):
"""
Initialize the PacketCapture process.
:param hostname: The hostname for which PCAP logs are being recorded.
:param ip_address: The IP address associated with the PCAP logs.
"""
self.hostname: str = hostname
"The hostname for which PCAP logs are being recorded."
self.ip_address: str = ip_address
"The IP address associated with the PCAP logs."
self._setup_logger()
def _setup_logger(self):
"""Set up the logger configuration."""
log_path = self._get_log_path()
file_handler = logging.FileHandler(filename=log_path)
file_handler.setLevel(60) # Custom log level > CRITICAL to prevent any unwanted standard DEBUG-CRITICAL logs
log_format = "%(message)s"
file_handler.setFormatter(logging.Formatter(log_format))
logger_name = f"{self.hostname}_{self.ip_address}_pcap" if self.ip_address else f"{self.hostname}_pcap"
self.logger = logging.getLogger(logger_name)
self.logger.setLevel(60) # Custom log level > CRITICAL to prevent any unwanted standard DEBUG-CRITICAL logs
self.logger.addHandler(file_handler)
self.logger.addFilter(_JSONFilter())
def _get_log_path(self) -> Path:
"""Get the path for the log file."""
root = TEMP_SIM_OUTPUT / self.hostname
root.mkdir(exist_ok=True, parents=True)
if self.ip_address:
return root / f"{self.hostname}_{self.ip_address}_pcap.log"
return root / f"{self.hostname}_pcap.log"
def capture(self, frame): # noqa - I'll have a circular import and cant use if TYPE_CHECKING ;(
"""
Capture a Frame and log it.
:param frame: The PCAP frame to capture.
"""
msg = frame.model_dump_json()
self.logger.log(level=60, msg=msg) # Log at custom log level > CRITICAL

View File

@@ -0,0 +1,177 @@
from __future__ import annotations
from ipaddress import IPv4Address
from typing import Any, Dict, Optional, Tuple, TYPE_CHECKING
from primaite.simulator.core import SimComponent
from primaite.simulator.network.transmission.data_link_layer import Frame
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
if TYPE_CHECKING:
from primaite.simulator.network.hardware.base import ARPCache
from primaite.simulator.system.core.software_manager import SoftwareManager
from primaite.simulator.system.core.sys_log import SysLog
class Session(SimComponent):
"""
Models a network session.
Encapsulates information related to communication between two network endpoints, including the protocol,
source and destination IPs and ports.
:param protocol: The IP protocol used in the session.
:param src_ip: The source IP address.
:param dst_ip: The destination IP address.
:param src_port: The source port number (optional).
:param dst_port: The destination port number (optional).
:param connected: A flag indicating whether the session is connected.
"""
protocol: IPProtocol
src_ip: IPv4Address
dst_ip: IPv4Address
src_port: Optional[Port]
dst_port: Optional[Port]
connected: bool = False
@classmethod
def from_session_key(
cls, session_key: Tuple[IPProtocol, IPv4Address, IPv4Address, Optional[Port], Optional[Port]]
) -> Session:
"""
Create a Session instance from a session key tuple.
:param session_key: Tuple containing the session details.
:return: A Session instance.
"""
protocol, src_ip, dst_ip, src_port, dst_port = session_key
return Session(protocol=protocol, src_ip=src_ip, dst_ip=dst_ip, src_port=src_port, dst_port=dst_port)
def describe_state(self) -> Dict:
"""
Describes the current state of the session as a dictionary.
:return: A dictionary containing the current state of the session.
"""
pass
class SessionManager:
"""
Manages network sessions, including session creation, lookup, and communication with other components.
:param sys_log: A reference to the system log component.
:param arp_cache: A reference to the ARP cache component.
"""
def __init__(self, sys_log: SysLog, arp_cache: "ARPCache"):
self.sessions_by_key: Dict[
Tuple[IPProtocol, IPv4Address, IPv4Address, Optional[Port], Optional[Port]], Session
] = {}
self.sessions_by_uuid: Dict[str, Session] = {}
self.sys_log: SysLog = sys_log
self.software_manager: SoftwareManager = None # Noqa
self.arp_cache: "ARPCache" = arp_cache
def describe_state(self) -> Dict:
"""
Describes the current state of the session manager as a dictionary.
:return: A dictionary containing the current state of the session manager.
"""
pass
@staticmethod
def _get_session_key(
frame: Frame, from_source: bool = True
) -> Tuple[IPProtocol, IPv4Address, IPv4Address, Optional[Port], Optional[Port]]:
"""
Extracts the session key from the given frame.
The session key is a tuple containing the following elements:
- IPProtocol: The transport protocol (e.g. TCP, UDP, ICMP).
- IPv4Address: The source IP address.
- IPv4Address: The destination IP address.
- Optional[Port]: The source port number (if applicable).
- Optional[Port]: The destination port number (if applicable).
:param frame: The network frame from which to extract the session key.
:param from_source: A flag to indicate if the key should be extracted from the source or destination.
:return: A tuple containing the session key.
"""
protocol = frame.ip.protocol
src_ip = frame.ip.src_ip
dst_ip = frame.ip.dst_ip
if protocol == IPProtocol.TCP:
if from_source:
src_port = frame.tcp.src_port
dst_port = frame.tcp.dst_port
else:
dst_port = frame.tcp.src_port
src_port = frame.tcp.dst_port
elif protocol == IPProtocol.UDP:
if from_source:
src_port = frame.udp.src_port
dst_port = frame.udp.dst_port
else:
dst_port = frame.udp.src_port
src_port = frame.udp.dst_port
else:
src_port = None
dst_port = None
return protocol, src_ip, dst_ip, src_port, dst_port
def receive_payload_from_software_manager(self, payload: Any, session_id: Optional[int] = None):
"""
Receive a payload from the SoftwareManager.
If no session_id, a Session is established. Once established, the payload is sent to ``send_payload_to_nic``.
:param payload: The payload to be sent.
:param session_id: The Session ID the payload is to originate from. Optional. If None, one will be created.
"""
# TODO: Implement session creation and
self.send_payload_to_nic(payload, session_id)
def send_payload_to_software_manager(self, payload: Any, session_id: int):
"""
Send a payload to the software manager.
:param payload: The payload to be sent.
:param session_id: The Session ID the payload originates from.
"""
self.software_manager.receive_payload_from_session_manger()
def send_payload_to_nic(self, payload: Any, session_id: int):
"""
Send a payload across the Network.
Takes a payload and a session_id. Builds a Frame and sends it across the network via a NIC.
:param payload: The payload to be sent.
:param session_id: The Session ID the payload originates from
"""
# TODO: Implement frame construction and sent to NIC.
pass
def receive_payload_from_nic(self, frame: Frame):
"""
Receive a Frame from the NIC.
Extract the session key using the _get_session_key method, and forward the payload to the appropriate
session. If the session does not exist, a new one is created.
:param frame: The frame being received.
"""
session_key = self._get_session_key(frame)
session = self.sessions_by_key.get(session_key)
if not session:
# Create new session
session = Session.from_session_key(session_key)
self.sessions_by_key[session_key] = session
self.sessions_by_uuid[session.uuid] = session
self.software_manager.receive_payload_from_session_manger(payload=frame, session=session)
# TODO: Implement the frame deconstruction and send to SoftwareManager.

View File

@@ -0,0 +1,99 @@
from typing import Any, Dict, Optional, Tuple, TYPE_CHECKING, Union
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.core.session_manager import Session
from primaite.simulator.system.core.sys_log import SysLog
from primaite.simulator.system.services.service import Service
from primaite.simulator.system.software import SoftwareType
if TYPE_CHECKING:
from primaite.simulator.system.core.session_manager import SessionManager
from primaite.simulator.system.core.sys_log import SysLog
class SoftwareManager:
"""A class that manages all running Services and Applications on a Node and facilitates their communication."""
def __init__(self, session_manager: "SessionManager", sys_log: "SysLog"):
"""
Initialize a new instance of SoftwareManager.
:param session_manager: The session manager handling network communications.
"""
self.session_manager = session_manager
self.services: Dict[str, Service] = {}
self.applications: Dict[str, Application] = {}
self.port_protocol_mapping: Dict[Tuple[Port, IPProtocol], Union[Service, Application]] = {}
self.sys_log: SysLog = sys_log
def add_service(self, name: str, service: Service, port: Port, protocol: IPProtocol):
"""
Add a Service to the manager.
:param name: The name of the service.
:param service: The service instance.
:param port: The port used by the service.
:param protocol: The network protocol used by the service.
"""
service.software_manager = self
self.services[name] = service
self.port_protocol_mapping[(port, protocol)] = service
def add_application(self, name: str, application: Application, port: Port, protocol: IPProtocol):
"""
Add an Application to the manager.
:param name: The name of the application.
:param application: The application instance.
:param port: The port used by the application.
:param protocol: The network protocol used by the application.
"""
application.software_manager = self
self.applications[name] = application
self.port_protocol_mapping[(port, protocol)] = application
def send_internal_payload(self, target_software: str, target_software_type: SoftwareType, payload: Any):
"""
Send a payload to a specific service or application.
:param target_software: The name of the target service or application.
:param target_software_type: The type of software (Service, Application, Process).
:param payload: The data to be sent.
:param receiver_type: The type of the target, either 'service' or 'application'.
"""
if target_software_type is SoftwareType.SERVICE:
receiver = self.services.get(target_software)
elif target_software_type is SoftwareType.APPLICATION:
receiver = self.applications.get(target_software)
else:
raise ValueError(f"Invalid receiver type {target_software_type}")
if receiver:
receiver.receive_payload(payload)
else:
raise ValueError(f"No {target_software_type.name.lower()} found with the name {target_software}")
def send_payload_to_session_manger(self, payload: Any, session_id: Optional[int] = None):
"""
Send a payload to the SessionManager.
:param payload: The payload to be sent.
:param session_id: The Session ID the payload is to originate from. Optional.
"""
self.session_manager.receive_payload_from_software_manager(payload, session_id)
def receive_payload_from_session_manger(self, payload: Any, session: Session):
"""
Receive a payload from the SessionManager and forward it to the corresponding service or application.
:param payload: The payload being received.
:param session: The transport session the payload originates from.
"""
# receiver: Optional[Union[Service, Application]] = self.port_protocol_mapping.get((port, protocol), None)
# if receiver:
# receiver.receive_payload(None, payload)
# else:
# raise ValueError(f"No service or application found for port {port} and protocol {protocol}")
pass

View File

@@ -0,0 +1,103 @@
import logging
from pathlib import Path
from primaite.simulator import TEMP_SIM_OUTPUT
class _NotJSONFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
"""
Determines if a log message does not start and end with '{' and '}' (i.e., it is not a JSON-like message).
:param record: LogRecord object containing all the information pertinent to the event being logged.
:return: True if log message is not JSON-like, False otherwise.
"""
return not record.getMessage().startswith("{") and not record.getMessage().endswith("}")
class SysLog:
"""
A SysLog class is a simple logger dedicated to managing and writing system logs for a Node.
Each log message is written to a file located at: <simulation output directory>/<hostname>/<hostname>_sys.log
"""
def __init__(self, hostname: str):
"""
Constructs a SysLog instance for a given hostname.
:param hostname: The hostname associated with the system logs being recorded.
"""
self.hostname = hostname
self._setup_logger()
def _setup_logger(self):
"""
Configures the logger for this SysLog instance.
The logger is set to the DEBUG level, and is equipped with a handler that writes to a file and filters out
JSON-like messages.
"""
log_path = self._get_log_path()
file_handler = logging.FileHandler(filename=log_path)
file_handler.setLevel(logging.DEBUG)
log_format = "%(asctime)s %(levelname)s: %(message)s"
file_handler.setFormatter(logging.Formatter(log_format))
self.logger = logging.getLogger(f"{self.hostname}_sys_log")
self.logger.setLevel(logging.DEBUG)
self.logger.addHandler(file_handler)
self.logger.addFilter(_NotJSONFilter())
def _get_log_path(self) -> Path:
"""
Constructs the path for the log file based on the hostname.
:return: Path object representing the location of the log file.
"""
root = TEMP_SIM_OUTPUT / self.hostname
root.mkdir(exist_ok=True, parents=True)
return root / f"{self.hostname}_sys.log"
def debug(self, msg: str):
"""
Logs a message with the DEBUG level.
:param msg: The message to be logged.
"""
self.logger.debug(msg)
def info(self, msg: str):
"""
Logs a message with the INFO level.
:param msg: The message to be logged.
"""
self.logger.info(msg)
def warning(self, msg: str):
"""
Logs a message with the WARNING level.
:param msg: The message to be logged.
"""
self.logger.warning(msg)
def error(self, msg: str):
"""
Logs a message with the ERROR level.
:param msg: The message to be logged.
"""
self.logger.error(msg)
def critical(self, msg: str):
"""
Logs a message with the CRITICAL level.
:param msg: The message to be logged.
"""
self.logger.critical(msg)