From a900d59f7b24124739dcb8be02a241220319d0b1 Mon Sep 17 00:00:00 2001 From: Marek Wolan Date: Thu, 7 Mar 2024 12:15:30 +0000 Subject: [PATCH 1/2] Update NMNE to only count MNEs in the last step. --- .../simulator/network/hardware/base.py | 3 +- .../network/test_capture_nmne.py | 100 +++++++++++++++--- 2 files changed, 88 insertions(+), 15 deletions(-) diff --git a/src/primaite/simulator/network/hardware/base.py b/src/primaite/simulator/network/hardware/base.py index 991913dd..36716f27 100644 --- a/src/primaite/simulator/network/hardware/base.py +++ b/src/primaite/simulator/network/hardware/base.py @@ -136,7 +136,8 @@ class NetworkInterface(SimComponent, ABC): } ) if CAPTURE_NMNE: - state.update({"nmne": self.nmne}) + state.update({"nmne": {k: v for k, v in self.nmne.items()}}) + self.nmne.clear() return state @abstractmethod diff --git a/tests/integration_tests/network/test_capture_nmne.py b/tests/integration_tests/network/test_capture_nmne.py index 85ac23e8..d48b3784 100644 --- a/tests/integration_tests/network/test_capture_nmne.py +++ b/tests/integration_tests/network/test_capture_nmne.py @@ -32,36 +32,106 @@ def test_capture_nmne(uc2_network): set_nmne_config(nmne_config) # Assert that initially, there are no captured MNEs on both web and database servers - assert web_server_nic.describe_state()["nmne"] == {} - assert db_server_nic.describe_state()["nmne"] == {} + assert web_server_nic.nmne == {} + assert db_server_nic.nmne == {} # Perform a "SELECT" query db_client.query("SELECT") # Check that it does not trigger an MNE capture. - assert web_server_nic.describe_state()["nmne"] == {} - assert db_server_nic.describe_state()["nmne"] == {} + assert web_server_nic.nmne == {} + assert db_server_nic.nmne == {} # Perform a "DELETE" query db_client.query("DELETE") # Check that the web server's outbound interface and the database server's inbound interface register the MNE - assert web_server_nic.describe_state()["nmne"] == {"direction": {"outbound": {"keywords": {"*": 1}}}} - assert db_server_nic.describe_state()["nmne"] == {"direction": {"inbound": {"keywords": {"*": 1}}}} + assert web_server_nic.nmne == {"direction": {"outbound": {"keywords": {"*": 1}}}} + assert db_server_nic.nmne == {"direction": {"inbound": {"keywords": {"*": 1}}}} # Perform another "SELECT" query db_client.query("SELECT") # Check that no additional MNEs are captured - assert web_server_nic.describe_state()["nmne"] == {"direction": {"outbound": {"keywords": {"*": 1}}}} - assert db_server_nic.describe_state()["nmne"] == {"direction": {"inbound": {"keywords": {"*": 1}}}} + assert web_server_nic.nmne == {"direction": {"outbound": {"keywords": {"*": 1}}}} + assert db_server_nic.nmne == {"direction": {"inbound": {"keywords": {"*": 1}}}} # Perform another "DELETE" query db_client.query("DELETE") # Check that the web server and database server interfaces register an additional MNE - assert web_server_nic.describe_state()["nmne"] == {"direction": {"outbound": {"keywords": {"*": 2}}}} - assert db_server_nic.describe_state()["nmne"] == {"direction": {"inbound": {"keywords": {"*": 2}}}} + assert web_server_nic.nmne == {"direction": {"outbound": {"keywords": {"*": 2}}}} + assert db_server_nic.nmne == {"direction": {"inbound": {"keywords": {"*": 2}}}} + + +def test_describe_state_nmne(uc2_network): + """ + Conducts a test to verify that Malicious Network Events (MNEs) are correctly represented in the nic state. + + This test involves a web server querying a database server and checks if the MNEs are captured + based on predefined keywords in the network configuration. Specifically, it checks the capture + of the "DELETE" SQL command as a malicious network event. It also checks that running describe_state + only shows MNEs since the last time describe_state was called. + """ + web_server: Server = uc2_network.get_node_by_hostname("web_server") # noqa + db_client: DatabaseClient = web_server.software_manager.software["DatabaseClient"] # noqa + db_client.connect() + + db_server: Server = uc2_network.get_node_by_hostname("database_server") # noqa + + web_server_nic = web_server.network_interface[1] + db_server_nic = db_server.network_interface[1] + + # Set the NMNE configuration to capture DELETE queries as MNEs + nmne_config = { + "capture_nmne": True, # Enable the capture of MNEs + "nmne_capture_keywords": ["DELETE"], # Specify "DELETE" SQL command as a keyword for MNE detection + } + + # Apply the NMNE configuration settings + set_nmne_config(nmne_config) + + # Assert that initially, there are no captured MNEs on both web and database servers + web_server_nic_state = web_server_nic.describe_state() + db_server_nic_state = db_server_nic.describe_state() + assert web_server_nic_state["nmne"] == {} + assert db_server_nic_state["nmne"] == {} + + # Perform a "SELECT" query + db_client.query("SELECT") + + # Check that it does not trigger an MNE capture. + web_server_nic_state = web_server_nic.describe_state() + db_server_nic_state = db_server_nic.describe_state() + assert web_server_nic_state["nmne"] == {} + assert db_server_nic_state["nmne"] == {} + + # Perform a "DELETE" query + db_client.query("DELETE") + + # Check that the web server's outbound interface and the database server's inbound interface register the MNE + web_server_nic_state = web_server_nic.describe_state() + db_server_nic_state = db_server_nic.describe_state() + assert web_server_nic_state["nmne"] == {"direction": {"outbound": {"keywords": {"*": 1}}}} + assert db_server_nic_state["nmne"] == {"direction": {"inbound": {"keywords": {"*": 1}}}} + + # Perform another "SELECT" query + db_client.query("SELECT") + + # Check that no additional MNEs are captured + web_server_nic_state = web_server_nic.describe_state() + db_server_nic_state = db_server_nic.describe_state() + assert web_server_nic_state["nmne"] == {} + assert db_server_nic_state["nmne"] == {} + + # Perform another "DELETE" query + db_client.query("DELETE") + + # Check that the web server and database server interfaces register an additional MNE + web_server_nic_state = web_server_nic.describe_state() + db_server_nic_state = db_server_nic.describe_state() + assert web_server_nic_state["nmne"] == {"direction": {"outbound": {"keywords": {"*": 1}}}} + assert db_server_nic_state["nmne"] == {"direction": {"inbound": {"keywords": {"*": 1}}}} def test_capture_nmne_observations(uc2_network): @@ -97,13 +167,15 @@ def test_capture_nmne_observations(uc2_network): web_server_nic_obs = NicObservation(where=["network", "nodes", "web_server", "NICs", 1]) # Iterate through a set of test cases to simulate multiple DELETE queries - for i in range(1, 20): + for i in range(0, 20): # Perform a "DELETE" query each iteration - db_client.query("DELETE") + for j in range(i): + db_client.query("DELETE") # Observe the current state of NMNEs from the NICs of both the database and web servers - db_nic_obs = db_server_nic_obs.observe(sim.describe_state())["nmne"] - web_nic_obs = web_server_nic_obs.observe(sim.describe_state())["nmne"] + state = sim.describe_state() + db_nic_obs = db_server_nic_obs.observe(state)["nmne"] + web_nic_obs = web_server_nic_obs.observe(state)["nmne"] # Define expected NMNE values based on the iteration count if i > 10: From 76752fd9af1d7a2d5b37247963e4d35873c10755 Mon Sep 17 00:00:00 2001 From: Marek Wolan Date: Thu, 7 Mar 2024 14:44:44 +0000 Subject: [PATCH 2/2] Change the nmne clear to happen at apply_timestep instead of within describe_state --- src/primaite/simulator/network/hardware/base.py | 13 ++++++++++++- .../integration_tests/network/test_capture_nmne.py | 6 ++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/primaite/simulator/network/hardware/base.py b/src/primaite/simulator/network/hardware/base.py index 36716f27..82fae164 100644 --- a/src/primaite/simulator/network/hardware/base.py +++ b/src/primaite/simulator/network/hardware/base.py @@ -137,7 +137,6 @@ class NetworkInterface(SimComponent, ABC): ) if CAPTURE_NMNE: state.update({"nmne": {k: v for k, v in self.nmne.items()}}) - self.nmne.clear() return state @abstractmethod @@ -254,6 +253,15 @@ class NetworkInterface(SimComponent, ABC): """ return f"Port {self.port_name if self.port_name else self.port_num}: {self.mac_address}" + def apply_timestep(self, timestep: int) -> None: + """ + Apply a timestep evolution to this component. + + This just clears the nmne count back to 0.tests/integration_tests/network/test_capture_nmne.py + """ + super().apply_timestep(timestep=timestep) + self.nmne.clear() + class WiredNetworkInterface(NetworkInterface, ABC): """ @@ -884,6 +892,9 @@ class Node(SimComponent): """ super().apply_timestep(timestep=timestep) + for network_interface in self.network_interfaces.values(): + network_interface.apply_timestep(timestep=timestep) + # count down to boot up if self.start_up_countdown > 0: self.start_up_countdown -= 1 diff --git a/tests/integration_tests/network/test_capture_nmne.py b/tests/integration_tests/network/test_capture_nmne.py index d48b3784..698bfc72 100644 --- a/tests/integration_tests/network/test_capture_nmne.py +++ b/tests/integration_tests/network/test_capture_nmne.py @@ -94,6 +94,7 @@ def test_describe_state_nmne(uc2_network): # Assert that initially, there are no captured MNEs on both web and database servers web_server_nic_state = web_server_nic.describe_state() db_server_nic_state = db_server_nic.describe_state() + uc2_network.apply_timestep(timestep=0) assert web_server_nic_state["nmne"] == {} assert db_server_nic_state["nmne"] == {} @@ -103,6 +104,7 @@ def test_describe_state_nmne(uc2_network): # Check that it does not trigger an MNE capture. web_server_nic_state = web_server_nic.describe_state() db_server_nic_state = db_server_nic.describe_state() + uc2_network.apply_timestep(timestep=0) assert web_server_nic_state["nmne"] == {} assert db_server_nic_state["nmne"] == {} @@ -112,6 +114,7 @@ def test_describe_state_nmne(uc2_network): # Check that the web server's outbound interface and the database server's inbound interface register the MNE web_server_nic_state = web_server_nic.describe_state() db_server_nic_state = db_server_nic.describe_state() + uc2_network.apply_timestep(timestep=0) assert web_server_nic_state["nmne"] == {"direction": {"outbound": {"keywords": {"*": 1}}}} assert db_server_nic_state["nmne"] == {"direction": {"inbound": {"keywords": {"*": 1}}}} @@ -121,6 +124,7 @@ def test_describe_state_nmne(uc2_network): # Check that no additional MNEs are captured web_server_nic_state = web_server_nic.describe_state() db_server_nic_state = db_server_nic.describe_state() + uc2_network.apply_timestep(timestep=0) assert web_server_nic_state["nmne"] == {} assert db_server_nic_state["nmne"] == {} @@ -130,6 +134,7 @@ def test_describe_state_nmne(uc2_network): # Check that the web server and database server interfaces register an additional MNE web_server_nic_state = web_server_nic.describe_state() db_server_nic_state = db_server_nic.describe_state() + uc2_network.apply_timestep(timestep=0) assert web_server_nic_state["nmne"] == {"direction": {"outbound": {"keywords": {"*": 1}}}} assert db_server_nic_state["nmne"] == {"direction": {"inbound": {"keywords": {"*": 1}}}} @@ -190,3 +195,4 @@ def test_capture_nmne_observations(uc2_network): # Assert that the observed NMNEs match the expected values for both NICs assert web_nic_obs["outbound"] == expected_nmne assert db_nic_obs["inbound"] == expected_nmne + uc2_network.apply_timestep(timestep=0)