Merged PR 298: Update NMNE to only count MNEs in the last step.

## Summary
The NMNE counts MNEs since last step rather than since last episode.

## Test process
Updated unit tests to check for the new behaviour and added new test.

## Checklist
- [x] PR is linked to a **work item**
- [x] **acceptance criteria** of linked ticket are met
- [x] performed **self-review** of the code
- [x] written **tests** for any new functionality added with this PR
- [x] updated the **documentation** if this PR changes or adds functionality
- [ ] written/updated **design docs** if this PR implements new functionality
- [x] updated the **change log**
- [x] ran **pre-commit** checks for code style
- [x] attended to any **TO-DOs** left in the code

There's no update necessary to the docs or change log because it NMNE is already documented, and the text is still correct given this minor change.

Related work items: #2348
This commit is contained in:
Marek Wolan
2024-03-07 16:20:39 +00:00
2 changed files with 105 additions and 15 deletions

View File

@@ -136,7 +136,7 @@ class NetworkInterface(SimComponent, ABC):
}
)
if CAPTURE_NMNE:
state.update({"nmne": self.nmne})
state.update({"nmne": {k: v for k, v in self.nmne.items()}})
return state
@abstractmethod
@@ -253,6 +253,15 @@ class NetworkInterface(SimComponent, ABC):
"""
return f"Port {self.port_name if self.port_name else self.port_num}: {self.mac_address}"
def apply_timestep(self, timestep: int) -> None:
"""
Apply a timestep evolution to this component.
This just clears the nmne count back to 0.tests/integration_tests/network/test_capture_nmne.py
"""
super().apply_timestep(timestep=timestep)
self.nmne.clear()
class WiredNetworkInterface(NetworkInterface, ABC):
"""
@@ -883,6 +892,9 @@ class Node(SimComponent):
"""
super().apply_timestep(timestep=timestep)
for network_interface in self.network_interfaces.values():
network_interface.apply_timestep(timestep=timestep)
# count down to boot up
if self.start_up_countdown > 0:
self.start_up_countdown -= 1

View File

@@ -32,36 +32,111 @@ def test_capture_nmne(uc2_network):
set_nmne_config(nmne_config)
# Assert that initially, there are no captured MNEs on both web and database servers
assert web_server_nic.describe_state()["nmne"] == {}
assert db_server_nic.describe_state()["nmne"] == {}
assert web_server_nic.nmne == {}
assert db_server_nic.nmne == {}
# Perform a "SELECT" query
db_client.query("SELECT")
# Check that it does not trigger an MNE capture.
assert web_server_nic.describe_state()["nmne"] == {}
assert db_server_nic.describe_state()["nmne"] == {}
assert web_server_nic.nmne == {}
assert db_server_nic.nmne == {}
# Perform a "DELETE" query
db_client.query("DELETE")
# Check that the web server's outbound interface and the database server's inbound interface register the MNE
assert web_server_nic.describe_state()["nmne"] == {"direction": {"outbound": {"keywords": {"*": 1}}}}
assert db_server_nic.describe_state()["nmne"] == {"direction": {"inbound": {"keywords": {"*": 1}}}}
assert web_server_nic.nmne == {"direction": {"outbound": {"keywords": {"*": 1}}}}
assert db_server_nic.nmne == {"direction": {"inbound": {"keywords": {"*": 1}}}}
# Perform another "SELECT" query
db_client.query("SELECT")
# Check that no additional MNEs are captured
assert web_server_nic.describe_state()["nmne"] == {"direction": {"outbound": {"keywords": {"*": 1}}}}
assert db_server_nic.describe_state()["nmne"] == {"direction": {"inbound": {"keywords": {"*": 1}}}}
assert web_server_nic.nmne == {"direction": {"outbound": {"keywords": {"*": 1}}}}
assert db_server_nic.nmne == {"direction": {"inbound": {"keywords": {"*": 1}}}}
# Perform another "DELETE" query
db_client.query("DELETE")
# Check that the web server and database server interfaces register an additional MNE
assert web_server_nic.describe_state()["nmne"] == {"direction": {"outbound": {"keywords": {"*": 2}}}}
assert db_server_nic.describe_state()["nmne"] == {"direction": {"inbound": {"keywords": {"*": 2}}}}
assert web_server_nic.nmne == {"direction": {"outbound": {"keywords": {"*": 2}}}}
assert db_server_nic.nmne == {"direction": {"inbound": {"keywords": {"*": 2}}}}
def test_describe_state_nmne(uc2_network):
"""
Conducts a test to verify that Malicious Network Events (MNEs) are correctly represented in the nic state.
This test involves a web server querying a database server and checks if the MNEs are captured
based on predefined keywords in the network configuration. Specifically, it checks the capture
of the "DELETE" SQL command as a malicious network event. It also checks that running describe_state
only shows MNEs since the last time describe_state was called.
"""
web_server: Server = uc2_network.get_node_by_hostname("web_server") # noqa
db_client: DatabaseClient = web_server.software_manager.software["DatabaseClient"] # noqa
db_client.connect()
db_server: Server = uc2_network.get_node_by_hostname("database_server") # noqa
web_server_nic = web_server.network_interface[1]
db_server_nic = db_server.network_interface[1]
# Set the NMNE configuration to capture DELETE queries as MNEs
nmne_config = {
"capture_nmne": True, # Enable the capture of MNEs
"nmne_capture_keywords": ["DELETE"], # Specify "DELETE" SQL command as a keyword for MNE detection
}
# Apply the NMNE configuration settings
set_nmne_config(nmne_config)
# Assert that initially, there are no captured MNEs on both web and database servers
web_server_nic_state = web_server_nic.describe_state()
db_server_nic_state = db_server_nic.describe_state()
uc2_network.apply_timestep(timestep=0)
assert web_server_nic_state["nmne"] == {}
assert db_server_nic_state["nmne"] == {}
# Perform a "SELECT" query
db_client.query("SELECT")
# Check that it does not trigger an MNE capture.
web_server_nic_state = web_server_nic.describe_state()
db_server_nic_state = db_server_nic.describe_state()
uc2_network.apply_timestep(timestep=0)
assert web_server_nic_state["nmne"] == {}
assert db_server_nic_state["nmne"] == {}
# Perform a "DELETE" query
db_client.query("DELETE")
# Check that the web server's outbound interface and the database server's inbound interface register the MNE
web_server_nic_state = web_server_nic.describe_state()
db_server_nic_state = db_server_nic.describe_state()
uc2_network.apply_timestep(timestep=0)
assert web_server_nic_state["nmne"] == {"direction": {"outbound": {"keywords": {"*": 1}}}}
assert db_server_nic_state["nmne"] == {"direction": {"inbound": {"keywords": {"*": 1}}}}
# Perform another "SELECT" query
db_client.query("SELECT")
# Check that no additional MNEs are captured
web_server_nic_state = web_server_nic.describe_state()
db_server_nic_state = db_server_nic.describe_state()
uc2_network.apply_timestep(timestep=0)
assert web_server_nic_state["nmne"] == {}
assert db_server_nic_state["nmne"] == {}
# Perform another "DELETE" query
db_client.query("DELETE")
# Check that the web server and database server interfaces register an additional MNE
web_server_nic_state = web_server_nic.describe_state()
db_server_nic_state = db_server_nic.describe_state()
uc2_network.apply_timestep(timestep=0)
assert web_server_nic_state["nmne"] == {"direction": {"outbound": {"keywords": {"*": 1}}}}
assert db_server_nic_state["nmne"] == {"direction": {"inbound": {"keywords": {"*": 1}}}}
def test_capture_nmne_observations(uc2_network):
@@ -97,13 +172,15 @@ def test_capture_nmne_observations(uc2_network):
web_server_nic_obs = NicObservation(where=["network", "nodes", "web_server", "NICs", 1])
# Iterate through a set of test cases to simulate multiple DELETE queries
for i in range(1, 20):
for i in range(0, 20):
# Perform a "DELETE" query each iteration
db_client.query("DELETE")
for j in range(i):
db_client.query("DELETE")
# Observe the current state of NMNEs from the NICs of both the database and web servers
db_nic_obs = db_server_nic_obs.observe(sim.describe_state())["nmne"]
web_nic_obs = web_server_nic_obs.observe(sim.describe_state())["nmne"]
state = sim.describe_state()
db_nic_obs = db_server_nic_obs.observe(state)["nmne"]
web_nic_obs = web_server_nic_obs.observe(state)["nmne"]
# Define expected NMNE values based on the iteration count
if i > 10:
@@ -118,3 +195,4 @@ def test_capture_nmne_observations(uc2_network):
# Assert that the observed NMNEs match the expected values for both NICs
assert web_nic_obs["outbound"] == expected_nmne
assert db_nic_obs["inbound"] == expected_nmne
uc2_network.apply_timestep(timestep=0)