Merge remote-tracking branch 'origin/dev' into bugfix/2143-node-service-patch-main
This commit is contained in:
@@ -5,7 +5,7 @@ from primaite.simulator.network.networks import arcd_uc2_network
|
||||
from primaite.simulator.network.transmission.network_layer import IPProtocol
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
from primaite.simulator.system.applications.application import ApplicationOperatingState
|
||||
from primaite.simulator.system.services.red_services.data_manipulation_bot import (
|
||||
from primaite.simulator.system.applications.red_applications.data_manipulation_bot import (
|
||||
DataManipulationAttackStage,
|
||||
DataManipulationBot,
|
||||
)
|
||||
@@ -70,4 +70,4 @@ def test_dm_bot_perform_data_manipulation_success(dm_bot):
|
||||
dm_bot._perform_data_manipulation(p_of_success=1.0)
|
||||
|
||||
assert dm_bot.attack_stage in (DataManipulationAttackStage.SUCCEEDED, DataManipulationAttackStage.FAILED)
|
||||
assert dm_bot.connected
|
||||
assert len(dm_bot.connections)
|
||||
@@ -0,0 +1,90 @@
|
||||
from ipaddress import IPv4Address
|
||||
|
||||
import pytest
|
||||
|
||||
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
|
||||
from primaite.simulator.network.hardware.nodes.computer import Computer
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
from primaite.simulator.system.applications.application import ApplicationOperatingState
|
||||
from primaite.simulator.system.applications.red_applications.dos_bot import DoSAttackStage, DoSBot
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def dos_bot() -> DoSBot:
|
||||
computer = Computer(
|
||||
hostname="compromised_pc",
|
||||
ip_address="192.168.0.1",
|
||||
subnet_mask="255.255.255.0",
|
||||
operating_state=NodeOperatingState.ON,
|
||||
)
|
||||
|
||||
computer.software_manager.install(DoSBot)
|
||||
|
||||
dos_bot: DoSBot = computer.software_manager.software.get("DoSBot")
|
||||
dos_bot.configure(target_ip_address=IPv4Address("192.168.0.1"))
|
||||
dos_bot.set_original_state()
|
||||
return dos_bot
|
||||
|
||||
|
||||
def test_dos_bot_creation(dos_bot):
|
||||
"""Test that the DoS bot is installed on a node."""
|
||||
assert dos_bot is not None
|
||||
|
||||
|
||||
def test_dos_bot_reset(dos_bot):
|
||||
assert dos_bot.target_ip_address == IPv4Address("192.168.0.1")
|
||||
assert dos_bot.target_port is Port.POSTGRES_SERVER
|
||||
assert dos_bot.payload is None
|
||||
assert dos_bot.repeat is False
|
||||
|
||||
dos_bot.configure(
|
||||
target_ip_address=IPv4Address("192.168.1.1"), target_port=Port.HTTP, payload="payload", repeat=True
|
||||
)
|
||||
|
||||
# should reset the relevant items
|
||||
dos_bot.reset_component_for_episode(episode=0)
|
||||
assert dos_bot.target_ip_address == IPv4Address("192.168.0.1")
|
||||
assert dos_bot.target_port is Port.POSTGRES_SERVER
|
||||
assert dos_bot.payload is None
|
||||
assert dos_bot.repeat is False
|
||||
|
||||
dos_bot.configure(
|
||||
target_ip_address=IPv4Address("192.168.1.1"), target_port=Port.HTTP, payload="payload", repeat=True
|
||||
)
|
||||
dos_bot.set_original_state()
|
||||
dos_bot.reset_component_for_episode(episode=1)
|
||||
# should reset to the configured value
|
||||
assert dos_bot.target_ip_address == IPv4Address("192.168.1.1")
|
||||
assert dos_bot.target_port is Port.HTTP
|
||||
assert dos_bot.payload == "payload"
|
||||
assert dos_bot.repeat is True
|
||||
|
||||
|
||||
def test_dos_bot_cannot_run_when_node_offline(dos_bot):
|
||||
dos_bot_node: Computer = dos_bot.parent
|
||||
assert dos_bot_node.operating_state is NodeOperatingState.ON
|
||||
|
||||
dos_bot_node.power_off()
|
||||
|
||||
for i in range(dos_bot_node.shut_down_duration + 1):
|
||||
dos_bot_node.apply_timestep(timestep=i)
|
||||
|
||||
assert dos_bot_node.operating_state is NodeOperatingState.OFF
|
||||
|
||||
dos_bot._application_loop()
|
||||
|
||||
# assert not run
|
||||
assert dos_bot.attack_stage is DoSAttackStage.NOT_STARTED
|
||||
|
||||
|
||||
def test_dos_bot_not_configured(dos_bot):
|
||||
dos_bot.target_ip_address = None
|
||||
|
||||
dos_bot.operating_state = ApplicationOperatingState.RUNNING
|
||||
dos_bot._application_loop()
|
||||
|
||||
|
||||
def test_dos_bot_perform_port_scan(dos_bot):
|
||||
dos_bot._perform_port_scan(p_of_success=1)
|
||||
|
||||
assert dos_bot.attack_stage is DoSAttackStage.PORT_SCAN
|
||||
@@ -1,5 +1,6 @@
|
||||
from ipaddress import IPv4Address
|
||||
from typing import Tuple, Union
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -62,18 +63,26 @@ def test_disconnect_when_client_is_closed(database_client_on_computer):
|
||||
|
||||
|
||||
def test_disconnect(database_client_on_computer):
|
||||
"""Database client should set connected to False and remove the database server ip address."""
|
||||
"""Database client should remove the connection."""
|
||||
database_client, computer = database_client_on_computer
|
||||
|
||||
database_client.connected = True
|
||||
database_client._connections[str(uuid4())] = {"item": True}
|
||||
assert len(database_client.connections) == 1
|
||||
|
||||
assert database_client.operating_state is ApplicationOperatingState.RUNNING
|
||||
assert database_client.server_ip_address is not None
|
||||
|
||||
database_client.disconnect()
|
||||
|
||||
assert database_client.connected is False
|
||||
assert database_client.server_ip_address is None
|
||||
assert len(database_client.connections) == 0
|
||||
|
||||
uuid = str(uuid4())
|
||||
database_client._connections[uuid] = {"item": True}
|
||||
assert len(database_client.connections) == 1
|
||||
|
||||
database_client.disconnect(connection_id=uuid)
|
||||
|
||||
assert len(database_client.connections) == 0
|
||||
|
||||
|
||||
def test_query_when_client_is_closed(database_client_on_computer):
|
||||
@@ -86,19 +95,6 @@ def test_query_when_client_is_closed(database_client_on_computer):
|
||||
assert database_client.query(sql="test") is False
|
||||
|
||||
|
||||
def test_query_failed_reattempt(database_client_on_computer):
|
||||
"""Database client query should return False if the reattempt fails."""
|
||||
database_client, computer = database_client_on_computer
|
||||
|
||||
def return_false():
|
||||
return False
|
||||
|
||||
database_client.connect = return_false
|
||||
|
||||
database_client.connected = False
|
||||
assert database_client.query(sql="test", is_reattempt=True) is False
|
||||
|
||||
|
||||
def test_query_fail_to_connect(database_client_on_computer):
|
||||
"""Database client query should return False if the connect attempt fails."""
|
||||
database_client, computer = database_client_on_computer
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
from uuid import uuid4
|
||||
|
||||
from primaite.simulator.system.services.service import ServiceOperatingState
|
||||
from primaite.simulator.system.software import SoftwareHealthState
|
||||
|
||||
@@ -66,3 +68,34 @@ def test_enable_disable(service):
|
||||
|
||||
service.enable()
|
||||
assert service.operating_state == ServiceOperatingState.STOPPED
|
||||
|
||||
|
||||
def test_overwhelm_service(service):
|
||||
service.max_sessions = 2
|
||||
service.start()
|
||||
|
||||
uuid = str(uuid4())
|
||||
assert service.add_connection(connection_id=uuid) # should be true
|
||||
assert service.health_state_actual is SoftwareHealthState.GOOD
|
||||
|
||||
assert not service.add_connection(connection_id=uuid) # fails because connection already exists
|
||||
assert service.health_state_actual is SoftwareHealthState.GOOD
|
||||
|
||||
assert service.add_connection(connection_id=str(uuid4())) # succeed
|
||||
assert service.health_state_actual is SoftwareHealthState.GOOD
|
||||
|
||||
assert not service.add_connection(connection_id=str(uuid4())) # fail because at capacity
|
||||
assert service.health_state_actual is SoftwareHealthState.OVERWHELMED
|
||||
|
||||
|
||||
def test_create_and_remove_connections(service):
|
||||
service.start()
|
||||
uuid = str(uuid4())
|
||||
|
||||
assert service.add_connection(connection_id=uuid) # should be true
|
||||
assert len(service.connections) == 1
|
||||
assert service.health_state_actual is SoftwareHealthState.GOOD
|
||||
|
||||
assert service.remove_connection(connection_id=uuid) # should be true
|
||||
assert len(service.connections) == 0
|
||||
assert service.health_state_actual is SoftwareHealthState.GOOD
|
||||
|
||||
Reference in New Issue
Block a user