diff --git a/src/primaite/simulator/core.py b/src/primaite/simulator/core.py index d684a74b..2b84a2a6 100644 --- a/src/primaite/simulator/core.py +++ b/src/primaite/simulator/core.py @@ -3,12 +3,13 @@ from abc import abstractmethod from typing import Callable, Dict, List from uuid import uuid4 -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict class SimComponent(BaseModel): """Extension of pydantic BaseModel with additional methods that must be defined by all classes in the simulator.""" + model_config = ConfigDict(arbitrary_types_allowed=True) uuid: str "The component UUID." diff --git a/src/primaite/simulator/network/hardware/base.py b/src/primaite/simulator/network/hardware/base.py index c1bed5b0..ce0e7f25 100644 --- a/src/primaite/simulator/network/hardware/base.py +++ b/src/primaite/simulator/network/hardware/base.py @@ -4,7 +4,7 @@ import re import secrets from enum import Enum from ipaddress import IPv4Address, IPv4Network -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Tuple, Union from primaite import getLogger from primaite.exceptions import NetworkError @@ -13,6 +13,8 @@ from primaite.simulator.network.protocols.arp import ARPEntry, ARPPacket from primaite.simulator.network.transmission.data_link_layer import EthernetHeader, Frame from primaite.simulator.network.transmission.network_layer import ICMPPacket, ICMPType, IPPacket, IPProtocol from primaite.simulator.network.transmission.transport_layer import Port, TCPHeader +from primaite.simulator.system.processes.pcap import PCAP +from primaite.simulator.system.processes.sys_log import SysLog _LOGGER = getLogger(__name__) @@ -85,6 +87,7 @@ class NIC(SimComponent): "The Link to which the NIC is connected." enabled: bool = False "Indicates whether the NIC is enabled." + pcap: Optional[PCAP] = None def __init__(self, **kwargs): """ @@ -129,9 +132,10 @@ class NIC(SimComponent): """Attempt to enable the NIC.""" if not self.enabled: if self.connected_node: - if self.connected_node.hardware_state == HardwareState.ON: + if self.connected_node.hardware_state == NodeOperatingState.ON: self.enabled = True _LOGGER.info(f"NIC {self} enabled") + self.pcap = PCAP(hostname=self.connected_node.hostname, ip_address=self.ip_address) if self.connected_link: self.connected_link.endpoint_up() else: @@ -203,6 +207,8 @@ class NIC(SimComponent): :type frame: :class:`~primaite.simulator.network.osi_layers.Frame` """ if self.enabled: + frame.set_sent_timestamp() + self.pcap.capture(frame) self.connected_link.transmit_frame(sender_nic=self, frame=frame) return True else: @@ -219,6 +225,9 @@ class NIC(SimComponent): :type frame: :class:`~primaite.simulator.network.osi_layers.Frame` """ if self.enabled: + frame.decrement_ttl() + frame.set_received_timestamp() + self.pcap.capture(frame) self.connected_node.receive_frame(frame=frame, from_nic=self) return True else: @@ -281,19 +290,18 @@ class Link(SimComponent): super().__init__(**kwargs) self.endpoint_a.connect_link(self) self.endpoint_b.connect_link(self) - if self.up: - _LOGGER.info(f"Link up between {self.endpoint_a} and {self.endpoint_b}") + self.endpoint_up() def endpoint_up(self): """Let the Link know and endpoint has been brought up.""" if self.up: - _LOGGER.info(f"Link up between {self.endpoint_a} and {self.endpoint_b}") + _LOGGER.info(f"Link {self} up") def endpoint_down(self): """Let the Link know and endpoint has been brought down.""" if not self.up: self.current_load = 0.0 - _LOGGER.info(f"Link down between {self.endpoint_a} and {self.endpoint_b}") + _LOGGER.info(f"Link {self} down") @property def up(self) -> bool: @@ -318,20 +326,24 @@ class Link(SimComponent): :param frame: The network frame to be sent. :return: True if the Frame can be sent, otherwise False. """ - receiver_nic = self.endpoint_a - if receiver_nic == sender_nic: - receiver_nic = self.endpoint_b - frame_size = frame.size_Mbits - sent = receiver_nic.receive_frame(frame) - if sent: - # Frame transmitted successfully - # Load the frame size on the link - self.current_load += frame_size - _LOGGER.info(f"Link added {frame_size} Mbits, current load {self.current_load} Mbits") - return True - # Received NIC disabled, reply + if self._can_transmit(frame): + receiver_nic = self.endpoint_a + if receiver_nic == sender_nic: + receiver_nic = self.endpoint_b + frame_size = frame.size_Mbits + sent = receiver_nic.receive_frame(frame) + if sent: + # Frame transmitted successfully + # Load the frame size on the link + self.current_load += frame_size + _LOGGER.info(f"Added {frame_size:.3f} Mbits to {self}, current load {self.current_load:.3f} Mbits") + return True + # Received NIC disabled, reply - return False + return False + else: + _LOGGER.info(f"Cannot transmit frame as {self} is at capacity") + return False def reset_component_for_episode(self): """ @@ -359,15 +371,21 @@ class Link(SimComponent): """ pass + def __str__(self) -> str: + return f"{self.endpoint_a}<-->{self.endpoint_b}" -class HardwareState(Enum): - """Node hardware state enumeration.""" +class NodeOperatingState(Enum): + """Enumeration of Node Operating States.""" + + OFF = 0 + "The node is powered off." ON = 1 - OFF = 2 - RESETTING = 3 - SHUTTING_DOWN = 4 - BOOTING = 5 + "The node is powered on." + SHUTTING_DOWN = 2 + "The node is in the process of shutting down." + BOOTING = 3 + "The node is in the process of booting up." class Node(SimComponent): @@ -380,7 +398,7 @@ class Node(SimComponent): hostname: str "The node hostname on the network." - hardware_state: HardwareState = HardwareState.OFF + operating_state: NodeOperatingState = NodeOperatingState.OFF "The hardware state of the node." nics: Dict[str, NIC] = {} "The NICs on the node." @@ -397,25 +415,30 @@ class Node(SimComponent): "The nodes file system." arp_cache: Dict[IPv4Address, ARPEntry] = {} "The ARP cache." + sys_log: Optional[SysLog] = None revealed_to_red: bool = False "Informs whether the node has been revealed to a red agent." + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.sys_log = SysLog(self.hostname) + def turn_on(self): """Turn on the Node.""" - if self.hardware_state == HardwareState.OFF: - self.hardware_state = HardwareState.ON - _LOGGER.info(f"Node {self.hostname} turned on") + if self.operating_state == NodeOperatingState.OFF: + self.operating_state = NodeOperatingState.ON + self.sys_log.info("Turned on") for nic in self.nics.values(): nic.enable() def turn_off(self): """Turn off the Node.""" - if self.hardware_state == HardwareState.ON: + if self.operating_state == NodeOperatingState.ON: for nic in self.nics.values(): nic.disable() - self.hardware_state = HardwareState.OFF - _LOGGER.info(f"Node {self.hostname} turned off") + self.operating_state = NodeOperatingState.OFF + self.sys_log.info("Turned off") def connect_nic(self, nic: NIC): """ @@ -427,11 +450,12 @@ class Node(SimComponent): if nic.uuid not in self.nics: self.nics[nic.uuid] = nic nic.connected_node = self - _LOGGER.debug(f"Node {self.hostname} connected NIC {nic}") - if self.hardware_state == HardwareState.ON: + self.sys_log.info(f"Connected NIC {nic}") + if self.operating_state == NodeOperatingState.ON: nic.enable() else: - msg = f"Cannot connect NIC {nic} to Node {self.hostname} as it is already connected" + msg = f"Cannot connect NIC {nic} as it is already connected" + self.sys_log.logger.error(msg) _LOGGER.error(msg) raise NetworkError(msg) @@ -447,9 +471,10 @@ class Node(SimComponent): if nic or nic.uuid in self.nics: self.nics.pop(nic.uuid) nic.disable() - _LOGGER.debug(f"Node {self.hostname} disconnected NIC {nic}") + self.sys_log.info(f"Disconnected NIC {nic}") else: - msg = f"Cannot disconnect NIC {nic} from Node {self.hostname} as it is not connected" + msg = f"Cannot disconnect NIC {nic} as it is not connected" + self.sys_log.logger.error(msg) _LOGGER.error(msg) raise NetworkError(msg) @@ -461,7 +486,7 @@ class Node(SimComponent): :param mac_address: The MAC address associated with the IP address. :param nic: The NIC through which the NIC with the IP address is reachable. """ - _LOGGER.info(f"Node {self.hostname} Adding ARP cache entry for {mac_address}/{ip_address} via NIC {nic}") + self.sys_log.info(f"Adding ARP cache entry for {mac_address}/{ip_address} via NIC {nic}") arp_entry = ARPEntry(mac_address=mac_address, nic_uuid=nic.uuid) self.arp_cache[ip_address] = arp_entry @@ -504,7 +529,7 @@ class Node(SimComponent): """Perform a standard ARP request for a given target IP address.""" for nic in self.nics.values(): if nic.enabled: - _LOGGER.info(f"Node {self.hostname} sending ARP request from NIC {nic} for ip {target_ip_address}") + self.sys_log.info(f"Sending ARP request from NIC {nic} for ip {target_ip_address}") tcp_header = TCPHeader(src_port=Port.ARP, dst_port=Port.ARP) # Network Layer @@ -530,35 +555,38 @@ class Node(SimComponent): :param arp_packet:The ARP packet to process. """ if arp_packet.request: - _LOGGER.info( - f"Node {self.hostname} received ARP request from {arp_packet.sender_mac_addr}/{arp_packet.sender_ip}" - ) - self._add_arp_cache_entry( - ip_address=arp_packet.sender_ip, mac_address=arp_packet.sender_mac_addr, nic=from_nic - ) - arp_packet = arp_packet.generate_reply(from_nic.mac_address) - _LOGGER.info( - f"Node {self.hostname} sending ARP reply from {arp_packet.sender_mac_addr}/{arp_packet.sender_ip} " - f"to {arp_packet.target_ip}/{arp_packet.target_mac_addr} " + self.sys_log.info( + f"Received ARP request for {arp_packet.target_ip} from " + f"{arp_packet.sender_mac_addr}/{arp_packet.sender_ip} " ) + if arp_packet.target_ip == from_nic.ip_address: + self._add_arp_cache_entry( + ip_address=arp_packet.sender_ip, mac_address=arp_packet.sender_mac_addr, nic=from_nic + ) + arp_packet = arp_packet.generate_reply(from_nic.mac_address) + self.sys_log.info( + f"Sending ARP reply from {arp_packet.sender_mac_addr}/{arp_packet.sender_ip} " + f"to {arp_packet.target_ip}/{arp_packet.target_mac_addr} " + ) - tcp_header = TCPHeader(src_port=Port.ARP, dst_port=Port.ARP) + tcp_header = TCPHeader(src_port=Port.ARP, dst_port=Port.ARP) - # Network Layer - ip_packet = IPPacket( - src_ip=arp_packet.sender_ip, - dst_ip=arp_packet.target_ip, - ) - # Data Link Layer - ethernet_header = EthernetHeader( - src_mac_addr=arp_packet.sender_mac_addr, dst_mac_addr=arp_packet.target_mac_addr - ) - frame = Frame(ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, arp=arp_packet) - self.send_frame(frame) + # Network Layer + ip_packet = IPPacket( + src_ip=arp_packet.sender_ip, + dst_ip=arp_packet.target_ip, + ) + # Data Link Layer + ethernet_header = EthernetHeader( + src_mac_addr=arp_packet.sender_mac_addr, dst_mac_addr=arp_packet.target_mac_addr + ) + frame = Frame(ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, arp=arp_packet) + self.send_frame(frame) + else: + self.sys_log.info(f"Ignoring ARP request for {arp_packet.target_ip}") else: - _LOGGER.info( - f"Node {self.hostname} received ARP response for {arp_packet.sender_ip} " - f"from {arp_packet.sender_mac_addr} via NIC {from_nic}" + self.sys_log.info( + f"Received ARP response for {arp_packet.sender_ip} from {arp_packet.sender_mac_addr} via NIC {from_nic}" ) self._add_arp_cache_entry( ip_address=arp_packet.sender_ip, mac_address=arp_packet.sender_mac_addr, nic=from_nic @@ -573,7 +601,7 @@ class Node(SimComponent): :param frame: The Frame containing the icmp packet to process. """ if frame.icmp.icmp_type == ICMPType.ECHO_REQUEST: - _LOGGER.info(f"Node {self.hostname} received echo request from {frame.ip.src_ip}") + self.sys_log.info(f"Received echo request from {frame.ip.src_ip}") target_mac_address = self._get_arp_cache_mac_address(frame.ip.src_ip) src_nic = self._get_arp_cache_nic(frame.ip.src_ip) tcp_header = TCPHeader(src_port=Port.ARP, dst_port=Port.ARP) @@ -589,13 +617,14 @@ class Node(SimComponent): sequence=frame.icmp.sequence + 1, ) frame = Frame(ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, icmp=icmp_reply_packet) + self.sys_log.info(f"Sending echo reply to {frame.ip.src_ip}") src_nic.send_frame(frame) elif frame.icmp.icmp_type == ICMPType.ECHO_REPLY: - _LOGGER.info(f"Node {self.hostname} received echo reply from {frame.ip.src_ip}") - if frame.icmp.sequence <= 6: # 3 pings - self._ping(frame.ip.src_ip, sequence=frame.icmp.sequence, identifier=frame.icmp.identifier) + self.sys_log.info(f"Received echo reply from {frame.ip.src_ip}") - def _ping(self, target_ip_address: IPv4Address, sequence: int = 0, identifier: Optional[int] = None): + def _ping( + self, target_ip_address: IPv4Address, sequence: int = 0, identifier: Optional[int] = None + ) -> Tuple[int, Union[int, None]]: nic = self._get_arp_cache_nic(target_ip_address) if nic: sequence += 1 @@ -613,13 +642,15 @@ class Node(SimComponent): ethernet_header = EthernetHeader(src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address) icmp_packet = ICMPPacket(identifier=identifier, sequence=sequence) frame = Frame(ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, icmp=icmp_packet) + self.sys_log.info(f"Sending echo request to {target_ip_address}") nic.send_frame(frame) + return sequence, icmp_packet.identifier else: - _LOGGER.info(f"Node {self.hostname} no entry in ARP cache for {target_ip_address}") + self.sys_log.info(f"No entry in ARP cache for {target_ip_address}") self._send_arp_request(target_ip_address) - self._ping(target_ip_address=target_ip_address) + return 0, None - def ping(self, target_ip_address: Union[IPv4Address, str]) -> bool: + def ping(self, target_ip_address: Union[IPv4Address, str], pings: int = 4) -> bool: """ Ping an IP address. @@ -630,11 +661,13 @@ class Node(SimComponent): """ if not isinstance(target_ip_address, IPv4Address): target_ip_address = IPv4Address(target_ip_address) - if self.hardware_state == HardwareState.ON: - _LOGGER.info(f"Node {self.hostname} attempting to ping {target_ip_address}") - self._ping(target_ip_address) + if self.operating_state == NodeOperatingState.ON: + self.sys_log.info(f"Attempting to ping {target_ip_address}") + sequence, identifier = 0, None + while sequence < pings: + sequence, identifier = self._ping(target_ip_address, sequence, identifier) return True - _LOGGER.info(f"Node {self.hostname} ping failed as the node is turned off") + self.sys_log.info("Ping failed as the node is turned off") return False def send_frame(self, frame: Frame): diff --git a/src/primaite/simulator/network/transmission/data_link_layer.py b/src/primaite/simulator/network/transmission/data_link_layer.py index 97a1a423..1b7ccf7d 100644 --- a/src/primaite/simulator/network/transmission/data_link_layer.py +++ b/src/primaite/simulator/network/transmission/data_link_layer.py @@ -1,3 +1,4 @@ +from datetime import datetime from typing import Any, Optional from pydantic import BaseModel @@ -99,6 +100,29 @@ class Frame(BaseModel): "PrimAITE header." payload: Optional[Any] = None "Raw data payload." + sent_timestamp: Optional[datetime] = None + "The time the Frame was sent from the original source NIC." + received_timestamp: Optional[datetime] = None + "The time the Frame was received at the final destination NIC." + + def decrement_ttl(self): + """Decrement the IPPacket ttl by 1.""" + self.ip.ttl -= 1 + + @property + def can_transmit(self) -> bool: + """Informs whether the Frame can transmit based on the IPPacket tll being >= 1.""" + return self.ip.ttl >= 1 + + def set_sent_timestamp(self): + """Set the sent_timestamp.""" + if not self.sent_timestamp: + self.sent_timestamp = datetime.now() + + def set_received_timestamp(self): + """Set the received_timestamp.""" + if not self.received_timestamp: + self.received_timestamp = datetime.now() @property def size(self) -> float: # noqa - Keep it as MBits as this is how they're expressed diff --git a/src/primaite/simulator/system/__init__.py b/src/primaite/simulator/system/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/primaite/simulator/system/processes/__init__.py b/src/primaite/simulator/system/processes/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/primaite/simulator/system/processes/pcap.py b/src/primaite/simulator/system/processes/pcap.py new file mode 100644 index 00000000..c502adc8 --- /dev/null +++ b/src/primaite/simulator/system/processes/pcap.py @@ -0,0 +1,61 @@ +import logging +from pathlib import Path + + +class _JSONFilter(logging.Filter): + def filter(self, record: logging.LogRecord) -> bool: + """Filter logs that start and end with '{' and '}' (JSON-like messages).""" + return record.getMessage().startswith("{") and record.getMessage().endswith("}") + + +class PCAP: + """ + A logger class for logging Frames as json strings. + + This is essentially a PrimAITE simulated version of PCAP. + + The PCAPs are logged to: //__pcap.log + """ + + def __init__(self, hostname: str, ip_address: str): + """ + Initialize the PCAP instance. + + :param hostname: The hostname for which PCAP logs are being recorded. + :param ip_address: The IP address associated with the PCAP logs. + """ + self.hostname = hostname + self.ip_address = str(ip_address) + self._setup_logger() + + def _setup_logger(self): + """Set up the logger configuration.""" + log_path = self._get_log_path() + + file_handler = logging.FileHandler(filename=log_path) + file_handler.setLevel(60) # Custom log level > CRITICAL to prevent any unwanted standard DEBUG-CRITICAL logs + + log_format = "%(message)s" + file_handler.setFormatter(logging.Formatter(log_format)) + + logger_name = f"{self.hostname}_{self.ip_address}_pcap" + self.logger = logging.getLogger(logger_name) + self.logger.setLevel(60) # Custom log level > CRITICAL to prevent any unwanted standard DEBUG-CRITICAL logs + self.logger.addHandler(file_handler) + + self.logger.addFilter(_JSONFilter()) + + def _get_log_path(self) -> Path: + """Get the path for the log file.""" + root = Path(__file__).parent.parent.parent.parent.parent.parent / "simulation_output" / self.hostname + root.mkdir(exist_ok=True, parents=True) + return root / f"{self.hostname}_{self.ip_address}_pcap.log" + + def capture(self, frame): # noqa Please don't make me, I'll have a circular import and cant use if TYPE_CHECKING ;( + """ + Capture a Frame and log it. + + :param frame: The PCAP frame to capture. + """ + msg = frame.model_dump_json() + self.logger.log(level=60, msg=msg) # Log at custom log level > CRITICAL diff --git a/src/primaite/simulator/system/processes/sys_log.py b/src/primaite/simulator/system/processes/sys_log.py new file mode 100644 index 00000000..27b35505 --- /dev/null +++ b/src/primaite/simulator/system/processes/sys_log.py @@ -0,0 +1,87 @@ +import logging +from pathlib import Path + + +class _NotJSONFilter(logging.Filter): + def filter(self, record: logging.LogRecord) -> bool: + """Filter logs that do not start and end with '{' and '}'.""" + return not record.getMessage().startswith("{") and not record.getMessage().endswith("}") + + +class SysLog: + """ + A simple logger class for writing the sys logs of a Node. + + Logs are logged to: //_sys.log + """ + + def __init__(self, hostname: str): + """ + Initialize the SysLog instance. + + :param hostname: The hostname for which logs are being recorded. + """ + self.hostname = hostname + self._setup_logger() + + def _setup_logger(self): + """Set up the logger configuration.""" + log_path = self._get_log_path() + + file_handler = logging.FileHandler(filename=log_path) + file_handler.setLevel(logging.DEBUG) + + log_format = "%(asctime)s %(levelname)s: %(message)s" + file_handler.setFormatter(logging.Formatter(log_format)) + + self.logger = logging.getLogger(f"{self.hostname}_sys_log") + self.logger.setLevel(logging.DEBUG) + self.logger.addHandler(file_handler) + + self.logger.addFilter(_NotJSONFilter()) + + def _get_log_path(self) -> Path: + """Get the path for the log file.""" + root = Path(__file__).parent.parent.parent.parent.parent.parent / "simulation_output" / self.hostname + root.mkdir(exist_ok=True, parents=True) + return root / f"{self.hostname}_sys.log" + + def debug(self, msg: str): + """ + Log a debug message. + + :param msg: The message to log. + """ + self.logger.debug(msg) + + def info(self, msg: str): + """ + Log an info message. + + :param msg: The message to log. + """ + self.logger.info(msg) + + def warning(self, msg: str): + """ + Log a warning message. + + :param msg: The message to log. + """ + self.logger.warning(msg) + + def error(self, msg: str): + """ + Log an error message. + + :param msg: The message to log. + """ + self.logger.error(msg) + + def critical(self, msg: str): + """ + Log a critical message. + + :param msg: The message to log. + """ + self.logger.critical(msg) diff --git a/src/primaite/simulator/system/services/__init__.py b/src/primaite/simulator/system/services/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/primaite/simulator/system/services/icmp.py b/src/primaite/simulator/system/services/icmp.py new file mode 100644 index 00000000..e69de29b diff --git a/src/primaite/simulator/system/software.py b/src/primaite/simulator/system/software.py new file mode 100644 index 00000000..a5d0bd18 --- /dev/null +++ b/src/primaite/simulator/system/software.py @@ -0,0 +1,94 @@ +from enum import Enum + +from primaite.simulator.core import SimComponent + + +class SoftwareHealthState(Enum): + """Enumeration of the Software Health States.""" + + GOOD = 1 + "The software is in a good and healthy condition." + COMPROMISED = 2 + "The software's security has been compromised." + OVERWHELMED = 3 + "he software is overwhelmed and not functioning properly." + PATCHING = 4 + "The software is undergoing patching or updates." + + +class ApplicationOperatingState(Enum): + """Enumeration of Application Operating States.""" + + CLOSED = 0 + "The application is closed or not running." + RUNNING = 1 + "The application is running." + INSTALLING = 3 + "The application is being installed or updated." + + +class ServiceOperatingState(Enum): + """Enumeration of Service Operating States.""" + + STOPPED = 0 + "The service is not running." + RUNNING = 1 + "The service is currently running." + RESTARTING = 2 + "The service is in the process of restarting." + INSTALLING = 3 + "The service is being installed or updated." + PAUSED = 4 + "The service is temporarily paused." + DISABLED = 5 + "The service is disabled and cannot be started." + + +class ProcessOperatingState(Enum): + """Enumeration of Process Operating States.""" + + RUNNING = 1 + "The process is running." + PAUSED = 2 + "The process is temporarily paused." + + +class SoftwareCriticality(Enum): + """Enumeration of Software Criticality Levels.""" + + LOWEST = 1 + "The lowest level of criticality." + LOW = 2 + "A low level of criticality." + MEDIUM = 3 + "A medium level of criticality." + HIGH = 4 + "A high level of criticality." + HIGHEST = 5 + "The highest level of criticality." + + +class Software(SimComponent): + """ + Represents software information along with its health, criticality, and status. + + This class inherits from the Pydantic BaseModel and provides a structured way to store + information about software entities. + + Attributes: + name (str): The name of the software. + health_state_actual (SoftwareHealthState): The actual health state of the software. + health_state_visible (SoftwareHealthState): The health state of the software visible to users. + criticality (SoftwareCriticality): The criticality level of the software. + patching_count (int, optional): The count of patches applied to the software. Default is 0. + scanning_count (int, optional): The count of times the software has been scanned. Default is 0. + revealed_to_red (bool, optional): Indicates if the software has been revealed to red team. Default is False. + """ + + name: str + health_state_actual: SoftwareHealthState + health_state_visible: SoftwareHealthState + criticality: SoftwareCriticality + patching_count: int = 0 + scanning_count: int = 0 + revealed_to_red: bool = False diff --git a/tests/integration_tests/network/test_frame_transmission.py b/tests/integration_tests/network/test_frame_transmission.py index 32abd0ef..82e97049 100644 --- a/tests/integration_tests/network/test_frame_transmission.py +++ b/tests/integration_tests/network/test_frame_transmission.py @@ -16,10 +16,29 @@ def test_node_to_node_ping(): assert node_a.ping("192.168.0.11") - node_a.turn_off() - - assert not node_a.ping("192.168.0.11") +def test_multi_nic(): + node_a = Node(hostname="node_a") + nic_a = NIC(ip_address="192.168.0.10", subnet_mask="255.255.255.0", gateway="192.168.0.1") + node_a.connect_nic(nic_a) node_a.turn_on() - assert node_a.ping("192.168.0.11") + node_b = Node(hostname="node_b") + nic_b1 = NIC(ip_address="192.168.0.11", subnet_mask="255.255.255.0", gateway="192.168.0.1") + nic_b2 = NIC(ip_address="10.0.0.12", subnet_mask="255.0.0.0", gateway="10.0.0.1") + node_b.connect_nic(nic_b1) + node_b.connect_nic(nic_b2) + node_b.turn_on() + + node_c = Node(hostname="node_c") + nic_c = NIC(ip_address="10.0.0.13", subnet_mask="255.0.0.0", gateway="10.0.0.1") + node_c.connect_nic(nic_c) + node_c.turn_on() + + link_a_b1 = Link(endpoint_a=nic_a, endpoint_b=nic_b1) + + link_b2_c = Link(endpoint_a=nic_b2, endpoint_b=nic_c) + + node_a.ping("192.168.0.11") + + node_c.ping("10.0.0.12")