359 lines
14 KiB
Python
359 lines
14 KiB
Python
from ipaddress import IPv4Address
|
|
from typing import Tuple
|
|
|
|
import pytest
|
|
|
|
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
|
|
from primaite.simulator.network.container import Network
|
|
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
|
|
from primaite.simulator.network.hardware.nodes.host.computer import Computer
|
|
from primaite.simulator.network.hardware.nodes.host.server import Server
|
|
from primaite.simulator.network.hardware.nodes.network.switch import Switch
|
|
from primaite.simulator.system.applications.database_client import DatabaseClient, DatabaseClientConnection
|
|
from primaite.simulator.system.services.database.database_service import DatabaseService
|
|
from primaite.simulator.system.services.ftp.ftp_server import FTPServer
|
|
from primaite.simulator.system.services.service import ServiceOperatingState
|
|
|
|
|
|
@pytest.fixture(scope="function")
def peer_to_peer() -> Tuple[Computer, Computer]:
    """Provide two directly connected computers hosting a database client and a database service.

    ``node_a`` (192.168.0.10) has a ``DatabaseClient`` installed, pointed at 192.168.0.11 and run.
    ``node_b`` (192.168.0.11) has a ``DatabaseService`` installed and started. The fixture asserts that
    ``node_a`` can ping ``node_b`` before any software is installed.

    :return: The ``(node_a, node_b)`` pair of computers.
    """
    net = Network()

    client_host = Computer(
        hostname="node_a",
        ip_address="192.168.0.10",
        subnet_mask="255.255.255.0",
        start_up_duration=0,
    )
    client_host.power_on()
    # NOTE(review): the return value is discarded - presumably a leftover call; confirm before removing.
    client_host.software_manager.get_open_ports()

    server_host = Computer(
        hostname="node_b",
        ip_address="192.168.0.11",
        subnet_mask="255.255.255.0",
        start_up_duration=0,
    )
    server_host.power_on()
    net.connect(client_host.network_interface[1], server_host.network_interface[1])

    # The link must be usable before any software is installed on either node.
    assert client_host.ping("192.168.0.11")

    client_host.software_manager.install(DatabaseClient)
    installed_client = client_host.software_manager.software["DatabaseClient"]
    installed_client.configure(server_ip_address=IPv4Address("192.168.0.11"))
    installed_client.run()

    server_host.software_manager.install(DatabaseService)
    installed_service: DatabaseService = server_host.software_manager.software["DatabaseService"]  # noqa
    installed_service.start()

    return client_host, server_host
|
|
|
|
|
|
@pytest.fixture(scope="function")
def peer_to_peer_secure_db(peer_to_peer) -> Tuple[Computer, Computer]:
    """Provide the ``peer_to_peer`` pair with a password set on the database service.

    The ``DatabaseService`` on the second node is stopped, given the password ``"12345"`` and started again.

    :param peer_to_peer: The ``(node_a, node_b)`` pair produced by the ``peer_to_peer`` fixture.
    :return: The same two computers, as a ``(node_a, node_b)`` tuple.
    """
    client_host, server_host = peer_to_peer

    secured_service: DatabaseService = server_host.software_manager.software["DatabaseService"]  # noqa
    # The password is assigned while the service is stopped, then the service is brought back up.
    secured_service.stop()
    secured_service.password = "12345"
    secured_service.start()

    return client_host, server_host
|
|
|
|
|
|
def test_database_client_server_connection(peer_to_peer):
    """Check that connect/disconnect adds and removes one connection on both the client and the service."""
    client_host, server_host = peer_to_peer

    client: DatabaseClient = client_host.software_manager.software["DatabaseClient"]
    service: DatabaseService = server_host.software_manager.software["DatabaseService"]

    # After connecting, each side tracks exactly one connection.
    client.connect()
    assert len(client.client_connections) == 1
    assert len(service.connections) == 1

    # After disconnecting, neither side tracks any connection.
    client.disconnect()
    assert len(client.client_connections) == 0
    assert len(service.connections) == 0
|
|
|
|
|
|
def test_database_client_server_correct_password(peer_to_peer_secure_db):
    """Check that a client configured with the password ``"12345"`` connects to the secured service."""
    client_host, server_host = peer_to_peer_secure_db

    client: DatabaseClient = client_host.software_manager.software["DatabaseClient"]
    service: DatabaseService = server_host.software_manager.software["DatabaseService"]

    # Reconfigure the client with the same password the fixture set on the service.
    client.configure(server_ip_address=IPv4Address("192.168.0.11"), server_password="12345")
    client.connect()

    assert len(client.client_connections) == 1
    assert len(service.connections) == 1
|
|
|
|
|
|
def test_database_client_server_incorrect_password(peer_to_peer_secure_db):
    """Check that a password-protected database service rejects a missing or wrong password.

    The ``peer_to_peer_secure_db`` fixture sets a password on the service. Connecting first with the client's
    initial configuration (server address only) and then with the password ``"wrongpass"`` must both leave
    the client and the service without any connection.
    """
    node_a, node_b = peer_to_peer_secure_db

    db_client: DatabaseClient = node_a.software_manager.software["DatabaseClient"]
    db_service: DatabaseService = node_b.software_manager.software["DatabaseService"]

    # No password configured on the client: should fail.
    # ``client_connections`` is the attribute the other tests in this module use to count the client's
    # connections, so it is the one asserted on here as well.
    db_client.connect()
    assert len(db_client.client_connections) == 0
    assert len(db_service.connections) == 0

    # Wrong password configured on the client: should fail as well.
    db_client.configure(server_ip_address=IPv4Address("192.168.0.11"), server_password="wrongpass")
    db_client.connect()
    assert len(db_client.client_connections) == 0
    assert len(db_service.connections) == 0
|
|
|
|
|
|
def test_database_client_native_connection_query(uc2_network):
    """Check that, after ``connect()``, SELECT and INSERT queries issued on the client itself succeed."""
    web_host: Server = uc2_network.get_node_by_hostname("web_server")
    client: DatabaseClient = web_host.software_manager.software["DatabaseClient"]

    client.connect()

    # Both statements are sent through the client directly rather than through a connection object.
    for statement in ("SELECT", "INSERT"):
        assert client.query(sql=statement)
|
|
|
|
|
|
def test_database_client_connection_query(uc2_network):
    """Check that SELECT and INSERT queries succeed on a connection obtained from ``get_new_connection()``."""
    web_host: Server = uc2_network.get_node_by_hostname("web_server")
    client: DatabaseClient = web_host.software_manager.software["DatabaseClient"]

    connection: DatabaseClientConnection = client.get_new_connection()

    for statement in ("SELECT", "INSERT"):
        assert connection.query(sql=statement)
|
|
|
|
|
|
def test_create_database_backup(uc2_network):
    """Check that ``backup_database()`` succeeds and that the backup server's FTP server holds the file."""
    database_host: Server = uc2_network.get_node_by_hostname("database_server")
    service: DatabaseService = database_host.software_manager.software["DatabaseService"]

    # The backup itself must report success.
    backup_created = service.backup_database()
    assert backup_created is True

    backup_host: Server = uc2_network.get_node_by_hostname("backup_server")
    ftp: FTPServer = backup_host.software_manager.software["FTPServer"]

    # The backup is looked up in a folder named after the database service's uuid.
    stored_backup = ftp.file_system.get_file(folder_name=service.uuid, file_name="database.db")
    assert stored_backup is not None
|
|
|
|
|
|
def test_restore_backup(uc2_network):
    """Check that ``restore_backup()`` brings back a database file that was deleted locally."""
    database_host: Server = uc2_network.get_node_by_hostname("database_server")
    service: DatabaseService = database_host.software_manager.software["DatabaseService"]

    # Location of the database file in the service's file system.
    db_location = {"folder_name": "database", "file_name": "database.db"}

    # A backup must exist before the local copy is removed.
    assert service.backup_database() is True

    # Remove the local database file and confirm that it is gone.
    service.file_system.delete_file(**db_location)
    assert service.file_system.get_file(**db_location) is None

    # Restoring must succeed and make the file available again.
    assert service.restore_backup() is True
    assert service.file_system.get_file(**db_location) is not None
