diff --git a/docs/source/simulation_components/network/network_interfaces.rst b/docs/source/simulation_components/network/network_interfaces.rst index c74b54ae..2bb8dda4 100644 --- a/docs/source/simulation_components/network/network_interfaces.rst +++ b/docs/source/simulation_components/network/network_interfaces.rst @@ -71,7 +71,7 @@ Network Interface Classes - Malicious Network Events Monitoring: * Enhances network interfaces with the capability to monitor and capture Malicious Network Events (MNEs) based on predefined criteria such as specific keywords or traffic patterns. - * Integrates NMNE detection functionalities, leveraging configurable settings like ``capture_nmne``, `nmne_capture_keywords``, and observation mechanisms such as ``NicObservation`` to classify and record network anomalies. + * Integrates Number of Malicious Network Events (NMNE) detection functionalities, leveraging configurable settings like ``capture_nmne``, `nmne_capture_keywords``, and observation mechanisms such as ``NicObservation`` to classify and record network anomalies. * Offers an additional layer of security and data analysis, crucial for identifying and mitigating malicious activities within the network infrastructure. Provides vital information for network security analysis and reinforcement learning algorithms. **WiredNetworkInterface (Connection Type Layer)** diff --git a/src/primaite/game/agent/observations.py b/src/primaite/game/agent/observations.py index 1d8799fd..7ccc3f11 100644 --- a/src/primaite/game/agent/observations.py +++ b/src/primaite/game/agent/observations.py @@ -352,8 +352,6 @@ class NicObservation(AbstractObservation): """The default NIC observation dict.""" data = {"nic_status": 0} - if CAPTURE_NMNE: - data.update({"nmne": {"inbound": 0, "outbound": 0}}) return data def __init__(self, where: Optional[Tuple[str]] = None) -> None: @@ -407,7 +405,7 @@ class NicObservation(AbstractObservation): return self.default_observation else: obs_dict = {"nic_status": 1 if nic_state["enabled"] else 2, "nmne": {}} - if CAPTURE_NMNE: + if CAPTURE_NMNE and nic_state.get("nmne"): direction_dict = nic_state["nmne"].get("direction", {}) inbound_keywords = direction_dict.get("inbound", {}).get("keywords", {}) inbound_count = inbound_keywords.get("*", 0) diff --git a/src/primaite/simulator/network/hardware/base.py b/src/primaite/simulator/network/hardware/base.py index c0e69e60..b22bea25 100644 --- a/src/primaite/simulator/network/hardware/base.py +++ b/src/primaite/simulator/network/hardware/base.py @@ -98,6 +98,7 @@ class NetworkInterface(SimComponent, ABC): "A PacketCapture instance for capturing and analysing packets passing through this interface." nmne: Dict = Field(default_factory=lambda: {}) + "A dict containing details of the number of malicious network events captured." def _init_request_manager(self) -> RequestManager: rm = super()._init_request_manager() @@ -122,7 +123,6 @@ class NetworkInterface(SimComponent, ABC): "enabled": self.enabled, } ) - state.update({"nmne": self.nmne}) return state def reset_component_for_episode(self, episode: int): @@ -134,23 +134,29 @@ class NetworkInterface(SimComponent, ABC): self.pcap.setup_logger() self.enable() - # @abstractmethod + @abstractmethod def enable(self): """Enable the interface.""" pass - # @abstractmethod + @abstractmethod def disable(self): """Disable the interface.""" pass - def _capture_nmne(self, frame: Frame, inbound: bool = True): + def _capture_nmne(self, frame: Frame, inbound: bool = True) -> None: """ Processes and captures network frame data based on predefined global NMNE settings. This method updates the NMNE structure with counts of malicious network events based on the frame content and direction. The structure is dynamically adjusted according to the enabled capture settings. + .. note:: + While there is a lot of logic in this code that defines a multi-level hierarchical NMNE structure, + most of it is unused for now as a result of all `CAPTURE_BY_<>` variables in + ``primaite.simulator.network.nmne`` being hardcoded and set as final. Once they're 'released' and made + configurable, this function will be updated to properly explain the dynamic data structure. + :param frame: The network frame to process, containing IP, TCP/UDP, and payload information. :param inbound: Boolean indicating if the frame direction is inbound. Defaults to True. """ @@ -214,7 +220,7 @@ class NetworkInterface(SimComponent, ABC): # Increment a generic counter if keyword capturing is not enabled keyword_level["*"] = keyword_level.get("*", 0) + 1 - # @abstractmethod + @abstractmethod def send_frame(self, frame: Frame) -> bool: """ Attempts to send a network frame through the interface. @@ -224,7 +230,7 @@ class NetworkInterface(SimComponent, ABC): """ self._capture_nmne(frame, inbound=False) - # @abstractmethod + @abstractmethod def receive_frame(self, frame: Frame) -> bool: """ Receives a network frame on the interface. diff --git a/src/primaite/simulator/network/hardware/nodes/host/host_node.py b/src/primaite/simulator/network/hardware/nodes/host/host_node.py index 6ecd6733..8e104924 100644 --- a/src/primaite/simulator/network/hardware/nodes/host/host_node.py +++ b/src/primaite/simulator/network/hardware/nodes/host/host_node.py @@ -205,11 +205,7 @@ class NIC(IPWiredNetworkInterface): state = super().describe_state() # Update the state with NIC-specific information - state.update( - { - "wake_on_lan": self.wake_on_lan, - } - ) + state.update({"wake_on_lan": self.wake_on_lan, "nmne": self.nmne}) return state diff --git a/src/primaite/simulator/network/nmne.py b/src/primaite/simulator/network/nmne.py index d4c40631..87839712 100644 --- a/src/primaite/simulator/network/nmne.py +++ b/src/primaite/simulator/network/nmne.py @@ -6,6 +6,7 @@ CAPTURE_NMNE: bool = True NMNE_CAPTURE_KEYWORDS: List[str] = [] """List of keywords to identify malicious network events.""" +# TODO: Remove final and make configurable after example layout when the NicObservation creates nmne structure dynamically CAPTURE_BY_DIRECTION: Final[bool] = True """Flag to determine if captures should be organized by traffic direction (inbound/outbound).""" CAPTURE_BY_IP_ADDRESS: Final[bool] = False