# PrimAITE/src/primaite/simulator/system/services/database/database_service.py
# Standard library
from ipaddress import IPv4Address
from typing import Any, Dict, List, Literal, Optional, Union
from uuid import uuid4

# PrimAITE
from primaite import getLogger
from primaite.simulator.file_system.file_system import File
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.file_system.folder import Folder
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.core.software_manager import SoftwareManager
from primaite.simulator.system.services.ftp.ftp_client import FTPClient
from primaite.simulator.system.services.service import Service, ServiceOperatingState
from primaite.simulator.system.software import SoftwareHealthState

# Module-level logger.
_LOGGER = getLogger(__name__)
class DatabaseService(Service):
    """
    A class for simulating a generic SQL Server service.

    This class inherits from the `Service` class and provides methods to simulate a SQL database.
    """

    password: Optional[str] = None
    """Password that needs to be provided by clients if they want to connect to the DatabaseService."""

    backup_server_ip: Optional[IPv4Address] = None
    """IP address of the backup server."""

    latest_backup_directory: Optional[str] = None
    """Directory of latest backup."""

    latest_backup_file_name: Optional[str] = None
    """File name of latest backup."""

    def __init__(self, **kwargs):
        """
        Initialise the service and create its database file.

        The ``name``, ``port`` and ``protocol`` entries of ``kwargs`` are always overwritten with the fixed values
        used by this service before being passed to the parent constructor.
        """
        kwargs["name"] = "DatabaseService"
        kwargs["port"] = Port.POSTGRES_SERVER
        kwargs["protocol"] = IPProtocol.TCP
        super().__init__(**kwargs)
        self._create_db_file()

    def install(self):
        """Install the service and, if it is not already present, the FTPClient used for database backups."""
        super().install()

        if not self.software_manager.software.get("FTPClient"):
            self.sys_log.info(f"{self.name}: Installing FTPClient to enable database backups")
            self.software_manager.install(FTPClient)

    def configure_backup(self, backup_server: IPv4Address):
        """
        Set up the database backup.

        :param backup_server: The IP address of the backup server.
        """
        self.backup_server_ip = backup_server

    def backup_database(self) -> bool:
        """
        Create a backup of the database to the configured backup server.

        :return: True if the FTP client reported the file as sent, otherwise False.
        """
        # check if this action can be performed
        if not self._can_perform_action():
            return False

        # check if the backup server was configured
        if self.backup_server_ip is None:
            self.sys_log.warning(f"{self.name} - {self.sys_log.hostname}: not configured.")
            return False

        software_manager: SoftwareManager = self.software_manager
        ftp_client_service: FTPClient = software_manager.software.get("FTPClient")

        if not ftp_client_service:
            self.sys_log.error(
                f"{self.name}: Failed to perform database backup as the FTPClient software is not installed"
            )
            return False

        # send backup copy of database file to FTP server
        if not self.db_file:
            self.sys_log.error(f"{self.name}: Attempted to backup database file but it doesn't exist.")
            return False

        # The backup is stored on the server in a folder named after this service's uuid.
        response = ftp_client_service.send_file(
            dest_ip_address=self.backup_server_ip,
            src_file_name=self.db_file.name,
            src_folder_name="database",
            dest_folder_name=str(self.uuid),
            dest_file_name="database.db",
        )

        if response:
            return True

        self.sys_log.error("Unable to create database backup.")
        return False

    def restore_backup(self) -> bool:
        """
        Restore a backup from backup server.

        The backup is downloaded into the ``downloads`` folder and then copied over the database file. The visible
        health status the database file had before the restore is carried over to the restored file.

        :return: True if the database file was replaced by the backup, otherwise False.
        """
        # check if this action can be performed
        if not self._can_perform_action():
            return False

        # Same pre-condition as backup_database(): without a configured server there is nothing to request from.
        if self.backup_server_ip is None:
            self.sys_log.warning(f"{self.name} - {self.sys_log.hostname}: not configured.")
            return False

        software_manager: SoftwareManager = self.software_manager
        ftp_client_service: FTPClient = software_manager.software.get("FTPClient")

        if not ftp_client_service:
            self.sys_log.error(
                f"{self.name}: Failed to restore database backup as the FTPClient software is not installed"
            )
            return False

        # retrieve backup file from backup server
        response = ftp_client_service.request_file(
            src_folder_name=str(self.uuid),
            src_file_name="database.db",
            dest_folder_name="downloads",
            dest_file_name="database.db",
            dest_ip_address=self.backup_server_ip,
        )

        if not response:
            self.sys_log.error("Unable to restore database backup.")
            return False

        # get db file regardless of whether or not it was deleted
        db_file = self.file_system.get_file(folder_name="database", file_name="database.db", include_deleted=True)

        if db_file is None:
            self.sys_log.warning("Database file not initialised.")
            return False

        # Remember what the file looked like before the restore so it can be re-applied to the restored copy.
        old_visible_state = db_file.visible_health_status

        # A file that is still present has to be removed before the backup is copied in its place.
        if not db_file.deleted:
            self.file_system.delete_file(folder_name="database", file_name="database.db")

        # replace db file
        self.file_system.copy_file(src_folder_name="downloads", src_file_name="database.db", dst_folder_name="database")

        if self.db_file is None:
            self.sys_log.error("Copying database backup failed.")
            return False

        self.db_file.visible_health_status = old_visible_state
        self.set_health_state(SoftwareHealthState.GOOD)

        return True

    def _create_db_file(self):
        """Creates the Simulation File and sqlite file in the file system."""
        self.file_system.create_file(folder_name="database", file_name="database.db")

    @property
    def db_file(self) -> Optional[File]:
        """Returns the database file as looked up in the file system (callers treat a falsy result as missing)."""
        return self.file_system.get_file(folder_name="database", file_name="database.db")

    def _return_database_folder(self) -> Folder:
        """Returns the database folder, looked up via the folder id of the database file."""
        return self.file_system.get_folder_by_id(self.db_file.folder_id)

    def _generate_connection_id(self) -> str:
        """Generate a unique connection ID."""
        return str(uuid4())

    def _process_connect(
        self,
        src_ip: IPv4Address,
        connection_request_id: str,
        password: Optional[str] = None,
        session_id: Optional[str] = None,
    ) -> Dict[str, Union[int, Dict[str, bool]]]:
        """Process an incoming connection request.

        Status codes used in the response:

        - 200: connection accepted
        - 401: supplied password does not match
        - 404: the service is not running
        - 500: the connection could not be added
        - 503: the service is running but its health state is not GOOD

        :param src_ip: IP address the request came from (only used for logging).
        :type src_ip: IPv4Address
        :param connection_request_id: Identifier of the request, echoed back in the response.
        :type connection_request_id: str
        :param password: Supplied password. It must match self.password for connection success, defaults to None
        :type password: Optional[str], optional
        :param session_id: Session identifier passed on to ``add_connection``, defaults to None
        :type session_id: Optional[str], optional
        :return: Response to connection request containing success info.
        :rtype: Dict[str, Union[int, Dict[str, bool]]]
        """
        status_code = 500  # Default internal server error
        connection_id = None

        if self.operating_state == ServiceOperatingState.RUNNING:
            status_code = 503  # service unavailable

            if self.health_state_actual == SoftwareHealthState.OVERWHELMED:
                self.sys_log.error(f"{self.name}: Connect request for {src_ip=} declined. Service is at capacity.")

            if self.health_state_actual == SoftwareHealthState.GOOD:
                if self.password == password:
                    status_code = 200  # ok
                    connection_id = self._generate_connection_id()
                    # try to create connection
                    if not self.add_connection(connection_id=connection_id, session_id=session_id):
                        status_code = 500
                        self.sys_log.warning(f"{self.name}: Connect request for {connection_id=} declined")
                    else:
                        self.sys_log.info(f"{self.name}: Connect request for {connection_id=} authorised")
                else:
                    status_code = 401  # Unauthorised
                    self.sys_log.warning(f"{self.name}: Connect request for {connection_id=} declined")
        else:
            status_code = 404  # service not found

        return {
            "status_code": status_code,
            "type": "connect_response",
            "response": status_code == 200,
            "connection_id": connection_id,
            "connection_request_id": connection_request_id,
        }

    def _process_sql(
        self,
        query: Literal["SELECT", "DELETE", "INSERT", "ENCRYPT", "SELECT * FROM pg_stat_activity"],
        query_id: str,
        connection_id: Optional[str] = None,
    ) -> Dict[str, Union[int, List[Any]]]:
        """
        Executes the given SQL query and returns the result.

        Possible queries:

        - SELECT : returns the data
        - DELETE : deletes the data
        - INSERT : inserts the data
        - ENCRYPT : corrupts the data
        - SELECT * FROM pg_stat_activity : checks that the service is healthy

        :param query: The SQL query to be executed.
        :param query_id: Identifier of the query, returned under the ``uuid`` key of successful responses.
        :param connection_id: Identifier of the connection the query arrived on, echoed in successful responses.
        :return: Dictionary containing status code and data fetched.
        """

        def success(data: bool) -> Dict[str, Any]:
            """Build the status-200 response shared by every query type."""
            return {
                "status_code": 200,
                "type": "sql",
                "data": data,
                "uuid": query_id,
                "connection_id": connection_id,
            }

        self.sys_log.info(f"{self.name}: Running {query}")

        if not self.db_file:
            self.sys_log.error(f"{self.name}: Failed to run {query} because the database file is missing.")
            return {"status_code": 404, "type": "sql", "data": False}

        if query == "SELECT":
            if self.db_file.health_status == FileSystemItemHealthStatus.CORRUPT:
                # A corrupt file still answers with 200, but no data is returned.
                return success(False)
            elif self.db_file.health_status == FileSystemItemHealthStatus.GOOD:
                return success(True)
            else:
                return {"status_code": 404, "type": "sql", "data": False}
        elif query == "DELETE":
            self.db_file.health_status = FileSystemItemHealthStatus.COMPROMISED
            return success(False)
        elif query == "ENCRYPT":
            # Corrupt both the database file and its folder, and update the file system / file counters.
            self.file_system.num_file_creations += 1
            self.db_file.health_status = FileSystemItemHealthStatus.CORRUPT
            self.db_file.num_access += 1
            database_folder = self._return_database_folder()
            database_folder.health_status = FileSystemItemHealthStatus.CORRUPT
            self.file_system.num_file_deletions += 1
            return success(False)
        elif query == "INSERT":
            if self.health_state_actual == SoftwareHealthState.GOOD:
                return success(False)
            else:
                return {"status_code": 404, "type": "sql", "data": False}
        elif query == "SELECT * FROM pg_stat_activity":
            # Check if the connection is active.
            if self.health_state_actual == SoftwareHealthState.GOOD:
                return success(False)
            else:
                return {"status_code": 401, "data": False}
        else:
            # Invalid query
            self.sys_log.warning(f"{self.name}: Invalid {query}")
            return {"status_code": 500, "data": False}

    def describe_state(self) -> Dict:
        """
        Produce a dictionary describing the current state of this object.

        Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.

        :return: Current state of this object and child objects.
        :rtype: Dict
        """
        return super().describe_state()

    def receive(self, payload: Any, session_id: str, **kwargs) -> bool:
        """
        Processes the incoming SQL payload and sends the result back.

        Handles three payload types: ``connect_request``, ``disconnect`` and ``sql``. The ``frame`` keyword argument
        is read to obtain the source IP address for ``connect_request`` and ``disconnect`` payloads.

        :param payload: The SQL query to be executed.
        :param session_id: The session identifier.
        :return: False if the service cannot currently perform actions, otherwise True once a response was sent.
        """
        # Response used when the payload is not recognised. NOTE: the disconnect branch never replaces it either.
        result = {"status_code": 500, "data": []}

        # if server service is down, return error
        if not self._can_perform_action():
            return False

        if isinstance(payload, dict) and payload.get("type"):
            if payload["type"] == "connect_request":
                src_ip = kwargs.get("frame").ip.src_ip_address
                result = self._process_connect(
                    src_ip=src_ip,
                    password=payload.get("password"),
                    connection_request_id=payload.get("connection_request_id"),
                    session_id=session_id,
                )
            elif payload["type"] == "disconnect":
                # .get() so that a payload without a connection_id is ignored instead of raising KeyError.
                connection_id = payload.get("connection_id")
                if connection_id in self.connections:
                    connected_ip_address = self.connections[connection_id]["ip_address"]
                    frame = kwargs.get("frame")
                    # Only the host that opened the connection is allowed to close it.
                    if connected_ip_address == frame.ip.src_ip_address:
                        self.sys_log.info(
                            f"{self.name}: Received disconnect command for {connection_id=} from {connected_ip_address}"
                        )
                        self.terminate_connection(connection_id=connection_id, send_disconnect=False)
                    else:
                        self.sys_log.warning(
                            f"{self.name}: Ignoring disconnect command for {connection_id=} as the command source "
                            f"({frame.ip.src_ip_address}) doesn't match the connection source ({connected_ip_address})"
                        )
            elif payload["type"] == "sql":
                if payload.get("connection_id") in self.connections:
                    result = self._process_sql(
                        query=payload["sql"], query_id=payload["uuid"], connection_id=payload["connection_id"]
                    )
                else:
                    # Queries are only accepted over a known connection.
                    result = {"status_code": 401, "type": "sql"}

        self.send(payload=result, session_id=session_id)
        return True

    def send(self, payload: Any, session_id: str, **kwargs) -> bool:
        """
        Send a SQL response back down to the SessionManager.

        :param payload: The SQL query results.
        :param session_id: The session identifier.
        :return: True if the Status Code is 200, otherwise False.
        """
        software_manager: SoftwareManager = self.software_manager
        software_manager.send_payload_to_session_manager(payload=payload, session_id=session_id)
        return payload["status_code"] == 200

    def apply_timestep(self, timestep: int) -> None:
        """
        Apply a single timestep of simulation dynamics to this service.

        Here at the first step, the database backup is created, in addition to normal service update logic.

        :param timestep: The current timestep number.
        """
        if timestep == 1:
            self.backup_database()
        return super().apply_timestep(timestep)

    def _update_fix_status(self) -> None:
        """Perform a database restore when the FIXING countdown is finished."""
        super()._update_fix_status()
        # The countdown is compared with None after the parent update; that is treated as "fixing finished".
        if self._fixing_countdown is None:
            self.restore_backup()