|
|
|
|
|
|
def test_restore_backup_without_updating_scan(uc2_network):
    """Check restoring over a file already scanned as corrupt.

    After the restore the file's actual health is GOOD, while its visible health stays CORRUPT until the
    file is scanned again.
    """
    healthy = FileSystemItemHealthStatus.GOOD
    corrupted = FileSystemItemHealthStatus.CORRUPT

    database_host: Server = uc2_network.get_node_by_hostname("database_server")
    service: DatabaseService = database_host.software_manager.software["DatabaseService"]

    # Take a backup while the database file is still intact.
    assert service.backup_database() is True

    # Corrupt the file: the actual status changes immediately, the visible one only after a scan.
    service.db_file.corrupt()
    assert service.db_file.health_status == corrupted
    assert service.db_file.visible_health_status == healthy

    service.db_file.scan()
    assert service.db_file.visible_health_status == corrupted

    # Restoring must succeed and repair the actual status.
    assert service.restore_backup() is True
    assert service.db_file.health_status == healthy
    # The visible status still reflects the last scan.
    assert service.db_file.visible_health_status == corrupted

    # A fresh scan brings the visible status in line with the actual one.
    service.db_file.scan()
    assert service.db_file.visible_health_status == healthy
|
|
|
|
|
|
def test_restore_backup_after_deleting_file_without_updating_scan(uc2_network):
    """Check restoring after the database file was scanned as corrupt and then deleted.

    After the restore the file's actual health is GOOD, while its visible health is CORRUPT until the
    file is scanned again.
    """
    healthy = FileSystemItemHealthStatus.GOOD
    corrupted = FileSystemItemHealthStatus.CORRUPT

    database_host: Server = uc2_network.get_node_by_hostname("database_server")
    service: DatabaseService = database_host.software_manager.software["DatabaseService"]

    assert service.backup_database() is True

    # Corrupt the file: the actual status changes immediately, the visible one only after a scan.
    service.db_file.corrupt()
    assert service.db_file.health_status == corrupted
    assert service.db_file.visible_health_status == healthy

    service.db_file.scan()
    assert service.db_file.visible_health_status == corrupted

    # Delete the database locally; the service must no longer expose a database file.
    service.file_system.delete_file(folder_name="database", file_name="database.db")
    assert service.db_file is None

    # Restoring must succeed and yield a file whose actual status is good.
    assert service.restore_backup() is True
    assert service.db_file.health_status == healthy
    # The visible status is still the one recorded by the last scan.
    assert service.db_file.visible_health_status == corrupted

    # A fresh scan brings the visible status in line with the actual one.
    service.db_file.scan()
    assert service.db_file.visible_health_status == healthy
|
|
|
|
|
|
def test_database_client_cannot_query_offline_database_server(uc2_network):
    """Check that queries on an existing connection return False once the database server is powered off."""
    database_host: Server = uc2_network.get_node_by_hostname("database_server")
    service: DatabaseService = database_host.software_manager.software.get("DatabaseService")

    # Precondition: the server is on and its database service is running.
    assert database_host.operating_state is NodeOperatingState.ON
    assert service.operating_state is ServiceOperatingState.RUNNING

    web_host: Server = uc2_network.get_node_by_hostname("web_server")
    client: DatabaseClient = web_host.software_manager.software.get("DatabaseClient")
    client.connect()
    assert len(client.client_connections) > 0

    # Open a separate connection and confirm it works while the server is up.
    connection: DatabaseClientConnection = client.get_new_connection()
    for statement in ("SELECT", "INSERT"):
        assert connection.query(statement) is True

    database_host.power_off()

    # Step the simulation through the server's shut-down duration (plus one step).
    for step in range(database_host.shut_down_duration + 1):
        uc2_network.apply_timestep(timestep=step)

    assert database_host.operating_state is NodeOperatingState.OFF
    assert service.operating_state is ServiceOperatingState.STOPPED

    # The same connection can no longer be used to query.
    for statement in ("SELECT", "INSERT"):
        assert connection.query(statement) is False
