#2417 update observation tests and make old tests pass

This commit is contained in:
Marek Wolan
2024-04-01 00:54:55 +01:00
parent 0e0df1012f
commit 0ba767d2a0
22 changed files with 767 additions and 626 deletions

View File

@@ -36,9 +36,11 @@ def test_acl_observations(simulation):
acl_obs = ACLObservation(
where=["network", "nodes", router.hostname, "acl", "acl"],
node_ip_to_id={},
ports=["NTP", "HTTP", "POSTGRES_SERVER"],
protocols=["TCP", "UDP", "ICMP"],
ip_list=[],
port_list=["NTP", "HTTP", "POSTGRES_SERVER"],
protocol_list=["TCP", "UDP", "ICMP"],
num_rules=10,
wildcard_list=[],
)
observation_space = acl_obs.observe(simulation.describe_state())
@@ -46,11 +48,11 @@ def test_acl_observations(simulation):
rule_obs = observation_space.get(1) # this is the ACL Rule added to allow NTP
assert rule_obs.get("position") == 0 # rule was put at position 1 (0 because counting from 1 instead of 1)
assert rule_obs.get("permission") == 1 # permit = 1 deny = 2
assert rule_obs.get("source_node_id") == 1 # applies to all source nodes
assert rule_obs.get("dest_node_id") == 1 # applies to all destination nodes
assert rule_obs.get("source_port") == 2 # NTP port is mapped to value 2 (1 = ALL, so 1+1 = 2 quik mafs)
assert rule_obs.get("dest_port") == 2 # NTP port is mapped to value 2
assert rule_obs.get("protocol") == 1 # 1 = No Protocol
assert rule_obs.get("source_ip_id") == 1 # applies to all source nodes
assert rule_obs.get("dest_ip_id") == 1 # applies to all destination nodes
assert rule_obs.get("source_port_id") == 2 # NTP port is mapped to value 2 (1 = ALL, so 1+1 = 2 quik mafs)
assert rule_obs.get("dest_port_id") == 2 # NTP port is mapped to value 2
assert rule_obs.get("protocol_id") == 1 # 1 = No Protocol
router.acl.remove_rule(1)
@@ -59,8 +61,8 @@ def test_acl_observations(simulation):
rule_obs = observation_space.get(1) # this is the ACL Rule added to allow NTP
assert rule_obs.get("position") == 0
assert rule_obs.get("permission") == 0
assert rule_obs.get("source_node_id") == 0
assert rule_obs.get("dest_node_id") == 0
assert rule_obs.get("source_port") == 0
assert rule_obs.get("dest_port") == 0
assert rule_obs.get("protocol") == 0
assert rule_obs.get("source_ip_id") == 0
assert rule_obs.get("dest_ip_id") == 0
assert rule_obs.get("source_port_id") == 0
assert rule_obs.get("dest_port_id") == 0
assert rule_obs.get("protocol_id") == 0

View File

@@ -23,7 +23,8 @@ def test_file_observation(simulation):
file = pc.file_system.create_file(file_name="dog.png")
dog_file_obs = FileObservation(
where=["network", "nodes", pc.hostname, "file_system", "folders", "root", "files", "dog.png"]
where=["network", "nodes", pc.hostname, "file_system", "folders", "root", "files", "dog.png"],
include_num_access=False,
)
assert dog_file_obs.space["health_status"] == spaces.Discrete(6)
@@ -49,7 +50,10 @@ def test_folder_observation(simulation):
file = pc.file_system.create_file(file_name="dog.png", folder_name="test_folder")
root_folder_obs = FolderObservation(
where=["network", "nodes", pc.hostname, "file_system", "folders", "test_folder"]
where=["network", "nodes", pc.hostname, "file_system", "folders", "test_folder"],
include_num_access=False,
num_files=1,
files=[],
)
assert root_folder_obs.space["health_status"] == spaces.Discrete(6)

View File

