#1706 - Got the code services, application, and process base classes stubbed out. Need them now so that I can leverage them for core node services required.

This commit is contained in:
Chris McCarthy
2023-08-03 21:30:13 +01:00
parent cac4779244
commit 04f1cb0dc6
11 changed files with 378 additions and 82 deletions

View File

@@ -64,7 +64,7 @@ class SimComponent(BaseModel):
"""
pass
def reset_component_for_episode(self):
def reset_component_for_episode(self, episode: int):
"""
Reset this component to its original state for a new episode.

View File

@@ -13,8 +13,8 @@ from primaite.simulator.network.protocols.arp import ARPEntry, ARPPacket
from primaite.simulator.network.transmission.data_link_layer import EthernetHeader, Frame
from primaite.simulator.network.transmission.network_layer import ICMPPacket, ICMPType, IPPacket, IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port, TCPHeader
from primaite.simulator.system.processes.pcap import PCAP
from primaite.simulator.system.processes.sys_log import SysLog
from primaite.simulator.system.packet_capture import PacketCapture
from primaite.simulator.system.sys_log import SysLog
_LOGGER = getLogger(__name__)
@@ -87,7 +87,7 @@ class NIC(SimComponent):
"The Link to which the NIC is connected."
enabled: bool = False
"Indicates whether the NIC is enabled."
pcap: Optional[PCAP] = None
pcap: Optional[PacketCapture] = None
def __init__(self, **kwargs):
"""
@@ -132,10 +132,10 @@ class NIC(SimComponent):
"""Attempt to enable the NIC."""
if not self.enabled:
if self.connected_node:
if self.connected_node.hardware_state == NodeOperatingState.ON:
if self.connected_node.operating_state == NodeOperatingState.ON:
self.enabled = True
_LOGGER.info(f"NIC {self} enabled")
self.pcap = PCAP(hostname=self.connected_node.hostname, ip_address=self.ip_address)
self.pcap = PacketCapture(hostname=self.connected_node.hostname, ip_address=self.ip_address)
if self.connected_link:
self.connected_link.endpoint_up()
else:
@@ -393,7 +393,7 @@ class Node(SimComponent):
A basic Node class.
:param hostname: The node hostname on the network.
:param hardware_state: The hardware state of the node.
:param operating_state: The node operating state.
"""
hostname: str

View File

@@ -0,0 +1,86 @@
from abc import abstractmethod
from enum import Enum
from typing import Any, List, Dict, Set
from primaite.simulator.system.software import IOSoftware
class ApplicationOperatingState(Enum):
"""Enumeration of Application Operating States."""
CLOSED = 0
"The application is closed or not running."
RUNNING = 1
"The application is running."
INSTALLING = 3
"The application is being installed or updated."
class Application(IOSoftware):
"""
Represents an Application in the simulation environment.
Applications are user-facing programs that may perform input/output operations.
"""
operating_state: ApplicationOperatingState
"The current operating state of the Application."
execution_control_status: str
"Control status of the application's execution. It could be 'manual' or 'automatic'."
num_executions: int = 0
"The number of times the application has been executed. Default is 0."
groups: Set[str] = set()
"The set of groups to which the application belongs."
@abstractmethod
def describe_state(self) -> Dict:
"""
Describes the current state of the software.
The specifics of the software's state, including its health, criticality,
and any other pertinent information, should be implemented in subclasses.
:return: A dictionary containing key-value pairs representing the current state of the software.
:rtype: Dict
"""
pass
def apply_action(self, action: List[str]) -> None:
"""
Applies a list of actions to the Application.
:param action: A list of actions to apply.
"""
pass
def reset_component_for_episode(self, episode: int):
"""
Resets the Application component for a new episode.
This method ensures the Application is ready for a new episode, including resetting any
stateful properties or statistics, and clearing any message queues.
"""
pass
def send(self, payload: Any) -> bool:
"""
Sends a payload to the SessionManager
The specifics of how the payload is processed and whether a response payload
is generated should be implemented in subclasses.
:param payload: The payload to send.
:return: True if successful, False otherwise.
"""
pass
def receive(self, payload: Any) -> bool:
"""
Receives a payload from the SessionManager.
The specifics of how the payload is processed and whether a response payload
is generated should be implemented in subclasses.
:param payload: The payload to receive.
:return: True if successful, False otherwise.
"""
pass