|
|
|
|
|
|
def test_database_client_uninstall_terminates_connections(peer_to_peer):
    """Check that uninstalling the ``DatabaseClient`` closes its connection on both client and service."""
    client_host, server_host = peer_to_peer

    client: DatabaseClient = client_host.software_manager.software["DatabaseClient"]
    service: DatabaseService = server_host.software_manager.software["DatabaseService"]  # noqa

    connection: DatabaseClientConnection = client.get_new_connection()

    # Before the uninstall: one connection on each side, active and able to query.
    assert len(service.connections) == 1
    assert len(client.client_connections) == 1
    assert connection.is_active
    assert connection.query("SELECT")

    # Remove the client application from its node.
    client_host.software_manager.uninstall("DatabaseClient")

    # After the uninstall: no connection on either side, and the old connection is unusable.
    assert len(service.connections) == 0
    assert len(client.client_connections) == 0
    assert not connection.query("SELECT")
    assert not connection.is_active
|
|
|
|
|
|
def test_database_service_can_terminate_connection(peer_to_peer):
    """Check that a connection terminated by the service is removed on both sides and becomes unusable."""
    client_host, server_host = peer_to_peer

    client: DatabaseClient = client_host.software_manager.software["DatabaseClient"]
    service: DatabaseService = server_host.software_manager.software["DatabaseService"]  # noqa

    connection: DatabaseClientConnection = client.get_new_connection()

    # Before termination: one connection on each side, active and able to query.
    assert len(service.connections) == 1
    assert len(client.client_connections) == 1
    assert connection.is_active
    assert connection.query("SELECT")

    # Terminate from the server side, using the id of the only connection the service holds.
    only_connection_id = next(iter(service.connections))
    service.terminate_connection(only_connection_id)

    # After termination: no connection on either side, and the old connection is unusable.
    assert len(service.connections) == 0
    assert len(client.client_connections) == 0
    assert not connection.query("SELECT")
    assert not connection.is_active
|
|
|
|
|
|
def test_client_connection_terminate_does_not_terminate_another_clients_connection():
    """Check that one client disconnecting leaves another client's connection to the same service intact.

    A database server and two client computers are attached to a three-port switch. Each client opens a
    connection to the service; after the first client's connection is disconnected, the second client's
    connection must still be able to query and the service must track exactly one connection.
    """

    def _make_db_client_computer(hostname: str, ip_address: str) -> Computer:
        """Create a powered-on computer whose ``DatabaseClient`` targets 192.168.0.11 and has been run."""
        computer = Computer(
            hostname=hostname, ip_address=ip_address, subnet_mask="255.255.255.0", start_up_duration=0
        )
        computer.power_on()

        computer.software_manager.install(DatabaseClient)
        installed_client = computer.software_manager.software["DatabaseClient"]
        installed_client.configure(server_ip_address=IPv4Address("192.168.0.11"))
        installed_client.run()
        return computer

    network = Network()

    # The server node was previously created with the copy-pasted hostname "db_client".
    db_server = Server(
        hostname="db_server", ip_address="192.168.0.11", subnet_mask="255.255.255.0", start_up_duration=0
    )
    db_server.power_on()

    db_server.software_manager.install(DatabaseService)
    db_service: DatabaseService = db_server.software_manager.software["DatabaseService"]  # noqa
    db_service.start()

    client_a = _make_db_client_computer(hostname="client_a", ip_address="192.168.0.12")
    client_b = _make_db_client_computer(hostname="client_b", ip_address="192.168.0.13")

    switch = Switch(hostname="switch", start_up_duration=0, num_ports=3)
    switch.power_on()

    # One switch port per node: server on port 1, clients on ports 2 and 3.
    network.connect(endpoint_a=switch.network_interface[1], endpoint_b=db_server.network_interface[1])
    network.connect(endpoint_a=switch.network_interface[2], endpoint_b=client_a.network_interface[1])
    network.connect(endpoint_a=switch.network_interface[3], endpoint_b=client_b.network_interface[1])

    db_client_a: DatabaseClient = client_a.software_manager.software["DatabaseClient"]  # noqa
    db_connection_a = db_client_a.get_new_connection()

    assert db_connection_a.query("SELECT")
    assert len(db_service.connections) == 1

    db_client_b: DatabaseClient = client_b.software_manager.software["DatabaseClient"]  # noqa
    db_connection_b = db_client_b.get_new_connection()

    assert db_connection_b.query("SELECT")
    assert len(db_service.connections) == 2

    # Only client A's connection is closed; client B's must keep working.
    db_connection_a.disconnect()

    assert db_connection_b.query("SELECT")
    assert len(db_service.connections) == 1
|