#2459 back-sync b8 changes into core
This commit is contained in:
@@ -22,10 +22,13 @@ def test_capture_nmne(uc2_network):
|
||||
web_server_nic = web_server.network_interface[1]
|
||||
db_server_nic = db_server.network_interface[1]
|
||||
|
||||
# Set the NMNE configuration to capture DELETE queries as MNEs
|
||||
# Set the NMNE configuration to capture DELETE/ENCRYPT queries as MNEs
|
||||
nmne_config = {
|
||||
"capture_nmne": True, # Enable the capture of MNEs
|
||||
"nmne_capture_keywords": ["DELETE"], # Specify "DELETE" SQL command as a keyword for MNE detection
|
||||
"nmne_capture_keywords": [
|
||||
"DELETE",
|
||||
"ENCRYPT",
|
||||
], # Specify "DELETE/ENCRYPT" SQL command as a keyword for MNE detection
|
||||
}
|
||||
|
||||
# Apply the NMNE configuration settings
|
||||
@@ -63,6 +66,20 @@ def test_capture_nmne(uc2_network):
|
||||
assert web_server_nic.nmne == {"direction": {"outbound": {"keywords": {"*": 2}}}}
|
||||
assert db_server_nic.nmne == {"direction": {"inbound": {"keywords": {"*": 2}}}}
|
||||
|
||||
# Perform an "ENCRYPT" query
|
||||
db_client.query("ENCRYPT")
|
||||
|
||||
# Check that the web server and database server interfaces register an additional MNE
|
||||
assert web_server_nic.nmne == {"direction": {"outbound": {"keywords": {"*": 3}}}}
|
||||
assert db_server_nic.nmne == {"direction": {"inbound": {"keywords": {"*": 3}}}}
|
||||
|
||||
# Perform another "SELECT" query
|
||||
db_client.query("SELECT")
|
||||
|
||||
# Check that no additional MNEs are captured
|
||||
assert web_server_nic.nmne == {"direction": {"outbound": {"keywords": {"*": 3}}}}
|
||||
assert db_server_nic.nmne == {"direction": {"inbound": {"keywords": {"*": 3}}}}
|
||||
|
||||
|
||||
def test_describe_state_nmne(uc2_network):
|
||||
"""
|
||||
@@ -70,7 +87,7 @@ def test_describe_state_nmne(uc2_network):
|
||||
|
||||
This test involves a web server querying a database server and checks if the MNEs are captured
|
||||
based on predefined keywords in the network configuration. Specifically, it checks the capture
|
||||
of the "DELETE" SQL command as a malicious network event. It also checks that running describe_state
|
||||
of the "DELETE" / "ENCRYPT" SQL commands as a malicious network event. It also checks that running describe_state
|
||||
only shows MNEs since the last time describe_state was called.
|
||||
"""
|
||||
web_server: Server = uc2_network.get_node_by_hostname("web_server") # noqa
|
||||
@@ -82,10 +99,13 @@ def test_describe_state_nmne(uc2_network):
|
||||
web_server_nic = web_server.network_interface[1]
|
||||
db_server_nic = db_server.network_interface[1]
|
||||
|
||||
# Set the NMNE configuration to capture DELETE queries as MNEs
|
||||
# Set the NMNE configuration to capture DELETE/ENCRYPT queries as MNEs
|
||||
nmne_config = {
|
||||
"capture_nmne": True, # Enable the capture of MNEs
|
||||
"nmne_capture_keywords": ["DELETE"], # Specify "DELETE" SQL command as a keyword for MNE detection
|
||||
"nmne_capture_keywords": [
|
||||
"DELETE",
|
||||
"ENCRYPT",
|
||||
], # "DELETE" & "ENCRYPT" SQL commands as a keywords for MNE detection
|
||||
}
|
||||
|
||||
# Apply the NMNE configuration settings
|
||||
@@ -138,6 +158,36 @@ def test_describe_state_nmne(uc2_network):
|
||||
assert web_server_nic_state["nmne"] == {"direction": {"outbound": {"keywords": {"*": 2}}}}
|
||||
assert db_server_nic_state["nmne"] == {"direction": {"inbound": {"keywords": {"*": 2}}}}
|
||||
|
||||
# Perform a "ENCRYPT" query
|
||||
db_client.query("ENCRYPT")
|
||||
|
||||
# Check that the web server's outbound interface and the database server's inbound interface register the MNE
|
||||
web_server_nic_state = web_server_nic.describe_state()
|
||||
db_server_nic_state = db_server_nic.describe_state()
|
||||
uc2_network.apply_timestep(timestep=0)
|
||||
assert web_server_nic_state["nmne"] == {"direction": {"outbound": {"keywords": {"*": 3}}}}
|
||||
assert db_server_nic_state["nmne"] == {"direction": {"inbound": {"keywords": {"*": 3}}}}
|
||||
|
||||
# Perform another "SELECT" query
|
||||
db_client.query("SELECT")
|
||||
|
||||
# Check that no additional MNEs are captured
|
||||
web_server_nic_state = web_server_nic.describe_state()
|
||||
db_server_nic_state = db_server_nic.describe_state()
|
||||
uc2_network.apply_timestep(timestep=0)
|
||||
assert web_server_nic_state["nmne"] == {"direction": {"outbound": {"keywords": {"*": 3}}}}
|
||||
assert db_server_nic_state["nmne"] == {"direction": {"inbound": {"keywords": {"*": 3}}}}
|
||||
|
||||
# Perform another "ENCRYPT"
|
||||
db_client.query("ENCRYPT")
|
||||
|
||||
# Check that the web server and database server interfaces register an additional MNE
|
||||
web_server_nic_state = web_server_nic.describe_state()
|
||||
db_server_nic_state = db_server_nic.describe_state()
|
||||
uc2_network.apply_timestep(timestep=0)
|
||||
assert web_server_nic_state["nmne"] == {"direction": {"outbound": {"keywords": {"*": 4}}}}
|
||||
assert db_server_nic_state["nmne"] == {"direction": {"inbound": {"keywords": {"*": 4}}}}
|
||||
|
||||
|
||||
def test_capture_nmne_observations(uc2_network):
|
||||
"""
|
||||
@@ -146,7 +196,7 @@ def test_capture_nmne_observations(uc2_network):
|
||||
This test ensures the observation space, as defined by instances of NICObservation, accurately reflects the
|
||||
number of MNEs detected based on network activities over multiple iterations.
|
||||
|
||||
The test employs a series of "DELETE" SQL operations, considered as MNEs, to validate the dynamic update
|
||||
The test employs a series of "DELETE" and "ENCRYPT" SQL operations, considered as MNEs, to validate the dynamic update
|
||||
and accuracy of the observation space related to network interface conditions. It confirms that the
|
||||
observed NIC states match expected MNE activity levels.
|
||||
"""
|
||||
@@ -158,10 +208,13 @@ def test_capture_nmne_observations(uc2_network):
|
||||
db_client: DatabaseClient = web_server.software_manager.software["DatabaseClient"]
|
||||
db_client.connect()
|
||||
|
||||
# Set the NMNE configuration to capture DELETE queries as MNEs
|
||||
# Set the NMNE configuration to capture DELETE/ENCRYPT queries as MNEs
|
||||
nmne_config = {
|
||||
"capture_nmne": True, # Enable the capture of MNEs
|
||||
"nmne_capture_keywords": ["DELETE"], # Specify "DELETE" SQL command as a keyword for MNE detection
|
||||
"nmne_capture_keywords": [
|
||||
"DELETE",
|
||||
"ENCRYPT",
|
||||
], # Specify "DELETE" & "ENCRYPT" SQL commands as a keywords for MNE detection
|
||||
}
|
||||
|
||||
# Apply the NMNE configuration settings
|
||||
@@ -196,3 +249,28 @@ def test_capture_nmne_observations(uc2_network):
|
||||
assert web_nic_obs["outbound"] == expected_nmne
|
||||
assert db_nic_obs["inbound"] == expected_nmne
|
||||
uc2_network.apply_timestep(timestep=0)
|
||||
|
||||
for i in range(0, 20):
|
||||
# Perform a "ENCRYPT" query each iteration
|
||||
for j in range(i):
|
||||
db_client.query("ENCRYPT")
|
||||
|
||||
# Observe the current state of NMNEs from the NICs of both the database and web servers
|
||||
state = sim.describe_state()
|
||||
db_nic_obs = db_server_nic_obs.observe(state)["NMNE"]
|
||||
web_nic_obs = web_server_nic_obs.observe(state)["NMNE"]
|
||||
|
||||
# Define expected NMNE values based on the iteration count
|
||||
if i > 10:
|
||||
expected_nmne = 3 # High level of detected MNEs after 10 iterations
|
||||
elif i > 5:
|
||||
expected_nmne = 2 # Moderate level after more than 5 iterations
|
||||
elif i > 0:
|
||||
expected_nmne = 1 # Low level detected after just starting
|
||||
else:
|
||||
expected_nmne = 0 # No MNEs detected
|
||||
|
||||
# Assert that the observed NMNEs match the expected values for both NICs
|
||||
assert web_nic_obs["outbound"] == expected_nmne
|
||||
assert db_nic_obs["inbound"] == expected_nmne
|
||||
uc2_network.apply_timestep(timestep=0)
|
||||
|
||||
@@ -152,6 +152,22 @@ def test_with_routes_can_ping(multi_hop_network):
|
||||
assert pc_a.ping(pc_b.network_interface[1].ip_address)
|
||||
|
||||
|
||||
def test_with_default_routes_can_ping(multi_hop_network):
|
||||
pc_a = multi_hop_network.get_node_by_hostname("pc_a")
|
||||
pc_b = multi_hop_network.get_node_by_hostname("pc_b")
|
||||
|
||||
router_1: Router = multi_hop_network.get_node_by_hostname("router_1") # noqa
|
||||
router_2: Router = multi_hop_network.get_node_by_hostname("router_2") # noqa
|
||||
|
||||
# Configure Route from Router 1 to PC B subnet
|
||||
router_1.route_table.set_default_route_next_hop_ip_address("192.168.1.2")
|
||||
|
||||
# Configure Route from Router 2 to PC A subnet
|
||||
router_2.route_table.set_default_route_next_hop_ip_address("192.168.1.1")
|
||||
|
||||
assert pc_a.ping(pc_b.network_interface[1].ip_address)
|
||||
|
||||
|
||||
def test_ping_router_port_multi_hop(multi_hop_network):
|
||||
pc_a = multi_hop_network.get_node_by_hostname("pc_a")
|
||||
router_2 = multi_hop_network.get_node_by_hostname("router_2")
|
||||
|
||||
Reference in New Issue
Block a user