diff --git a/src/primaite/simulator/__init__.py b/src/primaite/simulator/__init__.py index 1cfe7f49..8c55542f 100644 --- a/src/primaite/simulator/__init__.py +++ b/src/primaite/simulator/__init__.py @@ -1,5 +1,14 @@ +from datetime import datetime + from primaite import _PRIMAITE_ROOT -TEMP_SIM_OUTPUT = _PRIMAITE_ROOT.parent.parent / "simulation_output" +SIM_OUTPUT = None "A path at the repo root dir to use temporarily for sim output testing while in dev." # TODO: Remove once we integrate the simulation into PrimAITE and it uses the primaite session path + +if not SIM_OUTPUT: + session_timestamp = datetime.now() + date_dir = session_timestamp.strftime("%Y-%m-%d") + sim_path = session_timestamp.strftime("%Y-%m-%d_%H-%M-%S") + SIM_OUTPUT = _PRIMAITE_ROOT.parent.parent / "simulation_output" / date_dir / sim_path + SIM_OUTPUT.mkdir(exist_ok=True, parents=True) diff --git a/src/primaite/simulator/_package_data/create-simulation_demo.ipynb b/src/primaite/simulator/_package_data/create-simulation_demo.ipynb index baf7bd2c..a2e1550c 100644 --- a/src/primaite/simulator/_package_data/create-simulation_demo.ipynb +++ b/src/primaite/simulator/_package_data/create-simulation_demo.ipynb @@ -149,8 +149,8 @@ "metadata": {}, "outputs": [], "source": [ - "from primaite.simulator.file_system.file_system_file_type import FileSystemFileType\n", - "from primaite.simulator.file_system.file_system_file import FileSystemFile" + "from primaite.simulator.file_system.file_type import FileType\n", + "from primaite.simulator.file_system.file_system import File" ] }, { @@ -160,7 +160,7 @@ "outputs": [], "source": [ "my_pc_downloads_folder = my_pc.file_system.create_folder(\"downloads\")\n", - "my_pc_downloads_folder.add_file(FileSystemFile(name=\"firefox_installer.zip\",file_type=FileSystemFileType.ZIP))" + "my_pc_downloads_folder.add_file(File(name=\"firefox_installer.zip\",file_type=FileType.ZIP))" ] }, { @@ -171,7 +171,7 @@ { "data": { "text/plain": [ - "FileSystemFile(uuid='7d56a563-ecc0-4011-8c97-240dd6c885c0', name='favicon.ico', size=40.0, file_type=, action_manager=None)" + "File(uuid='7d56a563-ecc0-4011-8c97-240dd6c885c0', name='favicon.ico', size=40.0, file_type=, action_manager=None)" ] }, "execution_count": 9, @@ -181,7 +181,7 @@ ], "source": [ "my_server_folder = my_server.file_system.create_folder(\"static\")\n", - "my_server.file_system.create_file(\"favicon.ico\", file_type=FileSystemFileType.PNG)" + "my_server.file_system.create_file(\"favicon.ico\", file_type=FileType.PNG)" ] }, { diff --git a/src/primaite/simulator/file_system/file_system.py b/src/primaite/simulator/file_system/file_system.py index 79159e60..b2037729 100644 --- a/src/primaite/simulator/file_system/file_system.py +++ b/src/primaite/simulator/file_system/file_system.py @@ -1,242 +1,519 @@ -from random import choice +from __future__ import annotations + +import math +import os.path +import shutil +from pathlib import Path from typing import Dict, Optional +from prettytable import MARKDOWN, PrettyTable + from primaite import getLogger from primaite.simulator.core import SimComponent -from primaite.simulator.file_system.file_system_file import FileSystemFile -from primaite.simulator.file_system.file_system_file_type import FileSystemFileType -from primaite.simulator.file_system.file_system_folder import FileSystemFolder +from primaite.simulator.file_system.file_type import FileType, get_file_type_from_extension +from primaite.simulator.system.core.sys_log import SysLog _LOGGER = getLogger(__name__) -class FileSystem(SimComponent): - """Class that contains all the simulation File System.""" +def convert_size(size_bytes: int) -> str: + """ + Convert a file size from bytes to a string with a more human-readable format. - folders: Dict[str, FileSystemFolder] = {} - """List containing all the folders in the file system.""" + This function takes the size of a file in bytes and converts it to a string representation with appropriate size + units (B, KB, MB, GB, etc.). + + :param size_bytes: The size of the file in bytes. + :return: The human-readable string representation of the file size. + """ + if size_bytes == 0: + return "0 B" + + # Tuple of size units starting from Bytes up to Yottabytes + size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB") + + # Calculate the index (i) that will be used to select the appropriate size unit from size_name + i = int(math.floor(math.log(size_bytes, 1024))) + + # Calculate the adjusted size value (s) in terms of the new size unit + p = math.pow(1024, i) + s = round(size_bytes / p, 2) + + return f"{s} {size_name[i]}" + + +class FileSystemItemABC(SimComponent): + """ + Abstract base class for file system items used in the file system simulation. + + :ivar name: The name of the FileSystemItemABC. + """ + + name: str + "The name of the FileSystemItemABC." def describe_state(self) -> Dict: """ Produce a dictionary describing the current state of this object. - Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation. - :return: Current state of this object and child objects. - :rtype: Dict """ state = super().describe_state() - state.update({"folders": {uuid: folder.describe_state() for uuid, folder in self.folders.items()}}) + state.update( + { + "name": self.name, + } + ) return state - def get_folders(self) -> Dict: - """Returns the list of folders.""" - return self.folders + @property + def size_str(self) -> str: + """ + Get the file size in a human-readable string format. + + This property makes use of the :func:`convert_size` function to convert the `self.size` attribute to a string + that is easier to read and understand. + + :return: The human-readable string representation of the file size. + """ + return convert_size(self.size) + + +class FileSystem(SimComponent): + """Class that contains all the simulation File System.""" + + folders: Dict[str, Folder] = {} + "List containing all the folders in the file system." + _folders_by_name: Dict[str, Folder] = {} + sys_log: SysLog + sim_root: Path + + def __init__(self, **kwargs): + super().__init__(**kwargs) + # Ensure a default root folder + if not self.folders: + self.create_folder("root") + + @property + def size(self) -> int: + """ + Calculate and return the total size of all folders in the file system. + + :return: The sum of the sizes of all folders in the file system. + """ + return sum(folder.size for folder in self.folders.values()) + + def show(self, markdown: bool = False, full: bool = False): + """ + Prints a table of the FileSystem, displaying either just folders or full files. + + :param markdown: Flag indicating if output should be in markdown format. + :param full: Flag indicating if to show full files. + """ + headers = ["Folder", "Size"] + if full: + headers[0] = "File Path" + table = PrettyTable(headers) + if markdown: + table.set_style(MARKDOWN) + table.align = "l" + table.title = f"{self.sys_log.hostname} File System" + for folder in self.folders.values(): + if not full: + table.add_row([folder.name, folder.size_str]) + else: + for file in folder.files.values(): + table.add_row([file.path, file.size_str]) + if full: + print(table.get_string(sortby="File Path")) + else: + print(table.get_string(sortby="Folder")) + + def describe_state(self) -> Dict: + """ + Produce a dictionary describing the current state of this object. + + :return: Current state of this object and child objects. + """ + state = super().describe_state() + state["folders"] = {folder.name: folder.describe_state() for folder in self.folders.values()} + return state + + def create_folder(self, folder_name: str) -> Folder: + """ + Creates a Folder and adds it to the list of folders. + + :param folder_name: The name of the folder. + """ + # check if folder with name already exists + if self.get_folder(folder_name): + raise Exception(f"Cannot create folder as it already exists: {folder_name}") + + folder = Folder(name=folder_name, fs=self) + + self.folders[folder.uuid] = folder + self._folders_by_name[folder.name] = folder + self.sys_log.info(f"Created folder /{folder.name}") + return folder + + def delete_folder(self, folder_name: str): + """ + Deletes a folder, removes it from the folders list and removes any child folders and files. + + :param folder_name: The name of the folder. + """ + if folder_name == "root": + self.sys_log.warning("Cannot delete the root folder.") + return + folder = self._folders_by_name.get(folder_name) + if folder: + for file in folder.files.values(): + self.delete_file(file) + self.folders.pop(folder.uuid) + self._folders_by_name.pop(folder.name) + self.sys_log.info(f"Deleted folder /{folder.name} and its contents") + else: + _LOGGER.debug(f"Cannot delete folder as it does not exist: {folder_name}") def create_file( self, file_name: str, - size: Optional[float] = None, - file_type: Optional[FileSystemFileType] = None, - folder: Optional[FileSystemFolder] = None, - folder_uuid: Optional[str] = None, - ) -> FileSystemFile: + size: Optional[int] = None, + file_type: Optional[FileType] = None, + folder_name: Optional[str] = None, + real: bool = False, + ) -> File: """ - Creates a FileSystemFile and adds it to the list of files. + Creates a File and adds it to the list of files. - If no size or file_type are provided, one will be chosen randomly. - If no folder_uuid or folder is provided, a new folder will be created. - - :param: file_name: The file name - :type: file_name: str - - :param: size: The size the file takes on disk. - :type: size: Optional[float] - - :param: file_type: The type of the file - :type: Optional[FileSystemFileType] - - :param: folder: The folder to add the file to - :type: folder: Optional[FileSystemFolder] - - :param: folder_uuid: The uuid of the folder to add the file to - :type: folder_uuid: Optional[str] + :param file_name: The file name. + :param size: The size the file takes on disk in bytes. + :param file_type: The type of the file. + :param folder_name: The folder to add the file to. + :param real: "Indicates whether the File is actually a real file in the Node sim fs output." """ - file = None - folder = None - - if file_type is None: - file_type = self.get_random_file_type() - - # if no folder uuid provided, create a folder and add file to it - if folder_uuid is not None: - # otherwise check for existence and add file - folder = self.get_folder_by_id(folder_uuid) - - if folder is not None: + if folder_name: # check if file with name already exists - if folder.get_file_by_name(file_name): - raise Exception(f'File with name "{file_name}" already exists.') - - file = FileSystemFile(name=file_name, size=size, file_type=file_type) - folder.add_file(file=file) + folder = self._folders_by_name.get(folder_name) + # If not then create it + if not folder: + folder = self.create_folder(folder_name) else: - # check if a "root" folder exists - folder = self.get_folder_by_name("root") - if folder is None: - # create a root folder - folder = FileSystemFolder(name="root") + # Use root folder if folder_name not supplied + folder = self._folders_by_name["root"] - # add file to root folder - file = FileSystemFile(name=file_name, size=size, file_type=file_type) - folder.add_file(file) - self.folders[folder.uuid] = folder + # Create the file and add it to the folder + file = File( + name=file_name, + sim_size=size, + file_type=file_type, + folder=folder, + real=real, + sim_path=self.sim_root if real else None, + ) + folder.add_file(file) + self.sys_log.info(f"Created file /{file.path}") return file - def create_folder( - self, - folder_name: str, - ) -> FileSystemFolder: + def get_file(self, folder_name: str, file_name: str) -> Optional[File]: """ - Creates a FileSystemFolder and adds it to the list of folders. + Retrieve a file by its name from a specific folder. - :param: folder_name: The name of the folder - :type: folder_name: str + :param folder_name: The name of the folder where the file resides. + :param file_name: The name of the file to be retrieved, including its extension. + :return: An instance of File if it exists, otherwise `None`. """ - # check if folder with name already exists - if self.get_folder_by_name(folder_name): - raise Exception(f'Folder with name "{folder_name}" already exists.') + folder = self.get_folder(folder_name) + if folder: + return folder.get_file(file_name) + self.fs.sys_log.info(f"file not found /{folder_name}/{file_name}") - folder = FileSystemFolder(name=folder_name) - - self.folders[folder.uuid] = folder - return folder - - def delete_file(self, file: Optional[FileSystemFile] = None): + def delete_file(self, folder_name: str, file_name: str): """ - Deletes a file and removes it from the files list. + Delete a file by its name from a specific folder. - :param file: The file to delete - :type file: Optional[FileSystemFile] + :param folder_name: The name of the folder containing the file. + :param file_name: The name of the file to be deleted, including its extension. """ - # iterate through folders to delete the item with the matching uuid - for key in self.folders: - self.get_folder_by_id(key).remove_file(file) + folder = self.get_folder(folder_name) + if folder: + file = folder.get_file(file_name) + if file: + folder.remove_file(file) + self.sys_log.info(f"Deleted file /{file.path}") - def delete_folder(self, folder: FileSystemFolder): + def move_file(self, src_folder_name: str, src_file_name: str, dst_folder_name: str): """ - Deletes a folder, removes it from the folders list and removes any child folders and files. + Move a file from one folder to another. - :param folder: The folder to remove - :type folder: FileSystemFolder + :param src_folder_name: The name of the source folder containing the file. + :param src_file_name: The name of the file to be moved. + :param dst_folder_name: The name of the destination folder. """ - if folder is None or not isinstance(folder, FileSystemFolder): - raise Exception(f"Invalid folder: {folder}") + file = self.get_file(folder_name=src_folder_name, file_name=src_file_name) + if file: + src_folder = file.folder - if self.folders.get(folder.uuid): - del self.folders[folder.uuid] + # remove file from src + src_folder.remove_file(file) + dst_folder = self.get_folder(folder_name=dst_folder_name) + if not dst_folder: + dst_folder = self.create_folder(dst_folder_name) + # add file to dst + dst_folder.add_file(file) + if file.real: + old_sim_path = file.sim_path + file.sim_path = file.folder.fs.sim_root / file.path + file.sim_path.parent.mkdir(exist_ok=True) + shutil.move(old_sim_path, file.sim_path) + + def copy_file(self, src_folder_name: str, src_file_name: str, dst_folder_name: str): + """ + Copy a file from one folder to another. + + :param src_folder_name: The name of the source folder containing the file. + :param src_file_name: The name of the file to be copied. + :param dst_folder_name: The name of the destination folder. + """ + file = self.get_file(folder_name=src_folder_name, file_name=src_file_name) + if file: + dst_folder = self.get_folder(folder_name=dst_folder_name) + if not dst_folder: + dst_folder = self.create_folder(dst_folder_name) + new_file = file.make_copy(dst_folder=dst_folder) + dst_folder.add_file(new_file) + if file.real: + new_file.sim_path.parent.mkdir(exist_ok=True) + shutil.copy2(file.sim_path, new_file.sim_path) + + def get_folder(self, folder_name: str) -> Optional[Folder]: + """ + Get a folder by its name if it exists. + + :param folder_name: The folder name. + :return: The matching Folder. + """ + return self._folders_by_name.get(folder_name) + + def get_folder_by_id(self, folder_uuid: str) -> Optional[Folder]: + """ + Get a folder by its uuid if it exists. + + :param folder_uuid: The folder uuid. + :return: The matching Folder. + """ + return self.folders.get(folder_uuid) + + +class Folder(FileSystemItemABC): + """Simulation Folder.""" + + fs: FileSystem + "The FileSystem the Folder is in." + files: Dict[str, File] = {} + "Files stored in the folder." + _files_by_name: Dict[str, File] = {} + "Files by their name as .." + is_quarantined: bool = False + "Flag that marks the folder as quarantined if true." + + def describe_state(self) -> Dict: + """ + Produce a dictionary describing the current state of this object. + + :return: Current state of this object and child objects. + """ + state = super().describe_state() + state["files"] = {file.name: file.describe_state() for uuid, file in self.files.items()} + state["is_quarantined"] = self.is_quarantined + return state + + def show(self, markdown: bool = False): + """ + Display the contents of the Folder in tabular format. + + :param markdown: Whether to display the table in Markdown format or not. Default is `False`. + """ + table = PrettyTable(["File", "Size"]) + if markdown: + table.set_style(MARKDOWN) + table.align = "l" + table.title = f"{self.fs.sys_log.hostname} File System Folder ({self.name})" + for file in self.files.values(): + table.add_row([file.name, file.size_str]) + print(table.get_string(sortby="File")) + + @property + def size(self) -> int: + """ + Calculate and return the total size of all files in the folder. + + :return: The total size of all files in the folder. If no files exist or all have `None` + size, returns 0. + """ + return sum(file.size for file in self.files.values() if file.size is not None) + + def get_file(self, file_name: str) -> Optional[File]: + """ + Get a file by its name. + + File name must be the filename and prefix, like 'memo.docx'. + + :param file_name: The file name. + :return: The matching File. + """ + # TODO: Increment read count? + return self._files_by_name.get(file_name) + + def get_file_by_id(self, file_uuid: str) -> File: + """ + Get a file by its uuid. + + :param file_uuid: The file uuid. + :return: The matching File. + """ + return self.files.get(file_uuid) + + def add_file(self, file: File): + """ + Adds a file to the folder. + + :param File file: The File object to be added to the folder. + :raises Exception: If the provided `file` parameter is None or not an instance of the + `File` class. + """ + if file is None or not isinstance(file, File): + raise Exception(f"Invalid file: {file}") + + # check if file with id already exists in folder + if file.uuid in self.files: + _LOGGER.debug(f"File with id {file.uuid} already exists in folder") else: - _LOGGER.debug(f"File with UUID {folder.uuid} was not found.") + # add to list + self.files[file.uuid] = file + self._files_by_name[file.name] = file + file.folder = self - def move_file(self, file: FileSystemFile, src_folder: FileSystemFolder, target_folder: FileSystemFolder): + def remove_file(self, file: Optional[File]): """ - Moves a file from one folder to another. + Removes a file from the folder list. - can provide + The method can take a File object or a file id. - :param: file: The file to move - :type: file: FileSystemFile - - :param: src_folder: The folder where the file is located - :type: FileSystemFolder - - :param: target_folder: The folder where the file should be moved to - :type: FileSystemFolder + :param file: The file to remove """ - # check that the folders exist - if src_folder is None: - raise Exception("Source folder not provided") + if file is None or not isinstance(file, File): + raise Exception(f"Invalid file: {file}") - if target_folder is None: - raise Exception("Target folder not provided") + if self.files.get(file.uuid): + self.files.pop(file.uuid) + self._files_by_name.pop(file.name) + else: + _LOGGER.debug(f"File with UUID {file.uuid} was not found.") - if file is None: - raise Exception("File to be moved is None") + def quarantine(self): + """Quarantines the File System Folder.""" + if not self.is_quarantined: + self.is_quarantined = True + self.fs.sys_log.info(f"Quarantined folder ./{self.name}") - # check if file with name already exists - if target_folder.get_file_by_name(file.name): - raise Exception(f'Folder with name "{file.name}" already exists.') + def unquarantine(self): + """Unquarantine of the File System Folder.""" + if self.is_quarantined: + self.is_quarantined = False + self.fs.sys_log.info(f"Quarantined folder ./{self.name}") - # remove file from src - src_folder.remove_file(file) + def quarantine_status(self) -> bool: + """Returns true if the folder is being quarantined.""" + return self.is_quarantined - # add file to target - target_folder.add_file(file) - def copy_file(self, file: FileSystemFile, src_folder: FileSystemFolder, target_folder: FileSystemFolder): +class File(FileSystemItemABC): + """ + Class representing a file in the simulation. + + :ivar Folder folder: The folder in which the file resides. + :ivar FileType file_type: The type of the file. + :ivar Optional[int] sim_size: The simulated file size. + :ivar bool real: Indicates if the file is actually a real file in the Node sim fs output. + :ivar Optional[Path] sim_path: The path if the file is real. + """ + + folder: Folder + "The Folder the File is in." + file_type: FileType + "The type of File." + sim_size: Optional[int] = None + "The simulated file size." + real: bool = False + "Indicates whether the File is actually a real file in the Node sim fs output." + sim_path: Optional[Path] = None + "The Path if real is True." + + def __init__(self, **kwargs): """ - Copies a file from one folder to another. + Initialise File class. - can provide - - :param: file: The file to move - :type: file: FileSystemFile - - :param: src_folder: The folder where the file is located - :type: FileSystemFolder - - :param: target_folder: The folder where the file should be moved to - :type: FileSystemFolder + :param name: The name of the file. + :param file_type: The FileType of the file + :param size: The size of the FileSystemItemABC """ - if src_folder is None: - raise Exception("Source folder not provided") + has_extension = "." in kwargs["name"] - if target_folder is None: - raise Exception("Target folder not provided") + # Attempt to use the file type extension to set/override the FileType + if has_extension: + extension = kwargs["name"].split(".")[-1] + kwargs["file_type"] = get_file_type_from_extension(extension) + else: + # If the file name does not have a extension, override file type to FileType.UNKNOWN + if not kwargs["file_type"]: + kwargs["file_type"] = FileType.UNKNOWN + if kwargs["file_type"] != FileType.UNKNOWN: + kwargs["name"] = f"{kwargs['name']}.{kwargs['file_type'].name.lower()}" - if file is None: - raise Exception("File to be moved is None") + # set random file size if none provided + if not kwargs.get("sim_size"): + kwargs["sim_size"] = kwargs["file_type"].default_size + super().__init__(**kwargs) + if self.real: + self.sim_path = self.folder.fs.sim_root / self.path + if not self.sim_path.exists(): + self.sim_path.parent.mkdir(exist_ok=True, parents=True) + with open(self.sim_path, mode="a"): + pass - # check if file with name already exists - if target_folder.get_file_by_name(file.name): - raise Exception(f'Folder with name "{file.name}" already exists.') - - # add file to target - target_folder.add_file(file) - - def get_file_by_id(self, file_id: str) -> FileSystemFile: - """Checks if the file exists in any file system folders.""" - for key in self.folders: - file = self.folders[key].get_file_by_id(file_id=file_id) - if file is not None: - return file - - def get_folder_by_name(self, folder_name: str) -> Optional[FileSystemFolder]: + def make_copy(self, dst_folder: Folder) -> File: """ - Returns a the first folder with a matching name. + Create a copy of the current File object in the given destination folder. - :return: Returns the first FileSydtemFolder with a matching name + :param Folder dst_folder: The destination folder for the copied file. + :return: A new File object that is a copy of the current file. """ - matching_folder = None - for key in self.folders: - if self.folders[key].name == folder_name: - matching_folder = self.folders[key] - break - return matching_folder + return File(folder=dst_folder, **self.model_dump(exclude={"uuid", "folder", "sim_path"})) - def get_folder_by_id(self, folder_id: str) -> FileSystemFolder: + @property + def path(self) -> str: """ - Checks if the folder exists. + Get the path of the file in the file system. - :param: folder_id: The id of the folder to find - :type: folder_id: str + :return: The full path of the file. """ - return self.folders[folder_id] + return f"{self.folder.name}/{self.name}" - def get_random_file_type(self) -> FileSystemFileType: + @property + def size(self) -> int: """ - Returns a random FileSystemFileTypeEnum. + Get the size of the file in bytes. - :return: A random file type Enum + :return: The size of the file in bytes. """ - return choice(list(FileSystemFileType)) + if self.real: + return os.path.getsize(self.sim_path) + return self.sim_size + + def describe_state(self) -> Dict: + """Produce a dictionary describing the current state of this object.""" + state = super().describe_state() + state["size"] = self.size + state["file_type"] = self.file_type.name + return state diff --git a/src/primaite/simulator/file_system/file_system_file.py b/src/primaite/simulator/file_system/file_system_file.py deleted file mode 100644 index c25f5973..00000000 --- a/src/primaite/simulator/file_system/file_system_file.py +++ /dev/null @@ -1,55 +0,0 @@ -from random import choice -from typing import Dict - -from primaite.simulator.file_system.file_system_file_type import file_type_sizes_KB, FileSystemFileType -from primaite.simulator.file_system.file_system_item_abc import FileSystemItem - - -class FileSystemFile(FileSystemItem): - """Class that represents a file in the simulation.""" - - file_type: FileSystemFileType = None - """The type of the FileSystemFile""" - - def __init__(self, **kwargs): - """ - Initialise FileSystemFile class. - - :param name: The name of the file. - :type name: str - - :param file_type: The FileSystemFileType of the file - :type file_type: Optional[FileSystemFileType] - - :param size: The size of the FileSystemItem - :type size: Optional[float] - """ - # set random file type if none provided - - # set random file type if none provided - if kwargs.get("file_type") is None: - kwargs["file_type"] = choice(list(FileSystemFileType)) - - # set random file size if none provided - if kwargs.get("size") is None: - kwargs["size"] = file_type_sizes_KB[kwargs["file_type"]] - - super().__init__(**kwargs) - - def describe_state(self) -> Dict: - """ - Produce a dictionary describing the current state of this object. - - Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation. - - :return: Current state of this object and child objects. - :rtype: Dict - """ - state = super().describe_state() - state.update( - { - "uuid": self.uuid, - "file_type": self.file_type.name, - } - ) - return state diff --git a/src/primaite/simulator/file_system/file_system_file_type.py b/src/primaite/simulator/file_system/file_system_file_type.py deleted file mode 100644 index 88aeb430..00000000 --- a/src/primaite/simulator/file_system/file_system_file_type.py +++ /dev/null @@ -1,132 +0,0 @@ -from enum import Enum - - -class FileSystemFileType(str, Enum): - """An enumeration of common file types.""" - - UNKNOWN = 0 - "Unknown file type." - - # Text formats - TXT = 1 - "Plain text file." - DOC = 2 - "Microsoft Word document (.doc)" - DOCX = 3 - "Microsoft Word document (.docx)" - PDF = 4 - "Portable Document Format." - HTML = 5 - "HyperText Markup Language file." - XML = 6 - "Extensible Markup Language file." - CSV = 7 - "Comma-Separated Values file." - - # Spreadsheet formats - XLS = 8 - "Microsoft Excel file (.xls)" - XLSX = 9 - "Microsoft Excel file (.xlsx)" - - # Image formats - JPEG = 10 - "JPEG image file." - PNG = 11 - "PNG image file." - GIF = 12 - "GIF image file." - BMP = 13 - "Bitmap image file." - - # Audio formats - MP3 = 14 - "MP3 audio file." - WAV = 15 - "WAV audio file." - - # Video formats - MP4 = 16 - "MP4 video file." - AVI = 17 - "AVI video file." - MKV = 18 - "MKV video file." - FLV = 19 - "FLV video file." - - # Presentation formats - PPT = 20 - "Microsoft PowerPoint file (.ppt)" - PPTX = 21 - "Microsoft PowerPoint file (.pptx)" - - # Web formats - JS = 22 - "JavaScript file." - CSS = 23 - "Cascading Style Sheets file." - - # Programming languages - PY = 24 - "Python script file." - C = 25 - "C source code file." - CPP = 26 - "C++ source code file." - JAVA = 27 - "Java source code file." - - # Compressed file types - RAR = 28 - "RAR archive file." - ZIP = 29 - "ZIP archive file." - TAR = 30 - "TAR archive file." - GZ = 31 - "Gzip compressed file." - - # Database file types - MDF = 32 - "MS SQL Server primary database file" - NDF = 33 - "MS SQL Server secondary database file" - LDF = 34 - "MS SQL Server transaction log" - - -file_type_sizes_KB = { - FileSystemFileType.UNKNOWN: 0, - FileSystemFileType.TXT: 4, - FileSystemFileType.DOC: 50, - FileSystemFileType.DOCX: 30, - FileSystemFileType.PDF: 100, - FileSystemFileType.HTML: 15, - FileSystemFileType.XML: 10, - FileSystemFileType.CSV: 15, - FileSystemFileType.XLS: 100, - FileSystemFileType.XLSX: 25, - FileSystemFileType.JPEG: 100, - FileSystemFileType.PNG: 40, - FileSystemFileType.GIF: 30, - FileSystemFileType.BMP: 300, - FileSystemFileType.MP3: 5000, - FileSystemFileType.WAV: 25000, - FileSystemFileType.MP4: 25000, - FileSystemFileType.AVI: 50000, - FileSystemFileType.MKV: 50000, - FileSystemFileType.FLV: 15000, - FileSystemFileType.PPT: 200, - FileSystemFileType.PPTX: 100, - FileSystemFileType.JS: 10, - FileSystemFileType.CSS: 5, - FileSystemFileType.PY: 5, - FileSystemFileType.C: 5, - FileSystemFileType.CPP: 10, - FileSystemFileType.JAVA: 10, - FileSystemFileType.RAR: 1000, - FileSystemFileType.ZIP: 1000, - FileSystemFileType.TAR: 1000, - FileSystemFileType.GZ: 800, -} diff --git a/src/primaite/simulator/file_system/file_system_folder.py b/src/primaite/simulator/file_system/file_system_folder.py deleted file mode 100644 index 4e461a3a..00000000 --- a/src/primaite/simulator/file_system/file_system_folder.py +++ /dev/null @@ -1,87 +0,0 @@ -from typing import Dict, Optional - -from primaite import getLogger -from primaite.simulator.file_system.file_system_file import FileSystemFile -from primaite.simulator.file_system.file_system_item_abc import FileSystemItem - -_LOGGER = getLogger(__name__) - - -class FileSystemFolder(FileSystemItem): - """Simulation FileSystemFolder.""" - - files: Dict[str, FileSystemFile] = {} - """List of files stored in the folder.""" - - is_quarantined: bool = False - """Flag that marks the folder as quarantined if true.""" - - def describe_state(self) -> Dict: - """ - Produce a dictionary describing the current state of this object. - - Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation. - - :return: Current state of this object and child objects. - :rtype: Dict - """ - state = super().describe_state() - state.update( - { - "files": {uuid: file.describe_state() for uuid, file in self.files.items()}, - "is_quarantined": self.is_quarantined, - } - ) - return state - - def get_file_by_id(self, file_id: str) -> FileSystemFile: - """Return a FileSystemFile with the matching id.""" - return self.files.get(file_id) - - def get_file_by_name(self, file_name: str) -> FileSystemFile: - """Return a FileSystemFile with the matching id.""" - return next((f for f in list(self.files) if f.name == file_name), None) - - def add_file(self, file: FileSystemFile): - """Adds a file to the folder list.""" - if file is None or not isinstance(file, FileSystemFile): - raise Exception(f"Invalid file: {file}") - - # check if file with id already exists in folder - if file.uuid in self.files: - _LOGGER.debug(f"File with id {file.uuid} already exists in folder") - else: - # add to list - self.files[file.uuid] = file - self.size += file.size - - def remove_file(self, file: Optional[FileSystemFile]): - """ - Removes a file from the folder list. - - The method can take a FileSystemFile object or a file id. - - :param: file: The file to remove - :type: Optional[FileSystemFile] - """ - if file is None or not isinstance(file, FileSystemFile): - raise Exception(f"Invalid file: {file}") - - if self.files.get(file.uuid): - del self.files[file.uuid] - - self.size -= file.size - else: - _LOGGER.debug(f"File with UUID {file.uuid} was not found.") - - def quarantine(self): - """Quarantines the File System Folder.""" - self.is_quarantined = True - - def end_quarantine(self): - """Ends the quarantine of the File System Folder.""" - self.is_quarantined = False - - def quarantine_status(self) -> bool: - """Returns true if the folder is being quarantined.""" - return self.is_quarantined diff --git a/src/primaite/simulator/file_system/file_system_item_abc.py b/src/primaite/simulator/file_system/file_system_item_abc.py deleted file mode 100644 index 3b368819..00000000 --- a/src/primaite/simulator/file_system/file_system_item_abc.py +++ /dev/null @@ -1,31 +0,0 @@ -from typing import Dict - -from primaite.simulator.core import SimComponent - - -class FileSystemItem(SimComponent): - """Abstract base class for FileSystemItems used in the file system simulation.""" - - name: str - """The name of the FileSystemItem.""" - - size: float = 0 - """The size the item takes up on disk.""" - - def describe_state(self) -> Dict: - """ - Produce a dictionary describing the current state of this object. - - Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation. - - :return: Current state of this object and child objects. - :rtype: Dict - """ - state = super().describe_state() - state.update( - { - "name": self.name, - "size": self.size, - } - ) - return state diff --git a/src/primaite/simulator/network/hardware/base.py b/src/primaite/simulator/network/hardware/base.py index 101d6b72..fa1058cf 100644 --- a/src/primaite/simulator/network/hardware/base.py +++ b/src/primaite/simulator/network/hardware/base.py @@ -4,12 +4,14 @@ import re import secrets from enum import Enum from ipaddress import IPv4Address, IPv4Network +from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union from prettytable import MARKDOWN, PrettyTable from primaite import getLogger from primaite.exceptions import NetworkError +from primaite.simulator import SIM_OUTPUT from primaite.simulator.core import SimComponent from primaite.simulator.domain.account import Account from primaite.simulator.file_system.file_system import FileSystem @@ -890,6 +892,8 @@ class Node(SimComponent): "All processes on the node." file_system: FileSystem "The nodes file system." + root: Path + "Root directory for simulation output." sys_log: SysLog arp: ARPCache icmp: ICMP @@ -917,14 +921,19 @@ class Node(SimComponent): kwargs["icmp"] = ICMP(sys_log=kwargs.get("sys_log"), arp_cache=kwargs.get("arp")) if not kwargs.get("session_manager"): kwargs["session_manager"] = SessionManager(sys_log=kwargs.get("sys_log"), arp_cache=kwargs.get("arp")) + if not kwargs.get("root"): + kwargs["root"] = SIM_OUTPUT / kwargs["hostname"] + if not kwargs.get("file_system"): + kwargs["file_system"] = FileSystem(sys_log=kwargs["sys_log"], sim_root=kwargs["root"] / "fs") if not kwargs.get("software_manager"): kwargs["software_manager"] = SoftwareManager( - sys_log=kwargs.get("sys_log"), session_manager=kwargs.get("session_manager") + sys_log=kwargs.get("sys_log"), + session_manager=kwargs.get("session_manager"), + file_system=kwargs.get("file_system"), ) - if not kwargs.get("file_system"): - kwargs["file_system"] = FileSystem() super().__init__(**kwargs) self.arp.nics = self.nics + self.session_manager.software_manager = self.software_manager def describe_state(self) -> Dict: """ @@ -1091,6 +1100,8 @@ class Node(SimComponent): if frame.ip.protocol == IPProtocol.TCP: if frame.tcp.src_port == Port.ARP: self.arp.process_arp_packet(from_nic=from_nic, arp_packet=frame.arp) + else: + self.session_manager.receive_frame(frame) elif frame.ip.protocol == IPProtocol.UDP: pass elif frame.ip.protocol == IPProtocol.ICMP: diff --git a/src/primaite/simulator/network/networks.py b/src/primaite/simulator/network/networks.py index 6a50fe3f..c030d907 100644 --- a/src/primaite/simulator/network/networks.py +++ b/src/primaite/simulator/network/networks.py @@ -6,6 +6,7 @@ from primaite.simulator.network.hardware.nodes.server import Server from primaite.simulator.network.hardware.nodes.switch import Switch from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.network.transmission.transport_layer import Port +from primaite.simulator.system.services.database import DatabaseService def client_server_routed() -> Network: @@ -160,6 +161,39 @@ def arcd_uc2_network() -> Network: database_server.power_on() network.connect(endpoint_b=database_server.ethernet_port[1], endpoint_a=switch_1.switch_ports[3]) + ddl = """ + CREATE TABLE IF NOT EXISTS user ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name VARCHAR(50) NOT NULL, + email VARCHAR(50) NOT NULL, + age INT, + city VARCHAR(50), + occupation VARCHAR(50) + );""" + + user_insert_statements = [ + "INSERT INTO user (name, email, age, city, occupation) VALUES ('John Doe', 'johndoe@example.com', 32, 'New York', 'Engineer');", # noqa + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Jane Smith', 'janesmith@example.com', 27, 'Los Angeles', 'Designer');", # noqa + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Bob Johnson', 'bobjohnson@example.com', 45, 'Chicago', 'Manager');", # noqa + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Alice Lee', 'alicelee@example.com', 22, 'San Francisco', 'Student');", # noqa + "INSERT INTO user (name, email, age, city, occupation) VALUES ('David Kim', 'davidkim@example.com', 38, 'Houston', 'Consultant');", # noqa + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Emily Chen', 'emilychen@example.com', 29, 'Seattle', 'Software Developer');", # noqa + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Frank Wang', 'frankwang@example.com', 55, 'New York', 'Entrepreneur');", # noqa + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Grace Park', 'gracepark@example.com', 31, 'Los Angeles', 'Marketing Specialist');", # noqa + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Henry Wu', 'henrywu@example.com', 40, 'Chicago', 'Accountant');", # noqa + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Isabella Kim', 'isabellakim@example.com', 26, 'San Francisco', 'Graphic Designer');", # noqa + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Jake Lee', 'jakelee@example.com', 33, 'Houston', 'Sales Manager');", # noqa + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Kelly Chen', 'kellychen@example.com', 28, 'Seattle', 'Web Developer');", # noqa + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Lucas Liu', 'lucasliu@example.com', 42, 'New York', 'Lawyer');", # noqa + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Maggie Wang', 'maggiewang@example.com', 30, 'Los Angeles', 'Data Analyst');", # noqa + ] + database_server.software_manager.add_service(DatabaseService) + database: DatabaseService = database_server.software_manager.services["Database"] # noqa + database.start() + database._process_sql(ddl) # noqa + for insert_statement in user_insert_statements: + database._process_sql(insert_statement) # noqa + # Backup Server backup_server = Server( hostname="backup_server", ip_address="192.168.1.16", subnet_mask="255.255.255.0", default_gateway="192.168.1.1" @@ -183,4 +217,6 @@ def arcd_uc2_network() -> Network: router_1.acl.add_rule(action=ACLAction.PERMIT, protocol=IPProtocol.ICMP, position=23) + router_1.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.POSTGRES_SERVER, dst_port=Port.POSTGRES_SERVER) + return network diff --git a/src/primaite/simulator/system/core/packet_capture.py b/src/primaite/simulator/system/core/packet_capture.py index c985af1f..2e5ed008 100644 --- a/src/primaite/simulator/system/core/packet_capture.py +++ b/src/primaite/simulator/system/core/packet_capture.py @@ -1,8 +1,9 @@ +import json import logging from pathlib import Path -from typing import Optional +from typing import Any, Dict, List, Optional -from primaite.simulator import TEMP_SIM_OUTPUT +from primaite.simulator import SIM_OUTPUT class _JSONFilter(logging.Filter): @@ -51,6 +52,18 @@ class PacketCapture: self.logger.addFilter(_JSONFilter()) + def read(self) -> List[Dict[str, Any]]: + """ + Read packet capture logs and return them as a list of dictionaries. + + :return: List of frames captured, represented as dictionaries. + """ + frames = [] + with open(self._get_log_path(), "r") as file: + while line := file.readline(): + frames.append(json.loads(line.rstrip())) + return frames + @property def _logger_name(self) -> str: """Get PCAP the logger name.""" @@ -62,7 +75,7 @@ class PacketCapture: def _get_log_path(self) -> Path: """Get the path for the log file.""" - root = TEMP_SIM_OUTPUT / self.hostname + root = SIM_OUTPUT / self.hostname root.mkdir(exist_ok=True, parents=True) return root / f"{self._logger_name}.log" diff --git a/src/primaite/simulator/system/core/software_manager.py b/src/primaite/simulator/system/core/software_manager.py index 71519ac7..ea011f49 100644 --- a/src/primaite/simulator/system/core/software_manager.py +++ b/src/primaite/simulator/system/core/software_manager.py @@ -3,6 +3,7 @@ from typing import Any, Dict, Optional, Tuple, TYPE_CHECKING, Union from prettytable import MARKDOWN, PrettyTable +from primaite.simulator.file_system.file_system import FileSystem from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.network.transmission.transport_layer import Port from primaite.simulator.system.applications.application import Application @@ -22,7 +23,7 @@ ServiceClass = TypeVar("ServiceClass", bound=Service) class SoftwareManager: """A class that manages all running Services and Applications on a Node and facilitates their communication.""" - def __init__(self, session_manager: "SessionManager", sys_log: "SysLog"): + def __init__(self, session_manager: "SessionManager", sys_log: SysLog, file_system: FileSystem): """ Initialize a new instance of SoftwareManager. @@ -33,6 +34,7 @@ class SoftwareManager: self.applications: Dict[str, Application] = {} self.port_protocol_mapping: Dict[Tuple[Port, IPProtocol], Union[Service, Application]] = {} self.sys_log: SysLog = sys_log + self.file_system: FileSystem = file_system def add_service(self, service_class: Type[ServiceClass]): """ @@ -40,7 +42,7 @@ class SoftwareManager: :param: service_class: The class of the service to add """ - service = service_class(software_manager=self, sys_log=self.sys_log) + service = service_class(software_manager=self, sys_log=self.sys_log, file_system=self.file_system) service.software_manager = self self.services[service.name] = service @@ -108,9 +110,10 @@ class SoftwareManager: """ receiver: Optional[Union[Service, Application]] = self.port_protocol_mapping.get((port, protocol), None) if receiver: - receiver.receive_payload(payload=payload, session_id=session_id) + receiver.receive(payload=payload, session_id=session_id) else: self.sys_log.error(f"No service or application found for port {port} and protocol {protocol}") + pass def show(self, markdown: bool = False): """ diff --git a/src/primaite/simulator/system/core/sys_log.py b/src/primaite/simulator/system/core/sys_log.py index e07c28aa..791e0be8 100644 --- a/src/primaite/simulator/system/core/sys_log.py +++ b/src/primaite/simulator/system/core/sys_log.py @@ -3,7 +3,7 @@ from pathlib import Path from prettytable import MARKDOWN, PrettyTable -from primaite.simulator import TEMP_SIM_OUTPUT +from primaite.simulator import SIM_OUTPUT class _NotJSONFilter(logging.Filter): @@ -81,7 +81,7 @@ class SysLog: :return: Path object representing the location of the log file. """ - root = TEMP_SIM_OUTPUT / self.hostname + root = SIM_OUTPUT / self.hostname root.mkdir(exist_ok=True, parents=True) return root / f"{self.hostname}_sys.log" diff --git a/src/primaite/simulator/system/services/database.py b/src/primaite/simulator/system/services/database.py index 23b856f7..dc148031 100644 --- a/src/primaite/simulator/system/services/database.py +++ b/src/primaite/simulator/system/services/database.py @@ -1,12 +1,82 @@ -from typing import Dict +import sqlite3 +from ipaddress import IPv4Address +from sqlite3 import OperationalError +from typing import Any, Dict, List, Optional, Union -from primaite.simulator.file_system.file_system_file_type import FileSystemFileType -from primaite.simulator.network.hardware.base import Node +from prettytable import MARKDOWN, PrettyTable + +from primaite.simulator.file_system.file_system import File +from primaite.simulator.network.transmission.network_layer import IPProtocol +from primaite.simulator.network.transmission.transport_layer import Port +from primaite.simulator.system.core.software_manager import SoftwareManager from primaite.simulator.system.services.service import Service class DatabaseService(Service): - """Service loosely modelled on Microsoft SQL Server.""" + """ + A class for simulating a generic SQL Server service. + + This class inherits from the `Service` class and provides methods to manage and query a SQLite database. + """ + + backup_server: Optional[IPv4Address] = None + "The IP Address of the server the " + + def __init__(self, **kwargs): + kwargs["name"] = "Database" + kwargs["port"] = Port.POSTGRES_SERVER + kwargs["protocol"] = IPProtocol.TCP + super().__init__(**kwargs) + self._db_file: File + self._create_db_file() + self._conn = sqlite3.connect(self._db_file.sim_path) + self._cursor = self._conn.cursor() + + def tables(self) -> List[str]: + """ + Get a list of table names present in the database. + + :return: List of table names. + """ + sql = "SELECT name FROM sqlite_master WHERE type='table' AND name != 'sqlite_sequence';" + results = self._process_sql(sql) + return [row[0] for row in results["data"]] + + def show(self, markdown: bool = False): + """ + Prints a list of table names in the database using PrettyTable. + + :param markdown: Whether to output the table in Markdown format. + """ + table = PrettyTable(["Table"]) + if markdown: + table.set_style(MARKDOWN) + table.align = "l" + table.title = f"{self.file_system.sys_log.hostname} Database" + for row in self.tables(): + table.add_row([row]) + print(table) + + def _create_db_file(self): + """Creates the Simulation File and sqlite file in the file system.""" + self._db_file: File = self.file_system.create_file(folder_name="database", file_name="database.db", real=True) + self.folder = self._db_file.folder + + def _process_sql(self, query: str) -> Dict[str, Union[int, List[Any]]]: + """ + Executes the given SQL query and returns the result. + + :param query: The SQL query to be executed. + :return: Dictionary containing status code and data fetched. + """ + try: + self._cursor.execute(query) + self._conn.commit() + except OperationalError: + # Handle the case where the table does not exist. + return {"status_code": 404, "data": []} + + return {"status_code": 200, "data": self._cursor.fetchall()} def describe_state(self) -> Dict: """ @@ -19,58 +89,28 @@ class DatabaseService(Service): """ return super().describe_state() - def uninstall(self) -> None: + def receive(self, payload: Any, session_id: str, **kwargs) -> bool: """ - Undo installation procedure. + Processes the incoming SQL payload and sends the result back. - This method deletes files created when installing the database, and the database folder if it is empty. + :param payload: The SQL query to be executed. + :param session_id: The session identifier. + :return: True if the Status Code is 200, otherwise False. """ - super().uninstall() - node: Node = self.parent - node.file_system.delete_file(self.primary_store) - node.file_system.delete_file(self.transaction_log) - if self.secondary_store: - node.file_system.delete_file(self.secondary_store) - if len(self.folder.files) == 0: - node.file_system.delete_folder(self.folder) + result = self._process_sql(payload) + self.send(payload=result, session_id=session_id) - def install(self) -> None: - """Perform first time install on a node, creating necessary files.""" - super().install() - assert isinstance(self.parent, Node), "Database install can only happen after the db service is added to a node" - self._setup_files() + return payload["status_code"] == 200 - def _setup_files( - self, - db_size: int = 1000, - use_secondary_db_file: bool = False, - secondary_db_size: int = 300, - folder_name: str = "database", - ): - """Set up files that are required by the database on the parent host. - - :param db_size: Initial file size of the main database file, defaults to 1000 - :type db_size: int, optional - :param use_secondary_db_file: Whether to use a secondary database file, defaults to False - :type use_secondary_db_file: bool, optional - :param secondary_db_size: Size of the secondary db file, defaults to None - :type secondary_db_size: int, optional - :param folder_name: Name of the folder which will be setup to hold the db files, defaults to "database" - :type folder_name: str, optional + def send(self, payload: Any, session_id: str, **kwargs) -> bool: """ - # note that this parent.file_system.create_folder call in the future will be authenticated by using permissions - # handler. This permission will be granted based on service account given to the database service. - self.parent: Node - self.folder = self.parent.file_system.create_folder(folder_name) - self.primary_store = self.parent.file_system.create_file( - "db_primary_store", db_size, FileSystemFileType.MDF, folder=self.folder - ) - self.transaction_log = self.parent.file_system.create_file( - "db_transaction_log", "1", FileSystemFileType.LDF, folder=self.folder - ) - if use_secondary_db_file: - self.secondary_store = self.parent.file_system.create_file( - "db_secondary_store", secondary_db_size, FileSystemFileType.NDF, folder=self.folder - ) - else: - self.secondary_store = None + Send a SQL response back down to the SessionManager. + + :param payload: The SQL query results. + :param session_id: The session identifier. + :return: True if the Status Code is 200, otherwise False. + """ + software_manager: SoftwareManager = self.software_manager + software_manager.send_payload_to_session_manager(payload=payload, session_id=session_id) + + return payload["status_code"] == 200 diff --git a/src/primaite/simulator/system/software.py b/src/primaite/simulator/system/software.py index 7f206311..70c1bbf2 100644 --- a/src/primaite/simulator/system/software.py +++ b/src/primaite/simulator/system/software.py @@ -1,8 +1,9 @@ from abc import abstractmethod from enum import Enum -from typing import Any, Dict +from typing import Any, Dict, Optional from primaite.simulator.core import Action, ActionManager, SimComponent +from primaite.simulator.file_system.file_system import FileSystem, Folder from primaite.simulator.network.transmission.transport_layer import Port from primaite.simulator.system.core.sys_log import SysLog @@ -79,6 +80,10 @@ class Software(SimComponent): "An instance of Software Manager that is used by the parent node." sys_log: SysLog = None "An instance of SysLog that is used by the parent node." + file_system: FileSystem + "The FileSystem of the Node the Software is installed on." + folder: Optional[Folder] = None + "The folder on the file system the Software uses." def _init_action_manager(self) -> ActionManager: am = super()._init_action_manager() @@ -216,7 +221,6 @@ class IOSoftware(Software): :param kwargs: Additional keyword arguments specific to the implementation. :return: True if the payload was successfully sent, False otherwise. """ - pass def receive(self, payload: Any, session_id: str, **kwargs) -> bool: """ diff --git a/tests/conftest.py b/tests/conftest.py index f1c05187..9c216a8e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -19,7 +19,17 @@ ACTION_SPACE_NODE_ACTION_VALUES = 1 _LOGGER = getLogger(__name__) +# PrimAITE v3 stuff +from primaite.simulator.file_system.file_system import FileSystem +from primaite.simulator.network.hardware.base import Node + +@pytest.fixture(scope="function") +def file_system() -> FileSystem: + return Node(hostname="fs_node").file_system + + +# PrimAITE v2 stuff class TempPrimaiteSession(PrimaiteSession): """ A temporary PrimaiteSession class. diff --git a/tests/integration_tests/system/test_database_on_node.py b/tests/integration_tests/system/test_database_on_node.py index 058bb590..7ad11222 100644 --- a/tests/integration_tests/system/test_database_on_node.py +++ b/tests/integration_tests/system/test_database_on_node.py @@ -1,52 +1,39 @@ -from primaite.simulator.network.hardware.base import Node -from primaite.simulator.network.transmission.transport_layer import Port -from primaite.simulator.system.services.database import DatabaseService -from primaite.simulator.system.services.service import ServiceOperatingState -from primaite.simulator.system.software import SoftwareCriticality, SoftwareHealthState +from ipaddress import IPv4Address + +from primaite.simulator.network.hardware.nodes.computer import Computer +from primaite.simulator.network.networks import arcd_uc2_network +from primaite.simulator.network.transmission.data_link_layer import EthernetHeader, Frame +from primaite.simulator.network.transmission.network_layer import IPPacket, Precedence +from primaite.simulator.network.transmission.transport_layer import Port, TCPHeader -def test_installing_database(): - db = DatabaseService( - name="SQL-database", - health_state_actual=SoftwareHealthState.GOOD, - health_state_visible=SoftwareHealthState.GOOD, - criticality=SoftwareCriticality.MEDIUM, - port=Port.SQL_SERVER, - operating_state=ServiceOperatingState.RUNNING, +def test_database_query_across_the_network(): + """Tests DB query across the network returns HTTP status 200 and date.""" + network = arcd_uc2_network() + + client_1: Computer = network.get_node_by_hostname("client_1") + + client_1.arp.send_arp_request(IPv4Address("192.168.1.14")) + + dst_mac_address = client_1.arp.get_arp_cache_mac_address(IPv4Address("192.168.1.14")) + + outbound_nic = client_1.arp.get_arp_cache_nic(IPv4Address("192.168.1.14")) + client_1.ping("192.168.1.14") + + frame = Frame( + ethernet=EthernetHeader(src_mac_addr=client_1.ethernet_port[1].mac_address, dst_mac_addr=dst_mac_address), + ip=IPPacket( + src_ip_address=client_1.ethernet_port[1].ip_address, + dst_ip_address=IPv4Address("192.168.1.14"), + precedence=Precedence.FLASH, + ), + tcp=TCPHeader(src_port=Port.POSTGRES_SERVER, dst_port=Port.POSTGRES_SERVER), + payload="SELECT * FROM user;", ) - node = Node(hostname="db-server") + outbound_nic.send_frame(frame) - node.install_service(db) + client_1_last_payload = outbound_nic.pcap.read()[-1]["payload"] - assert db in node - - file_exists = False - for folder in node.file_system.folders.values(): - for file in folder.files.values(): - if file.name == "db_primary_store": - file_exists = True - break - if file_exists: - break - assert file_exists - - -def test_uninstalling_database(): - db = DatabaseService( - name="SQL-database", - health_state_actual=SoftwareHealthState.GOOD, - health_state_visible=SoftwareHealthState.GOOD, - criticality=SoftwareCriticality.MEDIUM, - port=Port.SQL_SERVER, - operating_state=ServiceOperatingState.RUNNING, - ) - - node = Node(hostname="db-server") - - node.install_service(db) - - node.uninstall_service(db) - - assert db not in node - assert node.file_system.get_folder_by_name("database") is None + assert client_1_last_payload["status_code"] == 200 + assert client_1_last_payload["data"] diff --git a/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system.py b/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system.py index 348eb440..d1d78003 100644 --- a/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system.py +++ b/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system.py @@ -1,132 +1,144 @@ +import pytest + from primaite.simulator.file_system.file_system import FileSystem -from primaite.simulator.file_system.file_system_file import FileSystemFile -from primaite.simulator.file_system.file_system_folder import FileSystemFolder +from primaite.simulator.file_system.file_type import FileType -def test_create_folder_and_file(): +def test_create_folder_and_file(file_system): """Test creating a folder and a file.""" - file_system = FileSystem() - folder = file_system.create_folder(folder_name="test_folder") - assert len(file_system.folders) is 1 + assert len(file_system.folders) == 1 + file_system.create_folder(folder_name="test_folder") - file = file_system.create_file(file_name="test_file", size=10, folder_uuid=folder.uuid) - assert len(file_system.get_folder_by_id(folder.uuid).files) is 1 + assert len(file_system.folders) is 2 + file_system.create_file(file_name="test_file.txt", folder_name="test_folder") - assert file_system.get_file_by_id(file.uuid).name is "test_file" - assert file_system.get_file_by_id(file.uuid).size == 10 + assert len(file_system.get_folder("test_folder").files) == 1 + + assert file_system.get_folder("test_folder").get_file("test_file.txt") -def test_create_file(): +def test_create_file_no_folder(file_system): """Tests that creating a file without a folder creates a folder and sets that as the file's parent.""" - file_system = FileSystem() - - file = file_system.create_file(file_name="test_file", size=10) + file = file_system.create_file(file_name="test_file.txt", size=10) assert len(file_system.folders) is 1 - assert file_system.get_folder_by_name("root").get_file_by_id(file.uuid) is file + assert file_system.get_folder("root").get_file("test_file.txt") == file + assert file_system.get_folder("root").get_file("test_file.txt").file_type == FileType.TXT + assert file_system.get_folder("root").get_file("test_file.txt").size == 10 -def test_delete_file(): +def test_create_file_no_extension(file_system): + """Tests that creating a file without an extension sets the file type to FileType.UNKNOWN.""" + file = file_system.create_file(file_name="test_file") + assert len(file_system.folders) is 1 + assert file_system.get_folder("root").get_file("test_file") == file + assert file_system.get_folder("root").get_file("test_file").file_type == FileType.UNKNOWN + assert file_system.get_folder("root").get_file("test_file").size == 0 + + +def test_delete_file(file_system): """Tests that a file can be deleted.""" - file_system = FileSystem() + file_system.create_file(file_name="test_file.txt") + assert len(file_system.folders) == 1 + assert len(file_system.get_folder("root").files) == 1 - file = file_system.create_file(file_name="test_file", size=10) - assert len(file_system.folders) is 1 - - folder_id = list(file_system.folders.keys())[0] - folder = file_system.get_folder_by_id(folder_id) - assert folder.get_file_by_id(file.uuid) is file - - file_system.delete_file(file=file) - assert len(file_system.folders) is 1 - assert len(folder.files) is 0 + file_system.delete_file(folder_name="root", file_name="test_file.txt") + assert len(file_system.folders) == 1 + assert len(file_system.get_folder("root").files) == 0 -def test_delete_non_existent_file(): +def test_delete_non_existent_file(file_system): """Tests deleting a non existent file.""" - file_system = FileSystem() - - file = file_system.create_file(file_name="test_file", size=10) - not_added_file = FileSystemFile(name="not_added") + file_system.create_file(file_name="test_file.txt") # folder should be created - assert len(file_system.folders) is 1 + assert len(file_system.folders) == 1 # should only have 1 file in the file system - folder_id = list(file_system.folders.keys())[0] - folder = file_system.get_folder_by_id(folder_id) - assert len(list(folder.files)) is 1 - - assert folder.get_file_by_id(file.uuid) is file + assert len(file_system.get_folder("root").files) == 1 # deleting should not change how many files are in folder - file_system.delete_file(file=not_added_file) - assert len(file_system.folders) is 1 - assert len(list(folder.files)) is 1 + file_system.delete_file(folder_name="root", file_name="does_not_exist!") + + # should still only be one folder + assert len(file_system.folders) == 1 + # The folder should still have 1 file + assert len(file_system.get_folder("root").files) == 1 -def test_delete_folder(): - file_system = FileSystem() - folder = file_system.create_folder(folder_name="test_folder") - assert len(file_system.folders) is 1 +def test_delete_folder(file_system): + file_system.create_folder(folder_name="test_folder") + assert len(file_system.folders) == 2 - file_system.delete_folder(folder) - assert len(file_system.folders) is 0 + file_system.delete_folder(folder_name="test_folder") + assert len(file_system.folders) == 1 -def test_deleting_a_non_existent_folder(): - file_system = FileSystem() - folder = file_system.create_folder(folder_name="test_folder") - not_added_folder = FileSystemFolder(name="fake_folder") - assert len(file_system.folders) is 1 +def test_deleting_a_non_existent_folder(file_system): + file_system.create_folder(folder_name="test_folder") + assert len(file_system.folders) == 2 - file_system.delete_folder(not_added_folder) - assert len(file_system.folders) is 1 + file_system.delete_folder(folder_name="does not exist!") + assert len(file_system.folders) == 2 -def test_move_file(): +def test_deleting_root_folder_fails(file_system): + assert len(file_system.folders) == 1 + + file_system.delete_folder(folder_name="root") + assert len(file_system.folders) == 1 + + +def test_move_file(file_system): """Tests the file move function.""" - file_system = FileSystem() - src_folder = file_system.create_folder(folder_name="test_folder_1") - assert len(file_system.folders) is 1 + file_system.create_folder(folder_name="src_folder") + file_system.create_folder(folder_name="dst_folder") - target_folder = file_system.create_folder(folder_name="test_folder_2") - assert len(file_system.folders) is 2 + file = file_system.create_file(file_name="test_file.txt", size=10, folder_name="src_folder") + original_uuid = file.uuid - file = file_system.create_file(file_name="test_file", size=10, folder_uuid=src_folder.uuid) - assert len(file_system.get_folder_by_id(src_folder.uuid).files) is 1 - assert len(file_system.get_folder_by_id(target_folder.uuid).files) is 0 + assert len(file_system.get_folder("src_folder").files) == 1 + assert len(file_system.get_folder("dst_folder").files) == 0 - file_system.move_file(file=file, src_folder=src_folder, target_folder=target_folder) + file_system.move_file(src_folder_name="src_folder", src_file_name="test_file.txt", dst_folder_name="dst_folder") - assert len(file_system.get_folder_by_id(src_folder.uuid).files) is 0 - assert len(file_system.get_folder_by_id(target_folder.uuid).files) is 1 + assert len(file_system.get_folder("src_folder").files) == 0 + assert len(file_system.get_folder("dst_folder").files) == 1 + assert file_system.get_file("dst_folder", "test_file.txt").uuid == original_uuid -def test_copy_file(): +def test_copy_file(file_system): """Tests the file copy function.""" - file_system = FileSystem() - src_folder = file_system.create_folder(folder_name="test_folder_1") - assert len(file_system.folders) is 1 + file_system.create_folder(folder_name="src_folder") + file_system.create_folder(folder_name="dst_folder") - target_folder = file_system.create_folder(folder_name="test_folder_2") - assert len(file_system.folders) is 2 + file = file_system.create_file(file_name="test_file.txt", size=10, folder_name="src_folder", real=True) + original_uuid = file.uuid - file = file_system.create_file(file_name="test_file", size=10, folder_uuid=src_folder.uuid) - assert len(file_system.get_folder_by_id(src_folder.uuid).files) is 1 - assert len(file_system.get_folder_by_id(target_folder.uuid).files) is 0 + assert len(file_system.get_folder("src_folder").files) == 1 + assert len(file_system.get_folder("dst_folder").files) == 0 - file_system.copy_file(file=file, src_folder=src_folder, target_folder=target_folder) + file_system.copy_file(src_folder_name="src_folder", src_file_name="test_file.txt", dst_folder_name="dst_folder") - assert len(file_system.get_folder_by_id(src_folder.uuid).files) is 1 - assert len(file_system.get_folder_by_id(target_folder.uuid).files) is 1 + assert len(file_system.get_folder("src_folder").files) == 1 + assert len(file_system.get_folder("dst_folder").files) == 1 + assert file_system.get_file("dst_folder", "test_file.txt").uuid != original_uuid -def test_serialisation(): +def test_folder_quarantine_state(file_system): + """Tests the changing of folder quarantine status.""" + folder = file_system.get_folder("root") + + assert folder.quarantine_status() is False + + folder.quarantine() + assert folder.quarantine_status() is True + + folder.unquarantine() + assert folder.quarantine_status() is False + + +@pytest.mark.skip(reason="Skipping until we tackle serialisation") +def test_serialisation(file_system): """Test to check that the object serialisation works correctly.""" - file_system = FileSystem() - folder = file_system.create_folder(folder_name="test_folder") - assert len(file_system.folders) is 1 - - file_system.create_file(file_name="test_file", size=10, folder_uuid=folder.uuid) - assert file_system.get_folder_by_id(folder.uuid) is folder + file_system.create_file(file_name="test_file.txt") serialised_file_sys = file_system.model_dump_json() deserialised_file_sys = FileSystem.model_validate_json(serialised_file_sys) diff --git a/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system_file.py b/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system_file.py deleted file mode 100644 index 629b9bb9..00000000 --- a/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system_file.py +++ /dev/null @@ -1,23 +0,0 @@ -from primaite.simulator.file_system.file_system_file import FileSystemFile -from primaite.simulator.file_system.file_system_file_type import FileSystemFileType - - -def test_file_type(): - """Tests tha the FileSystemFile type is set correctly.""" - file = FileSystemFile(name="test", file_type=FileSystemFileType.DOC) - assert file.file_type is FileSystemFileType.DOC - - -def test_get_size(): - """Tests that the file size is being returned properly.""" - file = FileSystemFile(name="test", size=1.5) - assert file.size == 1.5 - - -def test_serialisation(): - """Test to check that the object serialisation works correctly.""" - file = FileSystemFile(name="test", size=1.5, file_type=FileSystemFileType.DOC) - serialised_file = file.model_dump_json() - deserialised_file = FileSystemFile.model_validate_json(serialised_file) - - assert file.model_dump_json() == deserialised_file.model_dump_json() diff --git a/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system_folder.py b/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system_folder.py deleted file mode 100644 index 1940e886..00000000 --- a/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system_folder.py +++ /dev/null @@ -1,75 +0,0 @@ -from primaite.simulator.file_system.file_system_file import FileSystemFile -from primaite.simulator.file_system.file_system_file_type import FileSystemFileType -from primaite.simulator.file_system.file_system_folder import FileSystemFolder - - -def test_adding_removing_file(): - """Test the adding and removing of a file from a folder.""" - folder = FileSystemFolder(name="test") - - file = FileSystemFile(name="test_file", size=10, file_type=FileSystemFileType.DOC) - - folder.add_file(file) - assert folder.size == 10 - assert len(folder.files) is 1 - - folder.remove_file(file) - assert folder.size == 0 - assert len(folder.files) is 0 - - -def test_remove_non_existent_file(): - """Test the removing of a file that does not exist.""" - folder = FileSystemFolder(name="test") - - file = FileSystemFile(name="test_file", size=10, file_type=FileSystemFileType.DOC) - not_added_file = FileSystemFile(name="fake_file", size=10, file_type=FileSystemFileType.DOC) - - folder.add_file(file) - assert folder.size == 10 - assert len(folder.files) is 1 - - folder.remove_file(not_added_file) - assert folder.size == 10 - assert len(folder.files) is 1 - - -def test_get_file_by_id(): - """Test to make sure that the correct file is returned.""" - folder = FileSystemFolder(name="test") - - file = FileSystemFile(name="test_file", size=10, file_type=FileSystemFileType.DOC) - file2 = FileSystemFile(name="test_file_2", size=10, file_type=FileSystemFileType.DOC) - - folder.add_file(file) - folder.add_file(file2) - assert folder.size == 20 - assert len(folder.files) is 2 - - assert folder.get_file_by_id(file_id=file.uuid) is file - - -def test_folder_quarantine_state(): - """Tests the changing of folder quarantine status.""" - folder = FileSystemFolder(name="test") - - assert folder.quarantine_status() is False - - folder.quarantine() - assert folder.quarantine_status() is True - - folder.end_quarantine() - assert folder.quarantine_status() is False - - -def test_serialisation(): - """Test to check that the object serialisation works correctly.""" - folder = FileSystemFolder(name="test") - file = FileSystemFile(name="test_file", size=10, file_type=FileSystemFileType.DOC) - folder.add_file(file) - - serialised_folder = folder.model_dump_json() - - deserialised_folder = FileSystemFolder.model_validate_json(serialised_folder) - - assert folder.model_dump_json() == deserialised_folder.model_dump_json() diff --git a/tests/unit_tests/_primaite/_simulator/_system/_services/test_database.py b/tests/unit_tests/_primaite/_simulator/_system/_services/test_database.py index ebc5536f..f3751f27 100644 --- a/tests/unit_tests/_primaite/_simulator/_system/_services/test_database.py +++ b/tests/unit_tests/_primaite/_simulator/_system/_services/test_database.py @@ -1,15 +1,59 @@ -from primaite.simulator.network.transmission.transport_layer import Port +import json + +import pytest + +from primaite.simulator.network.hardware.base import Node from primaite.simulator.system.services.database import DatabaseService -from primaite.simulator.system.services.service import ServiceOperatingState -from primaite.simulator.system.software import SoftwareCriticality, SoftwareHealthState + +DDL = """ +CREATE TABLE IF NOT EXISTS user ( +id INTEGER PRIMARY KEY AUTOINCREMENT, +name VARCHAR(50) NOT NULL, +email VARCHAR(50) NOT NULL, +age INT, +city VARCHAR(50), +occupation VARCHAR(50) +);""" + +USER_INSERT_STATEMENTS = [ + "INSERT INTO user (name, email, age, city, occupation) VALUES ('John Doe', 'johndoe@example.com', 32, 'New York', 'Engineer');", + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Jane Smith', 'janesmith@example.com', 27, 'Los Angeles', 'Designer');", + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Bob Johnson', 'bobjohnson@example.com', 45, 'Chicago', 'Manager');", + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Alice Lee', 'alicelee@example.com', 22, 'San Francisco', 'Student');", + "INSERT INTO user (name, email, age, city, occupation) VALUES ('David Kim', 'davidkim@example.com', 38, 'Houston', 'Consultant');", + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Emily Chen', 'emilychen@example.com', 29, 'Seattle', 'Software Developer');", + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Frank Wang', 'frankwang@example.com', 55, 'New York', 'Entrepreneur');", + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Grace Park', 'gracepark@example.com', 31, 'Los Angeles', 'Marketing Specialist');", + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Henry Wu', 'henrywu@example.com', 40, 'Chicago', 'Accountant');", + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Isabella Kim', 'isabellakim@example.com', 26, 'San Francisco', 'Graphic Designer');", + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Jake Lee', 'jakelee@example.com', 33, 'Houston', 'Sales Manager');", + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Kelly Chen', 'kellychen@example.com', 28, 'Seattle', 'Web Developer');", + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Lucas Liu', 'lucasliu@example.com', 42, 'New York', 'Lawyer');", + "INSERT INTO user (name, email, age, city, occupation) VALUES ('Maggie Wang', 'maggiewang@example.com', 30, 'Los Angeles', 'Data Analyst');", +] -def test_creation(): - db = DatabaseService( - name="SQL-database", - health_state_actual=SoftwareHealthState.GOOD, - health_state_visible=SoftwareHealthState.GOOD, - criticality=SoftwareCriticality.MEDIUM, - port=Port.SQL_SERVER, - operating_state=ServiceOperatingState.RUNNING, - ) +@pytest.fixture(scope="function") +def database_server() -> Node: + node = Node(hostname="db_node") + node.software_manager.add_service(DatabaseService) + node.software_manager.services["Database"].start() + return node + + +@pytest.fixture(scope="function") +def database(database_server) -> DatabaseService: + database: DatabaseService = database_server.software_manager.services["Database"] # noqa + database.receive(DDL, None) + for script in USER_INSERT_STATEMENTS: + database.receive(script, None) + return database + + +def test_creation(database_server): + database_server.software_manager.show() + + +def test_db_population(database): + database.show() + assert database.tables() == ["user"]