@@ -40,7 +40,7 @@ def test_nic(simulation):
nic: NIC = pc.network_interface[1]
nic_obs = NICObservation(where=["network", "nodes", pc.hostname, "NICs", 1])
nic_obs = NICObservation(where=["network", "nodes", pc.hostname, "NICs", 1], include_nmne=True)
assert nic_obs.space["nic_status"] == spaces.Discrete(3)
assert nic_obs.space["NMNE"]["inbound"] == spaces.Discrete(4)
@@ -61,17 +61,22 @@ def test_nic_categories(simulation):
"""Test the NIC observation nmne count categories."""
pc: Computer = simulation.network.get_node_by_hostname("client_1")
nic_obs = NICObservation(where=["network", "nodes", pc.hostname, "NICs", 1])
nic_obs = NICObservation(where=["network", "nodes", pc.hostname, "NICs", 1], include_nmne=True)
assert nic_obs.high_nmne_threshold == 10 # default
assert nic_obs.med_nmne_threshold == 5 # default
assert nic_obs.low_nmne_threshold == 0 # default
@pytest.mark.skip(reason="Feature not implemented yet")
def test_config_nic_categories(simulation):
pc: Computer = simulation.network.get_node_by_hostname("client_1")
nic_obs = NICObservation(
where=["network", "nodes", pc.hostname, "NICs", 1],
low_nmne_threshold=3,
med_nmne_threshold=6,
high_nmne_threshold=9,
include_nmne=True,
)
assert nic_obs.high_nmne_threshold == 9
@@ -85,6 +90,7 @@ def test_nic_categories(simulation):
low_nmne_threshold=9,
med_nmne_threshold=6,
high_nmne_threshold=9,
include_nmne=True,
)
with pytest.raises(Exception):
@@ -94,4 +100,5 @@ def test_nic_categories(simulation):
low_nmne_threshold=3,
med_nmne_threshold=9,
high_nmne_threshold=9,
include_nmne=True,
)

View File

@@ -19,15 +19,28 @@ def simulation(example_network) -> Simulation:
return sim
def test_node_observation(simulation):
"""Test a Node observation."""
def test_host_observation(simulation):
"""Test a Host observation."""
pc: Computer = simulation.network.get_node_by_hostname("client_1")
node_obs = HostObservation(where=["network", "nodes", pc.hostname])
host_obs = HostObservation(
where=["network", "nodes", pc.hostname],
num_applications=0,
num_files=1,
num_folders=1,
num_nics=2,
num_services=1,
include_num_access=False,
include_nmne=False,
services=[],
applications=[],
folders=[],
network_interfaces=[],
)
assert node_obs.space["operating_status"] == spaces.Discrete(5)
assert host_obs.space["operating_status"] == spaces.Discrete(5)
observation_state = node_obs.observe(simulation.describe_state())
observation_state = host_obs.observe(simulation.describe_state())
assert observation_state.get("operating_status") == 1 # computer is on
assert observation_state.get("SERVICES") is not None
@@ -36,11 +49,11 @@ def test_node_observation(simulation):
# turn off computer
pc.power_off()
observation_state = node_obs.observe(simulation.describe_state())
observation_state = host_obs.observe(simulation.describe_state())
assert observation_state.get("operating_status") == 4 # shutting down
for i in range(pc.shut_down_duration + 1):
pc.apply_timestep(i)
observation_state = node_obs.observe(simulation.describe_state())
observation_state = host_obs.observe(simulation.describe_state())
assert observation_state.get("operating_status") == 2

View File

@@ -14,7 +14,8 @@ def test_file_observation():
state = sim.describe_state()
dog_file_obs = FileObservation(
where=["network", "nodes", pc.hostname, "file_system", "folders", "root", "files", "dog.png"]
where=["network", "nodes", pc.hostname, "file_system", "folders", "root", "files", "dog.png"],
include_num_access=False,
)
assert dog_file_obs.observe(state) == {"health_status": 1}
assert dog_file_obs.space == spaces.Dict({"health_status": spaces.Discrete(6)})

View File

@@ -168,8 +168,8 @@ def test_capture_nmne_observations(uc2_network):
set_nmne_config(nmne_config)
# Define observations for the NICs of the database and web servers
db_server_nic_obs = NICObservation(where=["network", "nodes", "database_server", "NICs", 1])
web_server_nic_obs = NICObservation(where=["network", "nodes", "web_server", "NICs", 1])
db_server_nic_obs = NICObservation(where=["network", "nodes", "database_server", "NICs", 1], include_nmne=True)
web_server_nic_obs = NICObservation(where=["network", "nodes", "web_server", "NICs", 1], include_nmne=True)
# Iterate through a set of test cases to simulate multiple DELETE queries
for i in range(0, 20):