Update NMNE to only count MNEs in the last step.

This commit is contained in:
Marek Wolan
2024-03-07 12:15:30 +00:00
parent 8589ce449a
commit a900d59f7b
2 changed files with 88 additions and 15 deletions

View File

@@ -136,7 +136,8 @@ class NetworkInterface(SimComponent, ABC):
}
)
if CAPTURE_NMNE:
state.update({"nmne": self.nmne})
state.update({"nmne": {k: v for k, v in self.nmne.items()}})
self.nmne.clear()
return state
@abstractmethod

View File

@@ -32,36 +32,106 @@ def test_capture_nmne(uc2_network):
set_nmne_config(nmne_config)
# Assert that initially, there are no captured MNEs on both web and database servers
assert web_server_nic.describe_state()["nmne"] == {}
assert db_server_nic.describe_state()["nmne"] == {}
assert web_server_nic.nmne == {}
assert db_server_nic.nmne == {}
# Perform a "SELECT" query
db_client.query("SELECT")
# Check that it does not trigger an MNE capture.
assert web_server_nic.describe_state()["nmne"] == {}
assert db_server_nic.describe_state()["nmne"] == {}
assert web_server_nic.nmne == {}
assert db_server_nic.nmne == {}
# Perform a "DELETE" query
db_client.query("DELETE")
# Check that the web server's outbound interface and the database server's inbound interface register the MNE
assert web_server_nic.describe_state()["nmne"] == {"direction": {"outbound": {"keywords": {"*": 1}}}}
assert db_server_nic.describe_state()["nmne"] == {"direction": {"inbound": {"keywords": {"*": 1}}}}
assert web_server_nic.nmne == {"direction": {"outbound": {"keywords": {"*": 1}}}}
assert db_server_nic.nmne == {"direction": {"inbound": {"keywords": {"*": 1}}}}
# Perform another "SELECT" query
db_client.query("SELECT")
# Check that no additional MNEs are captured
assert web_server_nic.describe_state()["nmne"] == {"direction": {"outbound": {"keywords": {"*": 1}}}}
assert db_server_nic.describe_state()["nmne"] == {"direction": {"inbound": {"keywords": {"*": 1}}}}
assert web_server_nic.nmne == {"direction": {"outbound": {"keywords": {"*": 1}}}}
assert db_server_nic.nmne == {"direction": {"inbound": {"keywords": {"*": 1}}}}
# Perform another "DELETE" query
db_client.query("DELETE")
# Check that the web server and database server interfaces register an additional MNE
assert web_server_nic.describe_state()["nmne"] == {"direction": {"outbound": {"keywords": {"*": 2}}}}
assert db_server_nic.describe_state()["nmne"] == {"direction": {"inbound": {"keywords": {"*": 2}}}}
assert web_server_nic.nmne == {"direction": {"outbound": {"keywords": {"*": 2}}}}
assert db_server_nic.nmne == {"direction": {"inbound": {"keywords": {"*": 2}}}}
def test_describe_state_nmne(uc2_network):
"""
Conducts a test to verify that Malicious Network Events (MNEs) are correctly represented in the nic state.
This test involves a web server querying a database server and checks if the MNEs are captured
based on predefined keywords in the network configuration. Specifically, it checks the capture
of the "DELETE" SQL command as a malicious network event. It also checks that running describe_state
only shows MNEs since the last time describe_state was called.
"""
web_server: Server = uc2_network.get_node_by_hostname("web_server") # noqa
db_client: DatabaseClient = web_server.software_manager.software["DatabaseClient"] # noqa
db_client.connect()
db_server: Server = uc2_network.get_node_by_hostname("database_server") # noqa
web_server_nic = web_server.network_interface[1]
db_server_nic = db_server.network_interface[1]
# Set the NMNE configuration to capture DELETE queries as MNEs
nmne_config = {
"capture_nmne": True, # Enable the capture of MNEs
"nmne_capture_keywords": ["DELETE"], # Specify "DELETE" SQL command as a keyword for MNE detection
}
# Apply the NMNE configuration settings
set_nmne_config(nmne_config)
# Assert that initially, there are no captured MNEs on both web and database servers
web_server_nic_state = web_server_nic.describe_state()
db_server_nic_state = db_server_nic.describe_state()
assert web_server_nic_state["nmne"] == {}
assert db_server_nic_state["nmne"] == {}
# Perform a "SELECT" query
db_client.query("SELECT")
# Check that it does not trigger an MNE capture.
web_server_nic_state = web_server_nic.describe_state()
db_server_nic_state = db_server_nic.describe_state()
assert web_server_nic_state["nmne"] == {}
assert db_server_nic_state["nmne"] == {}
# Perform a "DELETE" query
db_client.query("DELETE")
# Check that the web server's outbound interface and the database server's inbound interface register the MNE
web_server_nic_state = web_server_nic.describe_state()
db_server_nic_state = db_server_nic.describe_state()
assert web_server_nic_state["nmne"] == {"direction": {"outbound": {"keywords": {"*": 1}}}}
assert db_server_nic_state["nmne"] == {"direction": {"inbound": {"keywords": {"*": 1}}}}
# Perform another "SELECT" query
db_client.query("SELECT")
# Check that no additional MNEs are captured
web_server_nic_state = web_server_nic.describe_state()
db_server_nic_state = db_server_nic.describe_state()
assert web_server_nic_state["nmne"] == {}
assert db_server_nic_state["nmne"] == {}
# Perform another "DELETE" query
db_client.query("DELETE")
# Check that the web server and database server interfaces register an additional MNE
web_server_nic_state = web_server_nic.describe_state()
db_server_nic_state = db_server_nic.describe_state()
assert web_server_nic_state["nmne"] == {"direction": {"outbound": {"keywords": {"*": 1}}}}
assert db_server_nic_state["nmne"] == {"direction": {"inbound": {"keywords": {"*": 1}}}}
def test_capture_nmne_observations(uc2_network):
@@ -97,13 +167,15 @@ def test_capture_nmne_observations(uc2_network):
web_server_nic_obs = NicObservation(where=["network", "nodes", "web_server", "NICs", 1])
# Iterate through a set of test cases to simulate multiple DELETE queries
for i in range(1, 20):
for i in range(0, 20):
# Perform a "DELETE" query each iteration
db_client.query("DELETE")
for j in range(i):
db_client.query("DELETE")
# Observe the current state of NMNEs from the NICs of both the database and web servers
db_nic_obs = db_server_nic_obs.observe(sim.describe_state())["nmne"]
web_nic_obs = web_server_nic_obs.observe(sim.describe_state())["nmne"]
state = sim.describe_state()
db_nic_obs = db_server_nic_obs.observe(state)["nmne"]
web_nic_obs = web_server_nic_obs.observe(state)["nmne"]
# Define expected NMNE values based on the iteration count
if i > 10: