from __future__ import annotations import math import os.path import shutil from abc import abstractmethod from pathlib import Path from typing import Dict, Optional from prettytable import MARKDOWN, PrettyTable from primaite import getLogger from primaite.simulator.core import SimComponent from primaite.simulator.file_system.file_type import FileType, get_file_type_from_extension from primaite.simulator.system.core.sys_log import SysLog _LOGGER = getLogger(__name__) def convert_size(size_bytes): """ Convert a file size from bytes to a string with a more human-readable format. This function takes the size of a file in bytes and converts it to a string representation with appropriate size units (B, KB, MB, GB, etc.). :param size_bytes: The size of the file in bytes. :return: The human-readable string representation of the file size. """ if size_bytes == 0: return "0 B" # Tuple of size units starting from Bytes up to Yottabytes size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB") # Calculate the index (i) that will be used to select the appropriate size unit from size_name i = int(math.floor(math.log(size_bytes, 1024))) # Calculate the adjusted size value (s) in terms of the new size unit p = math.pow(1024, i) s = round(size_bytes / p, 2) return f"{s} {size_name[i]}" class FileSystemItemABC(SimComponent): """Abstract base class for file system items used in the file system simulation.""" name: str "The name of the FileSystemItemABC." def describe_state(self) -> Dict: """ Produce a dictionary describing the current state of this object. :return: Current state of this object and child objects. """ state = super().describe_state() state.update( { "name": self.name, } ) return state @property def size_str(self): return convert_size(self.size) class FileSystem(SimComponent): """Class that contains all the simulation File System.""" folders: Dict[str, Folder] = {} "List containing all the folders in the file system." _folders_by_name: Dict[str, Folder] = {} sys_log: SysLog sim_root: Path def __init__(self, **kwargs): super().__init__(**kwargs) # Ensure a default root folder if not self.folders: self.create_folder("root") @property def size(self): return sum(folder.size for folder in self.folders.values()) def show(self, markdown: bool = False, full: bool = False): """Prints a of the FileSystem""" headers = ["Folder", "Size"] if full: headers[0] = "File Path" table = PrettyTable(headers) if markdown: table.set_style(MARKDOWN) table.align = "l" table.title = f"{self.sys_log.hostname} File System" for folder in self.folders.values(): if not full: table.add_row([folder.name, folder.size_str]) else: for file in folder.files.values(): table.add_row([file.path, file.size_str]) if full: print(table.get_string(sortby="File Path")) else: print(table.get_string(sortby="Folder")) def describe_state(self) -> Dict: """ Produce a dictionary describing the current state of this object. :return: Current state of this object and child objects. """ state = super().describe_state() state["folders"] = {folder.name: folder.describe_state() for folder in self.folders.values()} return state def create_folder(self, folder_name: str) -> Folder: """ Creates a Folder and adds it to the list of folders. :param folder_name: The name of the folder. """ # check if folder with name already exists if self.get_folder(folder_name): raise Exception(f"Cannot create folder as it already exists: {folder_name}") folder = Folder(name=folder_name, fs=self) self.folders[folder.uuid] = folder self._folders_by_name[folder.name] = folder self.sys_log.info(f"Created folder /{folder.name}") return folder def delete_folder(self, folder_name: str): """ Deletes a folder, removes it from the folders list and removes any child folders and files. :param folder_name: The name of the folder. """ if folder_name == "root": self.sys_log.warning("Cannot delete the root folder.") return folder = self._folders_by_name.get(folder_name) if folder: for file in folder.files.values(): self.delete_file(file) self.folders.pop(folder.uuid) self._folders_by_name.pop(folder.name) self.sys_log.info(f"Deleted folder /{folder.name} and its contents") else: _LOGGER.debug(f"Cannot delete folder as it does not exist: {folder_name}") def create_file( self, file_name: str, size: Optional[int] = None, file_type: Optional[FileType] = None, folder_name: Optional[str] = None, real: bool = False, ) -> File: """ Creates a File and adds it to the list of files. :param file_name: The file name. :param size: The size the file takes on disk in bytes. :param file_type: The type of the file. :param folder_name: The folder to add the file to. :param real: "Indicates whether the File is actually a real file in the Node sim fs output." """ if folder_name: # check if file with name already exists folder = self._folders_by_name.get(folder_name) # If not then create it if not folder: folder = self.create_folder(folder_name) else: # Use root folder if folder_name not supplied folder = self._folders_by_name["root"] # Create the file and add it to the folder file = File( name=file_name, sim_size=size, file_type=file_type, folder=folder, real=real, sim_path=self.sim_root if real else None, ) folder.add_file(file) self.sys_log.info(f"Created file /{file.path}") return file def get_file(self, folder_name: str, file_name: str) -> Optional[File]: folder = self.get_folder(folder_name) if folder: return folder.get_file(file_name) self.fs.sys_log.info(f"file not found /{folder_name}/{file_name}") def delete_file(self, folder_name: str, file_name: str): folder = self.get_folder(folder_name) if folder: file = folder.get_file(file_name) if file: folder.remove_file(file) self.sys_log.info(f"Deleted file /{file.path}") def move_file(self, src_folder_name: str, src_file_name: str, dst_folder_name): file = self.get_file(folder_name=src_folder_name, file_name=src_file_name) if file: src_folder = file.folder # remove file from src src_folder.remove_file(file) dst_folder = self.get_folder(folder_name=dst_folder_name) if not dst_folder: dst_folder = self.create_folder(dst_folder_name) # add file to dst dst_folder.add_file(file) if file.real: old_sim_path = file.sim_path file.sim_path = file.folder.fs.sim_root / file.path file.sim_path.parent.mkdir(exist_ok=True) shutil.move(old_sim_path, file.sim_path) def copy_file(self, src_folder_name: str, src_file_name: str, dst_folder_name): file = self.get_file(folder_name=src_folder_name, file_name=src_file_name) if file: dst_folder = self.get_folder(folder_name=dst_folder_name) if not dst_folder: dst_folder = self.create_folder(dst_folder_name) new_file = file.make_copy(dst_folder=dst_folder) dst_folder.add_file(new_file) if file.real: new_file.sim_path.parent.mkdir(exist_ok=True) shutil.copy2(file.sim_path, new_file.sim_path) def get_folder(self, folder_name: str) -> Optional[Folder]: """ Get a folder by its name if it exists. :param folder_name: The folder name. :return: The matching Folder. """ return self._folders_by_name.get(folder_name) def get_folder_by_id(self, folder_uuid: str) -> Optional[Folder]: """ Get a folder by its uuid if it exists. :param folder_uuid: The folder uuid. :return: The matching Folder. """ return self.folders.get(folder_uuid) class Folder(FileSystemItemABC): """Simulation Folder.""" fs: FileSystem "The FileSystem the Folder is in." files: Dict[str, File] = {} "Files stored in the folder." _files_by_name: Dict[str, File] = {} "Files by their name as .." is_quarantined: bool = False "Flag that marks the folder as quarantined if true." def describe_state(self) -> Dict: """ Produce a dictionary describing the current state of this object. :return: Current state of this object and child objects. """ state = super().describe_state() state["files"] = {file.name: file.describe_state() for uuid, file in self.files.items()} state["is_quarantined"] = self.is_quarantined return state def show(self, markdown: bool = False): """Prints a of the Folder""" table = PrettyTable(["File", "Size"]) if markdown: table.set_style(MARKDOWN) table.align = "l" table.title = f"{self.fs.sys_log.hostname} File System Folder ({self.name})" for file in self.files.values(): table.add_row([file.name, file.size_str]) print(table.get_string(sortby="File")) @property def size(self): return sum(file.size for file in self.files.values() if file.size is not None) def get_file(self, file_name: str) -> Optional[File]: """ Get a file by its name. File name must be the filename and prefix, like 'memo.docx'. :param file_name: The file name. :return: The matching File. """ # TODO: Increment read count? return self._files_by_name.get(file_name) def get_file_by_id(self, file_uuid: str) -> File: """ Get a file by its uuid. :param file_uuid: The file uuid. :return: The matching File. """ return self.files.get(file_uuid) def add_file(self, file: File): """Adds a file to the folder list.""" if file is None or not isinstance(file, File): raise Exception(f"Invalid file: {file}") # check if file with id already exists in folder if file.uuid in self.files: _LOGGER.debug(f"File with id {file.uuid} already exists in folder") else: # add to list self.files[file.uuid] = file self._files_by_name[file.name] = file file.folder = self def remove_file(self, file: Optional[File]): """ Removes a file from the folder list. The method can take a File object or a file id. :param file: The file to remove :type: Optional[File] """ if file is None or not isinstance(file, File): raise Exception(f"Invalid file: {file}") if self.files.get(file.uuid): self.files.pop(file.uuid) self._files_by_name.pop(file.name) else: _LOGGER.debug(f"File with UUID {file.uuid} was not found.") def quarantine(self): """Quarantines the File System Folder.""" if not self.is_quarantined: self.is_quarantined = True self.fs.sys_log.info(f"Quarantined folder ./{self.name}") def unquarantine(self): """Unquarantine of the File System Folder.""" if self.is_quarantined: self.is_quarantined = False self.fs.sys_log.info(f"Quarantined folder ./{self.name}") def quarantine_status(self) -> bool: """Returns true if the folder is being quarantined.""" return self.is_quarantined class File(FileSystemItemABC): """Class that represents a file in the simulation.""" folder: Folder "The Folder the File is in." file_type: FileType "The type of File." sim_size: Optional[int] = None "The simulated file size." real: bool = False "Indicates whether the File is actually a real file in the Node sim fs output." sim_path: Optional[Path] = None "The Path if real is True." def __init__(self, **kwargs): """ Initialise File class. :param name: The name of the file. :param file_type: The FileType of the file :param size: The size of the FileSystemItemABC """ has_extension = "." in kwargs["name"] # Attempt to use the file type extension to set/override the FileType if has_extension: extension = kwargs["name"].split(".")[-1] kwargs["file_type"] = get_file_type_from_extension(extension) else: # If the file name does not have a extension, override file type to FileType.UNKNOWN if not kwargs["file_type"]: kwargs["file_type"] = FileType.UNKNOWN if kwargs["file_type"] != FileType.UNKNOWN: kwargs["name"] = f"{kwargs['name']}.{kwargs['file_type'].name.lower()}" # set random file size if none provided if not kwargs.get("sim_size"): kwargs["sim_size"] = kwargs["file_type"].default_size super().__init__(**kwargs) if self.real: self.sim_path = self.folder.fs.sim_root / self.path if not self.sim_path.exists(): self.sim_path.parent.mkdir(exist_ok=True, parents=True) with open(self.sim_path, mode="a"): pass def make_copy(self, dst_folder: Folder) -> File: return File(folder=dst_folder, **self.model_dump(exclude={"uuid", "folder", "sim_path"})) @property def path(self): """The path of the file in the FileSystem.""" return f"{self.folder.name}/{self.name}" @property def size(self) -> int: """The file size in Bytes.""" if self.real: return os.path.getsize(self.sim_path) return self.sim_size def describe_state(self) -> Dict: """Produce a dictionary describing the current state of this object.""" state = super().describe_state() state["size"] = self.size state["file_type"] = self.file_type.name return state