View File

@@ -0,0 +1,30 @@
from ipaddress import IPv4Address
from pydantic import BaseModel
class ARPCacheService(BaseModel):
def __init__(self, node):
super().__init__()
self.node = node
def _add_arp_cache_entry(self, ip_address: IPv4Address, mac_address: str, nic: NIC):
...
def _remove_arp_cache_entry(self, ip_address: IPv4Address):
...
def _get_arp_cache_mac_address(self, ip_address: IPv4Address) -> Optional[str]:
...
def _get_arp_cache_nic(self, ip_address: IPv4Address) -> Optional[NIC]:
...
def _clear_arp_cache(self):
...
def _send_arp_request(self, target_ip_address: Union[IPv4Address, str]):
...
def process_arp_packet(self, from_nic: NIC, arp_packet: ARPPacket):
...

View File

@@ -8,24 +8,26 @@ class _JSONFilter(logging.Filter):
return record.getMessage().startswith("{") and record.getMessage().endswith("}")
class PCAP:
class PacketCapture:
"""
A logger class for logging Frames as json strings.
Represents a PacketCapture component on a Node in the simulation environment.
This is essentially a PrimAITE simulated version of PCAP.
PacketCapture is a service that logs Frames as json strings; It's Wireshark for PrimAITE.
The PCAPs are logged to: <simulation output directory>/<hostname>/<hostname>_<ip address>_pcap.log
"""
def __init__(self, hostname: str, ip_address: str):
"""
Initialize the PCAP instance.
Initialize the PacketCapture process.
:param hostname: The hostname for which PCAP logs are being recorded.
:param ip_address: The IP address associated with the PCAP logs.
"""
self.hostname = hostname
self.ip_address = str(ip_address)
self.hostname: str = hostname
"The hostname for which PCAP logs are being recorded."
self.ip_address: str = ip_address
"The IP address associated with the PCAP logs."
self._setup_logger()
def _setup_logger(self):
@@ -51,7 +53,7 @@ class PCAP:
root.mkdir(exist_ok=True, parents=True)
return root / f"{self.hostname}_{self.ip_address}_pcap.log"
def capture(self, frame): # noqa Please don't make me, I'll have a circular import and cant use if TYPE_CHECKING ;(
def capture(self, frame): # noqa - I'll have a circular import and cant use if TYPE_CHECKING ;(
"""
Capture a Frame and log it.

View File

@@ -0,0 +1,37 @@
from abc import abstractmethod
from enum import Enum
from typing import List, Dict, Any
from primaite.simulator.system.software import Software
class ProcessOperatingState(Enum):
"""Enumeration of Process Operating States."""
RUNNING = 1
"The process is running."
PAUSED = 2
"The process is temporarily paused."
class Process(Software):
"""
Represents a Process, a program in execution, in the simulation environment.
Processes are executed by a Node and do not have the ability to performing input/output operations.
"""
operating_state: ProcessOperatingState
"The current operating state of the Process."
@abstractmethod
def describe_state(self) -> Dict:
"""
Describes the current state of the software.
The specifics of the software's state, including its health, criticality,
and any other pertinent information, should be implemented in subclasses.
:return: A dictionary containing key-value pairs representing the current state of the software.
:rtype: Dict
"""
pass

View File

@@ -0,0 +1,87 @@
from abc import abstractmethod
from enum import Enum
from typing import Any, Dict, List
from primaite.simulator.system.software import IOSoftware
class ServiceOperatingState(Enum):
"""Enumeration of Service Operating States."""
STOPPED = 0
"The service is not running."
RUNNING = 1
"The service is currently running."
RESTARTING = 2
"The service is in the process of restarting."
INSTALLING = 3
"The service is being installed or updated."
PAUSED = 4
"The service is temporarily paused."
DISABLED = 5
"The service is disabled and cannot be started."
class Service(IOSoftware):
"""
Represents a Service in the simulation environment.
Services are programs that run in the background and may perform input/output operations.
"""
operating_state: ServiceOperatingState
"The current operating state of the Service."
@abstractmethod
def describe_state(self) -> Dict:
"""
Describes the current state of the software.
The specifics of the software's state, including its health, criticality,
and any other pertinent information, should be implemented in subclasses.
:return: A dictionary containing key-value pairs representing the current state of the software.
:rtype: Dict
"""
pass
def apply_action(self, action: List[str]) -> None:
"""
Applies a list of actions to the Service.
:param action: A list of actions to apply.
"""
pass
def reset_component_for_episode(self, episode: int):
"""
Resets the Service component for a new episode.
This method ensures the Service is ready for a new episode, including resetting any
stateful properties or statistics, and clearing any message queues.
"""
pass
def send(self, payload: Any) -> bool:
"""
Sends a payload to the SessionManager
The specifics of how the payload is processed and whether a response payload
is generated should be implemented in subclasses.
:param payload: The payload to send.
:return: True if successful, False otherwise.
"""
pass
def receive(self, payload: Any) -> bool:
"""
Receives a payload from the SessionManager.
The specifics of how the payload is processed and whether a response payload
is generated should be implemented in subclasses.
:param payload: The payload to receive.
:return: True if successful, False otherwise.
"""
pass

View File

@@ -1,6 +1,9 @@
from abc import abstractmethod
from enum import Enum
from typing import Any, Dict, List, Set
from primaite.simulator.core import SimComponent
from primaite.simulator.network.transmission.transport_layer import Port
class SoftwareHealthState(Enum):
@@ -16,43 +19,6 @@ class SoftwareHealthState(Enum):
"The software is undergoing patching or updates."
class ApplicationOperatingState(Enum):
"""Enumeration of Application Operating States."""
CLOSED = 0
"The application is closed or not running."
RUNNING = 1
"The application is running."
INSTALLING = 3
"The application is being installed or updated."
class ServiceOperatingState(Enum):
"""Enumeration of Service Operating States."""
STOPPED = 0
"The service is not running."
RUNNING = 1
"The service is currently running."
RESTARTING = 2
"The service is in the process of restarting."
INSTALLING = 3
"The service is being installed or updated."
PAUSED = 4
"The service is temporarily paused."
DISABLED = 5
"The service is disabled and cannot be started."
class ProcessOperatingState(Enum):
"""Enumeration of Process Operating States."""
RUNNING = 1
"The process is running."
PAUSED = 2
"The process is temporarily paused."
class SoftwareCriticality(Enum):
"""Enumeration of Software Criticality Levels."""
@@ -70,25 +36,101 @@ class SoftwareCriticality(Enum):
class Software(SimComponent):
"""
Represents software information along with its health, criticality, and status.
A base class representing software in a simulator environment.
This class inherits from the Pydantic BaseModel and provides a structured way to store
information about software entities.
Attributes:
name (str): The name of the software.
health_state_actual (SoftwareHealthState): The actual health state of the software.
health_state_visible (SoftwareHealthState): The health state of the software visible to users.
criticality (SoftwareCriticality): The criticality level of the software.
patching_count (int, optional): The count of patches applied to the software. Default is 0.
scanning_count (int, optional): The count of times the software has been scanned. Default is 0.
revealed_to_red (bool, optional): Indicates if the software has been revealed to red team. Default is False.
This class is intended to be subclassed by specific types of software entities.
It outlines the fundamental attributes and behaviors expected of any software in the simulation.
"""
name: str
"The name of the software."
health_state_actual: SoftwareHealthState
"The actual health state of the software."
health_state_visible: SoftwareHealthState
"The health state of the software visible to the red agent."
criticality: SoftwareCriticality
"The criticality level of the software."
patching_count: int = 0
"The count of patches applied to the software, defaults to 0."
scanning_count: int = 0
"The count of times the software has been scanned, defaults to 0."
revealed_to_red: bool = False
"Indicates if the software has been revealed to red agent, defaults is False."
@abstractmethod
def describe_state(self) -> Dict:
"""
Describes the current state of the software.
The specifics of the software's state, including its health, criticality,
and any other pertinent information, should be implemented in subclasses.
:return: A dictionary containing key-value pairs representing the current state of the software.
:rtype: Dict
"""
pass
def apply_action(self, action: List[str]) -> None:
"""
Applies a list of actions to the software.
The specifics of how these actions are applied should be implemented in subclasses.
:param action: A list of actions to apply.
:type action: List[str]
"""
pass
def reset_component_for_episode(self, episode: int):
"""
Resets the software component for a new episode.
This method should ensure the software is ready for a new episode, including resetting any
stateful properties or statistics, and clearing any message queues. The specifics of what constitutes a
"reset" should be implemented in subclasses.
"""
pass
class IOSoftware(Software):
"""
Represents software in a simulator environment that is capable of input/output operations.
This base class is meant to be sub-classed by Application and Service classes. It provides the blueprint for
Applications and Services that can receive payloads from a Node's SessionManager (corresponding to layer 5 in the
OSI Model), process them according to their internals, and send a response payload back to the SessionManager if
required.
"""
installing_count: int = 0
"The number of times the software has been installed. Default is 0."
max_sessions: int = 1
"The maximum number of sessions that the software can handle simultaneously. Default is 0."
tcp: bool = True
"Indicates if the software uses TCP protocol for communication. Default is True."
udp: bool = True
"Indicates if the software uses UDP protocol for communication. Default is True."
ports: Set[Port]
"The set of ports to which the software is connected."
def send(self, payload: Any) -> bool:
"""
Sends a payload to the SessionManager
The specifics of how the payload is processed and whether a response payload
is generated should be implemented in subclasses.
:param payload: The payload to send.
:return: True if successful, False otherwise.
"""
pass
def receive(self, payload: Any) -> bool:
"""
Receives a payload from the SessionManager.
The specifics of how the payload is processed and whether a response payload
is generated should be implemented in subclasses.
:param payload: The payload to receive.
:return: True if successful, False otherwise.
"""
pass

View File

@@ -4,28 +4,36 @@ from pathlib import Path
class _NotJSONFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
"""Filter logs that do not start and end with '{' and '}'."""
"""
Determines if a log message does not start and end with '{' and '}' (i.e., it is not a JSON-like message).
:param record: LogRecord object containing all the information pertinent to the event being logged.
:return: True if log message is not JSON-like, False otherwise.
"""
return not record.getMessage().startswith("{") and not record.getMessage().endswith("}")
class SysLog:
"""
A simple logger class for writing the sys logs of a Node.
A SysLog class is a simple logger dedicated to managing and writing system logs for a Node.
Logs are logged to: <simulation output directory>/<hostname>/<hostname>_sys.log
Each log message is written to a file located at: <simulation output directory>/<hostname>/<hostname>_sys.log
"""
def __init__(self, hostname: str):
"""
Initialize the SysLog instance.
Constructs a SysLog instance for a given hostname.
:param hostname: The hostname for which logs are being recorded.
:param hostname: The hostname associated with the system logs being recorded.
"""
self.hostname = hostname
self._setup_logger()
def _setup_logger(self):
"""Set up the logger configuration."""
"""
Configures the logger for this SysLog instance. The logger is set to the DEBUG level,
and is equipped with a handler that writes to a file and filters out JSON-like messages.
"""
log_path = self._get_log_path()
file_handler = logging.FileHandler(filename=log_path)
@@ -41,47 +49,51 @@ class SysLog:
self.logger.addFilter(_NotJSONFilter())
def _get_log_path(self) -> Path:
"""Get the path for the log file."""
"""
Constructs the path for the log file based on the hostname.
:return: Path object representing the location of the log file.
"""
root = Path(__file__).parent.parent.parent.parent.parent.parent / "simulation_output" / self.hostname
root.mkdir(exist_ok=True, parents=True)
return root / f"{self.hostname}_sys.log"
def debug(self, msg: str):
"""
Log a debug message.
Logs a message with the DEBUG level.
:param msg: The message to log.
:param msg: The message to be logged.
"""
self.logger.debug(msg)
def info(self, msg: str):
"""
Log an info message.
Logs a message with the INFO level.
:param msg: The message to log.
:param msg: The message to be logged.
"""
self.logger.info(msg)
def warning(self, msg: str):
"""
Log a warning message.
Logs a message with the WARNING level.
:param msg: The message to log.
:param msg: The message to be logged.
"""
self.logger.warning(msg)
def error(self, msg: str):
"""
Log an error message.
Logs a message with the ERROR level.
:param msg: The message to log.
:param msg: The message to be logged.
"""
self.logger.error(msg)
def critical(self, msg: str):
"""
Log a critical message.
Logs a message with the CRITICAL level.
:param msg: The message to log.
:param msg: The message to be logged.
"""
self.logger.critical(msg)

View File

@@ -41,4 +41,4 @@ def test_multi_nic():
node_a.ping("192.168.0.11")
node_c.ping("10.0.0.12")
node_c.ping("10.0.0.12")