diff --git a/src/primaite/simulator/file_system/file.py b/src/primaite/simulator/file_system/file.py new file mode 100644 index 00000000..0da3c9ab --- /dev/null +++ b/src/primaite/simulator/file_system/file.py @@ -0,0 +1,185 @@ +from __future__ import annotations + +import hashlib +import json +import os.path +from pathlib import Path +from typing import Dict, Optional + +from primaite import getLogger +from primaite.simulator.file_system.file_system_item_abc import FileSystemItemABC, FileSystemItemHealthStatus +from primaite.simulator.file_system.file_type import FileType, get_file_type_from_extension + +_LOGGER = getLogger(__name__) + + +class File(FileSystemItemABC): + """ + Class representing a file in the simulation. + + :ivar Folder folder: The folder in which the file resides. + :ivar FileType file_type: The type of the file. + :ivar Optional[int] sim_size: The simulated file size. + :ivar bool real: Indicates if the file is actually a real file in the Node sim fs output. + :ivar Optional[Path] sim_path: The path if the file is real. + """ + + folder_id: str + "The id of the Folder the File is in." + folder_name: str + "The name of the Folder the file is in." + file_type: FileType + "The type of File." + sim_size: Optional[int] = None + "The simulated file size." + real: bool = False + "Indicates whether the File is actually a real file in the Node sim fs output." + sim_path: Optional[Path] = None + "The Path if real is True." + sim_root: Optional[Path] = None + "Root path of the simulation." + + def __init__(self, **kwargs): + """ + Initialise File class. + + :param name: The name of the file. + :param file_type: The FileType of the file + :param size: The size of the FileSystemItemABC + """ + has_extension = "." in kwargs["name"] + + # Attempt to use the file type extension to set/override the FileType + if has_extension: + extension = kwargs["name"].split(".")[-1] + kwargs["file_type"] = get_file_type_from_extension(extension) + else: + # If the file name does not have a extension, override file type to FileType.UNKNOWN + if not kwargs["file_type"]: + kwargs["file_type"] = FileType.UNKNOWN + if kwargs["file_type"] != FileType.UNKNOWN: + kwargs["name"] = f"{kwargs['name']}.{kwargs['file_type'].name.lower()}" + + # set random file size if none provided + if not kwargs.get("sim_size"): + kwargs["sim_size"] = kwargs["file_type"].default_size + super().__init__(**kwargs) + if self.real: + self.sim_path = self.sim_root / self.path + if not self.sim_path.exists(): + self.sim_path.parent.mkdir(exist_ok=True, parents=True) + with open(self.sim_path, mode="a"): + pass + + self.sys_log.info(f"Created file /{self.path} (id: {self.uuid})") + + @property + def path(self) -> str: + """ + Get the path of the file in the file system. + + :return: The full path of the file. + """ + return f"{self.folder_name}/{self.name}" + + @property + def size(self) -> int: + """ + Get the size of the file in bytes. + + :return: The size of the file in bytes. + """ + if self.real: + return os.path.getsize(self.sim_path) + return self.sim_size + + def describe_state(self) -> Dict: + """Produce a dictionary describing the current state of this object.""" + state = super().describe_state() + state["size"] = self.size + state["file_type"] = self.file_type.name + return state + + def scan(self) -> None: + """Updates the visible statuses of the file.""" + path = self.folder.name + "/" + self.name + self.sys_log.info(f"Scanning file {self.sim_path if self.sim_path else path}") + self.visible_health_status = self.health_status + + def reveal_to_red(self): + """Reveals the folder/file to the red agent.""" + self.revealed_to_red = True + + def check_hash(self) -> bool: + """ + Check if the file has been changed. + + If changed, the file is considered corrupted. + + Return False if corruption is detected, otherwise True + """ + current_hash = None + + # if file is real, read the file contents + if self.real: + with open(self.sim_path, "rb") as f: + file_hash = hashlib.blake2b() + while chunk := f.read(8192): + file_hash.update(chunk) + + current_hash = file_hash.hexdigest() + else: + # otherwise get describe_state dict and hash that + current_hash = hashlib.blake2b(json.dumps(self.describe_state(), sort_keys=True).encode()).hexdigest() + + # if the previous hash is None, set the current hash to previous + if self.previous_hash is None: + self.previous_hash = current_hash + + # if the previous hash and current hash do not match, mark file as corrupted + if self.previous_hash is not current_hash: + self.corrupt() + return False + + return True + + def repair(self) -> bool: + """Repair a corrupted File by setting the status to FileSystemItemStatus.GOOD.""" + super().repair() + + # set file status to good if corrupt + if self.health_status == FileSystemItemHealthStatus.CORRUPT: + self.health_status = FileSystemItemHealthStatus.GOOD + + path = self.folder.name + "/" + self.name + self.sys_log.info(f"Repaired file {self.sim_path if self.sim_path else path}") + return True + + def restore(self) -> bool: + """Restore a corrupted File by setting the status to FileSystemItemStatus.GOOD.""" + super().restore() + + restored = False + + if self.health_status == FileSystemItemHealthStatus.CORRUPT: + self.health_status = FileSystemItemHealthStatus.GOOD + restored = True + + path = self.folder.name + "/" + self.name + self.sys_log.info(f"Restored file {self.sim_path if self.sim_path else path}") + return restored + + def corrupt(self) -> bool: + """Corrupt a File by setting the status to FileSystemItemStatus.CORRUPT.""" + super().corrupt() + + corrupted = False + + # set file status to good if corrupt + if self.health_status == FileSystemItemHealthStatus.GOOD: + self.health_status = FileSystemItemHealthStatus.CORRUPT + corrupted = True + + path = self.folder.name + "/" + self.name + self.sys_log.info(f"Corrupted file {self.sim_path if self.sim_path else path}") + return corrupted diff --git a/src/primaite/simulator/file_system/file_system.py b/src/primaite/simulator/file_system/file_system.py index 02f3c5e6..16c9992c 100644 --- a/src/primaite/simulator/file_system/file_system.py +++ b/src/primaite/simulator/file_system/file_system.py @@ -1,8 +1,5 @@ from __future__ import annotations -import hashlib -import json -import os.path import shutil from pathlib import Path from typing import Dict, Optional @@ -11,8 +8,9 @@ from prettytable import MARKDOWN, PrettyTable from primaite import getLogger from primaite.simulator.core import RequestManager, RequestType, SimComponent -from primaite.simulator.file_system.file_system_item_abc import FileSystemItemABC, FileSystemItemHealthStatus -from primaite.simulator.file_system.file_type import FileType, get_file_type_from_extension +from primaite.simulator.file_system.file import File +from primaite.simulator.file_system.file_type import FileType +from primaite.simulator.file_system.folder import Folder from primaite.simulator.system.core.sys_log import SysLog _LOGGER = getLogger(__name__) @@ -27,7 +25,9 @@ class FileSystem(SimComponent): "List containing all the folders that have been deleted." _folders_by_name: Dict[str, Folder] = {} sys_log: SysLog + "Instance of SysLog used to create system logs." sim_root: Path + "Root path of the simulation." def __init__(self, **kwargs): super().__init__(**kwargs) @@ -86,42 +86,9 @@ class FileSystem(SimComponent): else: print(table.get_string(sortby="Folder")) - def describe_state(self) -> Dict: - """ - Produce a dictionary describing the current state of this object. - - :return: Current state of this object and child objects. - """ - state = super().describe_state() - state["folders"] = {folder.name: folder.describe_state() for folder in self.folders.values()} - return state - - def apply_timestep(self, timestep: int) -> None: - """Apply time step to FileSystem and its child folders and files.""" - super().apply_timestep(timestep=timestep) - - # apply timestep to folders - for folder_id in self.folders: - self.folders[folder_id].apply_timestep(timestep=timestep) - - def scan(self, instant_scan: bool = False): - """ - Scan all the folders (and child files) in the file system. - - :param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False. - """ - for folder_id in self.folders: - self.folders[folder_id].scan(instant_scan=instant_scan) - - def reveal_to_red(self, instant_scan: bool = False): - """ - Reveals all the folders (and child files) in the file system to the red agent. - - :param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False. - """ - for folder_id in self.folders: - self.folders[folder_id].reveal_to_red(instant_scan=instant_scan) - + ############################################################### + # Folder methods + ############################################################### def create_folder(self, folder_name: str) -> Folder: """ Creates a Folder and adds it to the list of folders. @@ -132,11 +99,10 @@ class FileSystem(SimComponent): if self.get_folder(folder_name): raise Exception(f"Cannot create folder as it already exists: {folder_name}") - folder = Folder(name=folder_name, fs=self) + folder = Folder(name=folder_name, sys_log=self.sys_log) self.folders[folder.uuid] = folder self._folders_by_name[folder.name] = folder - self.sys_log.info(f"Created folder /{folder.name}") self._folder_request_manager.add_request( name=folder.uuid, request_type=RequestType(func=folder._request_manager) ) @@ -174,9 +140,27 @@ class FileSystem(SimComponent): folder = self.get_folder_by_id(folder_uuid=folder_uuid) self.delete_folder(folder_name=folder.name) - def restore_folder(self, folder_id: str): - """TODO.""" - pass + def get_folder(self, folder_name: str) -> Optional[Folder]: + """ + Get a folder by its name if it exists. + + :param folder_name: The folder name. + :return: The matching Folder. + """ + return self._folders_by_name.get(folder_name) + + def get_folder_by_id(self, folder_uuid: str) -> Optional[Folder]: + """ + Get a folder by its uuid if it exists. + + :param: folder_uuid: The folder uuid. + :return: The matching Folder. + """ + return self.folders.get(folder_uuid) + + ############################################################### + # File methods + ############################################################### def create_file( self, @@ -210,12 +194,14 @@ class FileSystem(SimComponent): name=file_name, sim_size=size, file_type=file_type, - folder=folder, + folder_id=folder.uuid, + folder_name=folder.name, real=real, sim_path=self.sim_root if real else None, + sim_root=self.sim_root, + sys_log=self.sys_log, ) folder.add_file(file) - self.sys_log.info(f"Created file /{file.path}") self._file_request_manager.add_request(name=file.uuid, request_type=RequestType(func=file._request_manager)) return file @@ -266,7 +252,7 @@ class FileSystem(SimComponent): dst_folder.add_file(file) if file.real: old_sim_path = file.sim_path - file.sim_path = file.folder.fs.sim_root / file.path + file.sim_path = file.sim_root / file.path file.sim_path.parent.mkdir(exist_ok=True) shutil.move(old_sim_path, file.sim_path) @@ -280,14 +266,64 @@ class FileSystem(SimComponent): """ file = self.get_file(folder_name=src_folder_name, file_name=src_file_name) if file: + # check that dest folder exists dst_folder = self.get_folder(folder_name=dst_folder_name) if not dst_folder: + # create dest folder dst_folder = self.create_folder(dst_folder_name) - new_file = file.make_copy(dst_folder=dst_folder) - dst_folder.add_file(new_file) + + file_copy = File( + folder_id=dst_folder.uuid, + folder_name=dst_folder.name, + **file.model_dump(exclude={"uuid", "folder_id", "folder_name", "sim_path"}), + ) + dst_folder.add_file(file_copy) + if file.real: - new_file.sim_path.parent.mkdir(exist_ok=True) - shutil.copy2(file.sim_path, new_file.sim_path) + file_copy.sim_path.parent.mkdir(exist_ok=True) + shutil.copy2(file.sim_path, file_copy.sim_path) + else: + self.sys_log.error(f"Unable to copy file. {src_file_name} does not exist.") + + def describe_state(self) -> Dict: + """ + Produce a dictionary describing the current state of this object. + + :return: Current state of this object and child objects. + """ + state = super().describe_state() + state["folders"] = {folder.name: folder.describe_state() for folder in self.folders.values()} + return state + + def apply_timestep(self, timestep: int) -> None: + """Apply time step to FileSystem and its child folders and files.""" + super().apply_timestep(timestep=timestep) + + # apply timestep to folders + for folder_id in self.folders: + self.folders[folder_id].apply_timestep(timestep=timestep) + + def scan(self, instant_scan: bool = False): + """ + Scan all the folders (and child files) in the file system. + + :param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False. + """ + for folder_id in self.folders: + self.folders[folder_id].scan(instant_scan=instant_scan) + + def reveal_to_red(self, instant_scan: bool = False): + """ + Reveals all the folders (and child files) in the file system to the red agent. + + :param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False. + """ + for folder_id in self.folders: + self.folders[folder_id].reveal_to_red(instant_scan=instant_scan) + + def restore_folder(self, folder_id: str): + """TODO.""" + pass def restore_file(self, folder_id: str, file_id: str): """ @@ -302,522 +338,3 @@ class FileSystem(SimComponent): :type: folder_id: str """ pass - - def get_folder(self, folder_name: str) -> Optional[Folder]: - """ - Get a folder by its name if it exists. - - :param folder_name: The folder name. - :return: The matching Folder. - """ - return self._folders_by_name.get(folder_name) - - def get_folder_by_id(self, folder_uuid: str) -> Optional[Folder]: - """ - Get a folder by its uuid if it exists. - - :param: folder_uuid: The folder uuid. - :return: The matching Folder. - """ - return self.folders.get(folder_uuid) - - -class Folder(FileSystemItemABC): - """Simulation Folder.""" - - fs: FileSystem - "The FileSystem the Folder is in." - files: Dict[str, File] = {} - "Files stored in the folder." - _files_by_name: Dict[str, File] = {} - "Files by their name as .." - deleted_files: Dict[str, File] = {} - "Files that have been deleted." - - scan_duration: int = -1 - "How many timesteps to complete a scan." - - red_scan_duration: int = -1 - "How many timesteps to complete reveal to red scan." - - def _init_request_manager(self) -> RequestManager: - rm = super()._init_request_manager() - rm.add_request( - name="delete", - request_type=RequestType(func=lambda request, context: self.remove_file_by_id(file_uuid=request[0])), - ) - return rm - - def describe_state(self) -> Dict: - """ - Produce a dictionary describing the current state of this object. - - :return: Current state of this object and child objects. - """ - state = super().describe_state() - state["files"] = {file.name: file.describe_state() for uuid, file in self.files.items()} - return state - - def show(self, markdown: bool = False): - """ - Display the contents of the Folder in tabular format. - - :param markdown: Whether to display the table in Markdown format or not. Default is `False`. - """ - table = PrettyTable(["File", "Size"]) - if markdown: - table.set_style(MARKDOWN) - table.align = "l" - table.title = f"{self.fs.sys_log.hostname} File System Folder ({self.name})" - for file in self.files.values(): - table.add_row([file.name, file.size_str]) - print(table.get_string(sortby="File")) - - @property - def size(self) -> int: - """ - Calculate and return the total size of all files in the folder. - - :return: The total size of all files in the folder. If no files exist or all have `None` - size, returns 0. - """ - return sum(file.size for file in self.files.values() if file.size is not None) - - def apply_timestep(self, timestep: int): - """ - Apply a single timestep of simulation dynamics to this folder and its files. - - In this instance, if any multi-timestep processes are currently occurring (such as scanning), - then they are brought one step closer to being finished. - - :param timestep: The current timestep number. (Amount of time since simulation episode began) - :type timestep: int - """ - super().apply_timestep(timestep=timestep) - - # scan files each timestep - if self.scan_duration >= 0: - self.scan_duration -= 1 - - if self.scan_duration == 0: - for file_id in self.files: - file = self.get_file_by_id(file_uuid=file_id) - file.scan() - if file.visible_health_status == FileSystemItemHealthStatus.CORRUPT: - self.visible_health_status = FileSystemItemHealthStatus.CORRUPT - - # red scan file at each step - if self.red_scan_duration >= 0: - self.red_scan_duration -= 1 - - if self.red_scan_duration == 0: - self.revealed_to_red = True - for file_id in self.files: - file = self.get_file_by_id(file_uuid=file_id) - file.reveal_to_red() - - # apply timestep to files in folder - for file_id in self.files: - self.files[file_id].apply_timestep(timestep=timestep) - - def get_file(self, file_name: str) -> Optional[File]: - """ - Get a file by its name. - - File name must be the filename and prefix, like 'memo.docx'. - - :param file_name: The file name. - :return: The matching File. - """ - # TODO: Increment read count? - return self._files_by_name.get(file_name) - - def get_file_by_id(self, file_uuid: str) -> File: - """ - Get a file by its uuid. - - :param: file_uuid: The file uuid. - :return: The matching File. - """ - return self.files.get(file_uuid) - - def add_file(self, file: File): - """ - Adds a file to the folder. - - :param File file: The File object to be added to the folder. - :raises Exception: If the provided `file` parameter is None or not an instance of the - `File` class. - """ - if file is None or not isinstance(file, File): - raise Exception(f"Invalid file: {file}") - - # check if file with id already exists in folder - if file.uuid in self.files: - _LOGGER.debug(f"File with id {file.uuid} already exists in folder") - else: - # add to list - self.files[file.uuid] = file - self._files_by_name[file.name] = file - file.folder = self - - def remove_file(self, file: Optional[File]): - """ - Removes a file from the folder list. - - The method can take a File object or a file id. - - :param file: The file to remove - """ - if file is None or not isinstance(file, File): - raise Exception(f"Invalid file: {file}") - - if self.files.get(file.uuid): - self.files.pop(file.uuid) - self._files_by_name.pop(file.name) - self.deleted_files[file.uuid] = file - self.fs.sys_log.info(f"Removed file {file.name} (id: {file.uuid})") - else: - _LOGGER.debug(f"File with UUID {file.uuid} was not found.") - - def remove_file_by_id(self, file_uuid: str): - """ - Remove a file using id. - - :param: file_uuid: The UUID of the file to remove. - """ - file = self.get_file_by_id(file_uuid=file_uuid) - self.remove_file(file=file) - - def remove_all_files(self): - """Removes all the files in the folder.""" - for file_id in self.files: - self.deleted_files[file_id] = self.files[file_id] - - self.files = {} - self._files_by_name = {} - - def restore_file(self, file: Optional[File]): - """ - Restores a file. - - The method can take a File object or a file id. - - :param file: The file to restore - """ - pass - - def quarantine(self): - """Quarantines the File System Folder.""" - pass - - def unquarantine(self): - """Unquarantine of the File System Folder.""" - pass - - def quarantine_status(self) -> bool: - """Returns true if the folder is being quarantined.""" - pass - - def scan(self, instant_scan: bool = False) -> None: - """ - Update Folder visible status. - - :param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False. - """ - if instant_scan: - for file_id in self.files: - file = self.get_file_by_id(file_uuid=file_id) - file.scan() - if file.visible_health_status == FileSystemItemHealthStatus.CORRUPT: - self.visible_health_status = FileSystemItemHealthStatus.CORRUPT - return - - if self.scan_duration <= 0: - # scan one file per timestep - self.scan_duration = len(self.files) - self.fs.sys_log.info(f"Scanning folder {self.name} (id: {self.uuid})") - else: - # scan already in progress - self.fs.sys_log.info(f"Scan is already in progress {self.name} (id: {self.uuid})") - - def reveal_to_red(self, instant_scan: bool = False): - """ - Reveals the folders and files to the red agent. - - :param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False. - """ - if instant_scan: - self.revealed_to_red = True - for file_id in self.files: - file = self.get_file_by_id(file_uuid=file_id) - file.reveal_to_red() - return - - if self.red_scan_duration <= 0: - # scan one file per timestep - self.red_scan_duration = len(self.files) - self.fs.sys_log.info(f"Folder revealed to red agent: {self.name} (id: {self.uuid})") - else: - # scan already in progress - self.fs.sys_log.info(f"Red Agent Scan is already in progress {self.name} (id: {self.uuid})") - - def check_hash(self) -> bool: - """ - Runs a :func:`check_hash` on all files in the folder. - - If a file under the folder is corrupted, the whole folder is considered corrupted. - - TODO: For now this will just iterate through the files and run :func:`check_hash` and ignores - any other changes to the folder - - Return False if corruption is detected, otherwise True - """ - super().check_hash() - - # iterate through the files and run a check hash - no_corrupted_files = True - - for file_id in self.files: - file = self.get_file_by_id(file_uuid=file_id) - no_corrupted_files = file.check_hash() - - # if one file in the folder is corrupted, set the folder status to corrupted - if not no_corrupted_files: - self.corrupt() - - self.fs.sys_log.info(f"Checking hash of folder {self.name} (id: {self.uuid})") - - return no_corrupted_files - - def repair(self) -> bool: - """Repair a corrupted Folder by setting the folder and containing files status to FileSystemItemStatus.GOOD.""" - super().repair() - - repaired = False - - # iterate through the files in the folder - for file_id in self.files: - file = self.get_file_by_id(file_uuid=file_id) - repaired = file.repair() - - # set file status to good if corrupt - if self.health_status == FileSystemItemHealthStatus.CORRUPT: - self.health_status = FileSystemItemHealthStatus.GOOD - repaired = True - - self.fs.sys_log.info(f"Repaired folder {self.name} (id: {self.uuid})") - return repaired - - def restore(self) -> bool: - """Restore a File by setting the folder and containing files status to FileSystemItemStatus.GOOD.""" - super().restore() - - restored = False - - # iterate through the files in the folder - for file_id in self.files: - file = self.get_file_by_id(file_uuid=file_id) - restored = file.restore() - - # set file status to corrupt if good - if self.health_status == FileSystemItemHealthStatus.CORRUPT: - self.health_status = FileSystemItemHealthStatus.GOOD - restored = True - - self.fs.sys_log.info(f"Restored folder {self.name} (id: {self.uuid})") - return restored - - def corrupt(self) -> bool: - """Corrupt a File by setting the folder and containing files status to FileSystemItemStatus.CORRUPT.""" - super().corrupt() - - corrupted = False - - # iterate through the files in the folder - for file_id in self.files: - file = self.get_file_by_id(file_uuid=file_id) - corrupted = file.corrupt() - - # set file status to corrupt if good - if self.health_status == FileSystemItemHealthStatus.GOOD: - self.health_status = FileSystemItemHealthStatus.CORRUPT - corrupted = True - - self.fs.sys_log.info(f"Corrupted folder {self.name} (id: {self.uuid})") - return corrupted - - -class File(FileSystemItemABC): - """ - Class representing a file in the simulation. - - :ivar Folder folder: The folder in which the file resides. - :ivar FileType file_type: The type of the file. - :ivar Optional[int] sim_size: The simulated file size. - :ivar bool real: Indicates if the file is actually a real file in the Node sim fs output. - :ivar Optional[Path] sim_path: The path if the file is real. - """ - - folder: Folder - "The Folder the File is in." - file_type: FileType - "The type of File." - sim_size: Optional[int] = None - "The simulated file size." - real: bool = False - "Indicates whether the File is actually a real file in the Node sim fs output." - sim_path: Optional[Path] = None - "The Path if real is True." - - def __init__(self, **kwargs): - """ - Initialise File class. - - :param name: The name of the file. - :param file_type: The FileType of the file - :param size: The size of the FileSystemItemABC - """ - has_extension = "." in kwargs["name"] - - # Attempt to use the file type extension to set/override the FileType - if has_extension: - extension = kwargs["name"].split(".")[-1] - kwargs["file_type"] = get_file_type_from_extension(extension) - else: - # If the file name does not have a extension, override file type to FileType.UNKNOWN - if not kwargs["file_type"]: - kwargs["file_type"] = FileType.UNKNOWN - if kwargs["file_type"] != FileType.UNKNOWN: - kwargs["name"] = f"{kwargs['name']}.{kwargs['file_type'].name.lower()}" - - # set random file size if none provided - if not kwargs.get("sim_size"): - kwargs["sim_size"] = kwargs["file_type"].default_size - super().__init__(**kwargs) - if self.real: - self.sim_path = self.folder.fs.sim_root / self.path - if not self.sim_path.exists(): - self.sim_path.parent.mkdir(exist_ok=True, parents=True) - with open(self.sim_path, mode="a"): - pass - - def make_copy(self, dst_folder: Folder) -> File: - """ - Create a copy of the current File object in the given destination folder. - - :param Folder dst_folder: The destination folder for the copied file. - :return: A new File object that is a copy of the current file. - """ - return File(folder=dst_folder, **self.model_dump(exclude={"uuid", "folder", "sim_path"})) - - @property - def path(self) -> str: - """ - Get the path of the file in the file system. - - :return: The full path of the file. - """ - return f"{self.folder.name}/{self.name}" - - @property - def size(self) -> int: - """ - Get the size of the file in bytes. - - :return: The size of the file in bytes. - """ - if self.real: - return os.path.getsize(self.sim_path) - return self.sim_size - - def describe_state(self) -> Dict: - """Produce a dictionary describing the current state of this object.""" - state = super().describe_state() - state["size"] = self.size - state["file_type"] = self.file_type.name - return state - - def scan(self) -> None: - """Updates the visible statuses of the file.""" - path = self.folder.name + "/" + self.name - self.folder.fs.sys_log.info(f"Scanning file {self.sim_path if self.sim_path else path}") - self.visible_health_status = self.health_status - - def reveal_to_red(self): - """Reveals the folder/file to the red agent.""" - self.revealed_to_red = True - - def check_hash(self) -> bool: - """ - Check if the file has been changed. - - If changed, the file is considered corrupted. - - Return False if corruption is detected, otherwise True - """ - current_hash = None - - # if file is real, read the file contents - if self.real: - with open(self.sim_path, "rb") as f: - file_hash = hashlib.blake2b() - while chunk := f.read(8192): - file_hash.update(chunk) - - current_hash = file_hash.hexdigest() - else: - # otherwise get describe_state dict and hash that - current_hash = hashlib.blake2b(json.dumps(self.describe_state(), sort_keys=True).encode()).hexdigest() - - # if the previous hash is None, set the current hash to previous - if self.previous_hash is None: - self.previous_hash = current_hash - - # if the previous hash and current hash do not match, mark file as corrupted - if self.previous_hash is not current_hash: - self.corrupt() - return False - - return True - - def repair(self) -> bool: - """Repair a corrupted File by setting the status to FileSystemItemStatus.GOOD.""" - super().repair() - - # set file status to good if corrupt - if self.health_status == FileSystemItemHealthStatus.CORRUPT: - self.health_status = FileSystemItemHealthStatus.GOOD - - path = self.folder.name + "/" + self.name - self.folder.fs.sys_log.info(f"Repaired file {self.sim_path if self.sim_path else path}") - return True - - def restore(self) -> bool: - """Restore a corrupted File by setting the status to FileSystemItemStatus.GOOD.""" - super().restore() - - restored = False - - if self.health_status == FileSystemItemHealthStatus.CORRUPT: - self.health_status = FileSystemItemHealthStatus.GOOD - restored = True - - path = self.folder.name + "/" + self.name - self.folder.fs.sys_log.info(f"Restored file {self.sim_path if self.sim_path else path}") - return restored - - def corrupt(self) -> bool: - """Corrupt a File by setting the status to FileSystemItemStatus.CORRUPT.""" - super().corrupt() - - corrupted = False - - # set file status to good if corrupt - if self.health_status == FileSystemItemHealthStatus.GOOD: - self.health_status = FileSystemItemHealthStatus.CORRUPT - corrupted = True - - path = self.folder.name + "/" + self.name - self.folder.fs.sys_log.info(f"Corrupted file {self.sim_path if self.sim_path else path}") - return corrupted diff --git a/src/primaite/simulator/file_system/file_system_item_abc.py b/src/primaite/simulator/file_system/file_system_item_abc.py index 8f61c718..c0d65139 100644 --- a/src/primaite/simulator/file_system/file_system_item_abc.py +++ b/src/primaite/simulator/file_system/file_system_item_abc.py @@ -7,6 +7,7 @@ from typing import Dict, Optional from primaite import getLogger from primaite.simulator.core import RequestManager, RequestType, SimComponent +from primaite.simulator.system.core.sys_log import SysLog _LOGGER = getLogger(__name__) @@ -78,6 +79,9 @@ class FileSystemItemABC(SimComponent): revealed_to_red: bool = False "If true, the folder/file has been revealed to the red agent." + sys_log: SysLog + "Used for creating system logs." + def describe_state(self) -> Dict: """ Produce a dictionary describing the current state of this object. diff --git a/src/primaite/simulator/file_system/folder.py b/src/primaite/simulator/file_system/folder.py new file mode 100644 index 00000000..6922cad1 --- /dev/null +++ b/src/primaite/simulator/file_system/folder.py @@ -0,0 +1,347 @@ +from __future__ import annotations + +from typing import Dict, Optional + +from prettytable import MARKDOWN, PrettyTable + +from primaite import getLogger +from primaite.simulator.core import RequestManager, RequestType +from primaite.simulator.file_system.file import File +from primaite.simulator.file_system.file_system_item_abc import FileSystemItemABC, FileSystemItemHealthStatus + +_LOGGER = getLogger(__name__) + + +class Folder(FileSystemItemABC): + """Simulation Folder.""" + + files: Dict[str, File] = {} + "Files stored in the folder." + _files_by_name: Dict[str, File] = {} + "Files by their name as .." + deleted_files: Dict[str, File] = {} + "Files that have been deleted." + + scan_duration: int = -1 + "How many timesteps to complete a scan." + + red_scan_duration: int = -1 + "How many timesteps to complete reveal to red scan." + + def __init__(self, **kwargs): + """ + Initialise Folder class. + + :param name: The name of the folder. + :param sys_log: The SysLog instance to us to create system logs. + """ + super().__init__(**kwargs) + + self.sys_log.info(f"Created file /{self.name} (id: {self.uuid})") + + def _init_request_manager(self) -> RequestManager: + rm = super()._init_request_manager() + rm.add_request( + name="delete", + request_type=RequestType(func=lambda request, context: self.remove_file_by_id(file_uuid=request[0])), + ) + return rm + + def describe_state(self) -> Dict: + """ + Produce a dictionary describing the current state of this object. + + :return: Current state of this object and child objects. + """ + state = super().describe_state() + state["files"] = {file.name: file.describe_state() for uuid, file in self.files.items()} + return state + + def show(self, markdown: bool = False): + """ + Display the contents of the Folder in tabular format. + + :param markdown: Whether to display the table in Markdown format or not. Default is `False`. + """ + table = PrettyTable(["File", "Size"]) + if markdown: + table.set_style(MARKDOWN) + table.align = "l" + table.title = f"{self.sys_log.hostname} File System Folder ({self.name})" + for file in self.files.values(): + table.add_row([file.name, file.size_str]) + print(table.get_string(sortby="File")) + + @property + def size(self) -> int: + """ + Calculate and return the total size of all files in the folder. + + :return: The total size of all files in the folder. If no files exist or all have `None` + size, returns 0. + """ + return sum(file.size for file in self.files.values() if file.size is not None) + + def apply_timestep(self, timestep: int): + """ + Apply a single timestep of simulation dynamics to this folder and its files. + + In this instance, if any multi-timestep processes are currently occurring (such as scanning), + then they are brought one step closer to being finished. + + :param timestep: The current timestep number. (Amount of time since simulation episode began) + :type timestep: int + """ + super().apply_timestep(timestep=timestep) + + # scan files each timestep + if self.scan_duration >= 0: + self.scan_duration -= 1 + + if self.scan_duration == 0: + for file_id in self.files: + file = self.get_file_by_id(file_uuid=file_id) + file.scan() + if file.visible_health_status == FileSystemItemHealthStatus.CORRUPT: + self.visible_health_status = FileSystemItemHealthStatus.CORRUPT + + # red scan file at each step + if self.red_scan_duration >= 0: + self.red_scan_duration -= 1 + + if self.red_scan_duration == 0: + self.revealed_to_red = True + for file_id in self.files: + file = self.get_file_by_id(file_uuid=file_id) + file.reveal_to_red() + + # apply timestep to files in folder + for file_id in self.files: + self.files[file_id].apply_timestep(timestep=timestep) + + def get_file(self, file_name: str) -> Optional[File]: + """ + Get a file by its name. + + File name must be the filename and prefix, like 'memo.docx'. + + :param file_name: The file name. + :return: The matching File. + """ + # TODO: Increment read count? + return self._files_by_name.get(file_name) + + def get_file_by_id(self, file_uuid: str) -> File: + """ + Get a file by its uuid. + + :param: file_uuid: The file uuid. + :return: The matching File. + """ + return self.files.get(file_uuid) + + def add_file(self, file: File): + """ + Adds a file to the folder. + + :param File file: The File object to be added to the folder. + :raises Exception: If the provided `file` parameter is None or not an instance of the + `File` class. + """ + if file is None or not isinstance(file, File): + raise Exception(f"Invalid file: {file}") + + # check if file with id already exists in folder + if file.uuid in self.files: + _LOGGER.debug(f"File with id {file.uuid} already exists in folder") + else: + # add to list + self.files[file.uuid] = file + self._files_by_name[file.name] = file + file.folder = self + + def remove_file(self, file: Optional[File]): + """ + Removes a file from the folder list. + + The method can take a File object or a file id. + + :param file: The file to remove + """ + if file is None or not isinstance(file, File): + raise Exception(f"Invalid file: {file}") + + if self.files.get(file.uuid): + self.files.pop(file.uuid) + self._files_by_name.pop(file.name) + self.deleted_files[file.uuid] = file + self.sys_log.info(f"Removed file {file.name} (id: {file.uuid})") + else: + _LOGGER.debug(f"File with UUID {file.uuid} was not found.") + + def remove_file_by_id(self, file_uuid: str): + """ + Remove a file using id. + + :param: file_uuid: The UUID of the file to remove. + """ + file = self.get_file_by_id(file_uuid=file_uuid) + self.remove_file(file=file) + + def remove_all_files(self): + """Removes all the files in the folder.""" + for file_id in self.files: + self.deleted_files[file_id] = self.files[file_id] + + self.files = {} + self._files_by_name = {} + + def restore_file(self, file: Optional[File]): + """ + Restores a file. + + The method can take a File object or a file id. + + :param file: The file to restore + """ + pass + + def quarantine(self): + """Quarantines the File System Folder.""" + pass + + def unquarantine(self): + """Unquarantine of the File System Folder.""" + pass + + def quarantine_status(self) -> bool: + """Returns true if the folder is being quarantined.""" + pass + + def scan(self, instant_scan: bool = False) -> None: + """ + Update Folder visible status. + + :param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False. + """ + if instant_scan: + for file_id in self.files: + file = self.get_file_by_id(file_uuid=file_id) + file.scan() + if file.visible_health_status == FileSystemItemHealthStatus.CORRUPT: + self.visible_health_status = FileSystemItemHealthStatus.CORRUPT + return + + if self.scan_duration <= 0: + # scan one file per timestep + self.scan_duration = len(self.files) + self.sys_log.info(f"Scanning folder {self.name} (id: {self.uuid})") + else: + # scan already in progress + self.sys_log.info(f"Scan is already in progress {self.name} (id: {self.uuid})") + + def reveal_to_red(self, instant_scan: bool = False): + """ + Reveals the folders and files to the red agent. + + :param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False. + """ + if instant_scan: + self.revealed_to_red = True + for file_id in self.files: + file = self.get_file_by_id(file_uuid=file_id) + file.reveal_to_red() + return + + if self.red_scan_duration <= 0: + # scan one file per timestep + self.red_scan_duration = len(self.files) + self.sys_log.info(f"Folder revealed to red agent: {self.name} (id: {self.uuid})") + else: + # scan already in progress + self.sys_log.info(f"Red Agent Scan is already in progress {self.name} (id: {self.uuid})") + + def check_hash(self) -> bool: + """ + Runs a :func:`check_hash` on all files in the folder. + + If a file under the folder is corrupted, the whole folder is considered corrupted. + + TODO: For now this will just iterate through the files and run :func:`check_hash` and ignores + any other changes to the folder + + Return False if corruption is detected, otherwise True + """ + super().check_hash() + + # iterate through the files and run a check hash + no_corrupted_files = True + + for file_id in self.files: + file = self.get_file_by_id(file_uuid=file_id) + no_corrupted_files = file.check_hash() + + # if one file in the folder is corrupted, set the folder status to corrupted + if not no_corrupted_files: + self.corrupt() + + self.sys_log.info(f"Checking hash of folder {self.name} (id: {self.uuid})") + + return no_corrupted_files + + def repair(self) -> bool: + """Repair a corrupted Folder by setting the folder and containing files status to FileSystemItemStatus.GOOD.""" + super().repair() + + repaired = False + + # iterate through the files in the folder + for file_id in self.files: + file = self.get_file_by_id(file_uuid=file_id) + repaired = file.repair() + + # set file status to good if corrupt + if self.health_status == FileSystemItemHealthStatus.CORRUPT: + self.health_status = FileSystemItemHealthStatus.GOOD + repaired = True + + self.sys_log.info(f"Repaired folder {self.name} (id: {self.uuid})") + return repaired + + def restore(self) -> bool: + """Restore a File by setting the folder and containing files status to FileSystemItemStatus.GOOD.""" + super().restore() + + restored = False + + # iterate through the files in the folder + for file_id in self.files: + file = self.get_file_by_id(file_uuid=file_id) + restored = file.restore() + + # set file status to corrupt if good + if self.health_status == FileSystemItemHealthStatus.CORRUPT: + self.health_status = FileSystemItemHealthStatus.GOOD + restored = True + + self.sys_log.info(f"Restored folder {self.name} (id: {self.uuid})") + return restored + + def corrupt(self) -> bool: + """Corrupt a File by setting the folder and containing files status to FileSystemItemStatus.CORRUPT.""" + super().corrupt() + + corrupted = False + + # iterate through the files in the folder + for file_id in self.files: + file = self.get_file_by_id(file_uuid=file_id) + corrupted = file.corrupt() + + # set file status to corrupt if good + if self.health_status == FileSystemItemHealthStatus.GOOD: + self.health_status = FileSystemItemHealthStatus.CORRUPT + corrupted = True + + self.sys_log.info(f"Corrupted folder {self.name} (id: {self.uuid})") + return corrupted diff --git a/tests/unit_tests/_primaite/_simulator/_file_system/test_file.py b/tests/unit_tests/_primaite/_simulator/_file_system/test_file.py new file mode 100644 index 00000000..174c7726 --- /dev/null +++ b/tests/unit_tests/_primaite/_simulator/_file_system/test_file.py @@ -0,0 +1,78 @@ +from primaite.simulator.file_system.file import File +from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus +from primaite.simulator.file_system.file_type import FileType + + +def test_create_file_no_extension(file_system): + """Tests that creating a file without an extension sets the file type to FileType.UNKNOWN.""" + file = file_system.create_file(file_name="test_file") + assert len(file_system.folders) is 1 + assert file_system.get_folder("root").get_file("test_file") == file + assert file_system.get_folder("root").get_file("test_file").file_type == FileType.UNKNOWN + assert file_system.get_folder("root").get_file("test_file").size == 0 + + +def test_file_scan(file_system): + """Test the ability to update visible status.""" + file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder") + + assert file.health_status == FileSystemItemHealthStatus.GOOD + assert file.visible_health_status == FileSystemItemHealthStatus.GOOD + + file.corrupt() + + assert file.health_status == FileSystemItemHealthStatus.CORRUPT + assert file.visible_health_status == FileSystemItemHealthStatus.GOOD + + file.scan() + + assert file.health_status == FileSystemItemHealthStatus.CORRUPT + assert file.visible_health_status == FileSystemItemHealthStatus.CORRUPT + + +def test_file_reveal_to_red_scan(file_system): + """Test the ability to reveal files to red.""" + file = file_system.create_file(file_name="test_file.txt", folder_name="test_folder") + + assert file.revealed_to_red is False + + file.reveal_to_red() + + assert file.revealed_to_red is True + + +def test_simulated_file_check_hash(file_system): + file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder") + + assert file.check_hash() is True + + # change simulated file size + file.sim_size = 0 + assert file.check_hash() is False + assert file.health_status == FileSystemItemHealthStatus.CORRUPT + + +def test_real_file_check_hash(file_system): + file: File = file_system.create_file(file_name="test_file.txt", real=True) + + assert file.check_hash() is True + + # change file content + with open(file.sim_path, "a") as f: + f.write("get hacked scrub lol xD\n") + + assert file.check_hash() is False + assert file.health_status == FileSystemItemHealthStatus.CORRUPT + + +def test_file_corrupt_repair(file_system): + """Test the ability to corrupt and repair files.""" + file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder") + + file.corrupt() + + assert file.health_status == FileSystemItemHealthStatus.CORRUPT + + file.repair() + + assert file.health_status == FileSystemItemHealthStatus.GOOD diff --git a/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system.py b/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system.py index cb398ca9..d26d0a4a 100644 --- a/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system.py +++ b/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system.py @@ -1,6 +1,6 @@ import pytest -from primaite.simulator.file_system.file_system import File, FileSystem, FileSystemItemHealthStatus, Folder +from primaite.simulator.file_system.file_system import FileSystem from primaite.simulator.file_system.file_type import FileType @@ -26,15 +26,6 @@ def test_create_file_no_folder(file_system): assert file_system.get_folder("root").get_file("test_file.txt").size == 10 -def test_create_file_no_extension(file_system): - """Tests that creating a file without an extension sets the file type to FileType.UNKNOWN.""" - file = file_system.create_file(file_name="test_file") - assert len(file_system.folders) is 1 - assert file_system.get_folder("root").get_file("test_file") == file - assert file_system.get_folder("root").get_file("test_file").file_type == FileType.UNKNOWN - assert file_system.get_folder("root").get_file("test_file").size == 0 - - def test_delete_file(file_system): """Tests that a file can be deleted.""" file_system.create_file(file_name="test_file.txt") @@ -125,193 +116,6 @@ def test_copy_file(file_system): assert file_system.get_file("dst_folder", "test_file.txt").uuid != original_uuid -@pytest.mark.skip(reason="Implementation for quarantine not needed yet") -def test_folder_quarantine_state(file_system): - """Tests the changing of folder quarantine status.""" - folder = file_system.get_folder("root") - - assert folder.quarantine_status() is False - - folder.quarantine() - assert folder.quarantine_status() is True - - folder.unquarantine() - assert folder.quarantine_status() is False - - -def test_file_corrupt_repair(file_system): - """Test the ability to corrupt and repair files.""" - folder: Folder = file_system.create_folder(folder_name="test_folder") - file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder") - - file.corrupt() - - assert folder.health_status == FileSystemItemHealthStatus.GOOD - assert file.health_status == FileSystemItemHealthStatus.CORRUPT - - file.repair() - - assert folder.health_status == FileSystemItemHealthStatus.GOOD - assert file.health_status == FileSystemItemHealthStatus.GOOD - - -def test_folder_corrupt_repair(file_system): - """Test the ability to corrupt and repair folders.""" - folder: Folder = file_system.create_folder(folder_name="test_folder") - file_system.create_file(file_name="test_file.txt", folder_name="test_folder") - - folder.corrupt() - - file = folder.get_file(file_name="test_file.txt") - assert folder.health_status == FileSystemItemHealthStatus.CORRUPT - assert file.health_status == FileSystemItemHealthStatus.CORRUPT - - folder.repair() - - file = folder.get_file(file_name="test_file.txt") - assert folder.health_status == FileSystemItemHealthStatus.GOOD - assert file.health_status == FileSystemItemHealthStatus.GOOD - - -def test_file_scan(file_system): - """Test the ability to update visible status.""" - folder: Folder = file_system.create_folder(folder_name="test_folder") - file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder") - - assert file.health_status == FileSystemItemHealthStatus.GOOD - assert file.visible_health_status == FileSystemItemHealthStatus.GOOD - - file.corrupt() - - assert file.health_status == FileSystemItemHealthStatus.CORRUPT - assert file.visible_health_status == FileSystemItemHealthStatus.GOOD - - file.scan() - - assert file.health_status == FileSystemItemHealthStatus.CORRUPT - assert file.visible_health_status == FileSystemItemHealthStatus.CORRUPT - - -def test_folder_scan(file_system): - """Test the ability to update visible status.""" - folder: Folder = file_system.create_folder(folder_name="test_folder") - file_system.create_file(file_name="test_file.txt", folder_name="test_folder") - file_system.create_file(file_name="test_file2.txt", folder_name="test_folder") - - file1: File = folder.get_file_by_id(file_uuid=list(folder.files)[1]) - file2: File = folder.get_file_by_id(file_uuid=list(folder.files)[0]) - - assert folder.health_status == FileSystemItemHealthStatus.GOOD - assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD - assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD - assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD - - folder.corrupt() - - assert folder.health_status == FileSystemItemHealthStatus.CORRUPT - assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD - assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD - assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD - - folder.scan() - - folder.apply_timestep(timestep=0) - - assert folder.health_status == FileSystemItemHealthStatus.CORRUPT - assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD - assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD - assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD - - folder.apply_timestep(timestep=1) - - assert folder.health_status == FileSystemItemHealthStatus.CORRUPT - assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT - assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT - assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPT - - -def test_folder_reveal_to_red_scan(file_system): - """Test the ability to reveal files to red.""" - folder: Folder = file_system.create_folder(folder_name="test_folder") - file_system.create_file(file_name="test_file.txt", folder_name="test_folder") - file_system.create_file(file_name="test_file2.txt", folder_name="test_folder") - - file1: File = folder.get_file_by_id(file_uuid=list(folder.files)[1]) - file2: File = folder.get_file_by_id(file_uuid=list(folder.files)[0]) - - assert folder.revealed_to_red is False - assert file1.revealed_to_red is False - assert file2.revealed_to_red is False - - folder.reveal_to_red() - - folder.apply_timestep(timestep=0) - - assert folder.revealed_to_red is False - assert file1.revealed_to_red is False - assert file2.revealed_to_red is False - - folder.apply_timestep(timestep=1) - - assert folder.revealed_to_red is True - assert file1.revealed_to_red is True - assert file2.revealed_to_red is True - - -def test_simulated_file_check_hash(file_system): - file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder") - - assert file.check_hash() is True - - # change simulated file size - file.sim_size = 0 - assert file.check_hash() is False - assert file.health_status == FileSystemItemHealthStatus.CORRUPT - - -def test_real_file_check_hash(file_system): - file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder", real=True) - - assert file.check_hash() is True - - # change file content - with open(file.sim_path, "a") as f: - f.write("get hacked scrub lol xD\n") - - assert file.check_hash() is False - assert file.health_status == FileSystemItemHealthStatus.CORRUPT - - -def test_simulated_folder_check_hash(file_system): - folder: Folder = file_system.create_folder(folder_name="test_folder") - file_system.create_file(file_name="test_file.txt", folder_name="test_folder") - - assert folder.check_hash() is True - - # change simulated file size - file = folder.get_file(file_name="test_file.txt") - file.sim_size = 0 - assert folder.check_hash() is False - assert folder.health_status == FileSystemItemHealthStatus.CORRUPT - - -def test_real_folder_check_hash(file_system): - folder: Folder = file_system.create_folder(folder_name="test_folder") - file_system.create_file(file_name="test_file.txt", folder_name="test_folder", real=True) - - assert folder.check_hash() is True - - # change simulated file size - file = folder.get_file(file_name="test_file.txt") - - # change file content - with open(file.sim_path, "a") as f: - f.write("get hacked scrub lol xD\n") - - assert folder.check_hash() is False - assert folder.health_status == FileSystemItemHealthStatus.CORRUPT - - @pytest.mark.skip(reason="Skipping until we tackle serialisation") def test_serialisation(file_system): """Test to check that the object serialisation works correctly.""" diff --git a/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system_actions.py b/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system_actions.py index abfb244a..d902c935 100644 --- a/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system_actions.py +++ b/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system_actions.py @@ -2,7 +2,8 @@ from typing import Tuple import pytest -from primaite.simulator.file_system.file_system import File, FileSystem, FileSystemItemHealthStatus, Folder +from primaite.simulator.file_system.file_system import File, FileSystem, Folder +from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus @pytest.fixture(scope="function") diff --git a/tests/unit_tests/_primaite/_simulator/_file_system/test_folder.py b/tests/unit_tests/_primaite/_simulator/_file_system/test_folder.py new file mode 100644 index 00000000..9a12dbe9 --- /dev/null +++ b/tests/unit_tests/_primaite/_simulator/_file_system/test_folder.py @@ -0,0 +1,133 @@ +import pytest + +from primaite.simulator.file_system.file import File +from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus +from primaite.simulator.file_system.folder import Folder + + +@pytest.mark.skip(reason="Implementation for quarantine not needed yet") +def test_folder_quarantine_state(file_system): + """Tests the changing of folder quarantine status.""" + folder = file_system.get_folder("root") + + assert folder.quarantine_status() is False + + folder.quarantine() + assert folder.quarantine_status() is True + + folder.unquarantine() + assert folder.quarantine_status() is False + + +def test_folder_scan(file_system): + """Test the ability to update visible status.""" + folder: Folder = file_system.create_folder(folder_name="test_folder") + file_system.create_file(file_name="test_file.txt", folder_name="test_folder") + file_system.create_file(file_name="test_file2.txt", folder_name="test_folder") + + file1: File = folder.get_file_by_id(file_uuid=list(folder.files)[1]) + file2: File = folder.get_file_by_id(file_uuid=list(folder.files)[0]) + + assert folder.health_status == FileSystemItemHealthStatus.GOOD + assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD + assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD + assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD + + folder.corrupt() + + assert folder.health_status == FileSystemItemHealthStatus.CORRUPT + assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD + assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD + assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD + + folder.scan() + + folder.apply_timestep(timestep=0) + + assert folder.health_status == FileSystemItemHealthStatus.CORRUPT + assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD + assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD + assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD + + folder.apply_timestep(timestep=1) + + assert folder.health_status == FileSystemItemHealthStatus.CORRUPT + assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT + assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT + assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPT + + +def test_folder_reveal_to_red_scan(file_system): + """Test the ability to reveal files to red.""" + folder: Folder = file_system.create_folder(folder_name="test_folder") + file_system.create_file(file_name="test_file.txt", folder_name="test_folder") + file_system.create_file(file_name="test_file2.txt", folder_name="test_folder") + + file1: File = folder.get_file_by_id(file_uuid=list(folder.files)[1]) + file2: File = folder.get_file_by_id(file_uuid=list(folder.files)[0]) + + assert folder.revealed_to_red is False + assert file1.revealed_to_red is False + assert file2.revealed_to_red is False + + folder.reveal_to_red() + + folder.apply_timestep(timestep=0) + + assert folder.revealed_to_red is False + assert file1.revealed_to_red is False + assert file2.revealed_to_red is False + + folder.apply_timestep(timestep=1) + + assert folder.revealed_to_red is True + assert file1.revealed_to_red is True + assert file2.revealed_to_red is True + + +def test_folder_corrupt_repair(file_system): + """Test the ability to corrupt and repair folders.""" + folder: Folder = file_system.create_folder(folder_name="test_folder") + file_system.create_file(file_name="test_file.txt", folder_name="test_folder") + + folder.corrupt() + + file = folder.get_file(file_name="test_file.txt") + assert folder.health_status == FileSystemItemHealthStatus.CORRUPT + assert file.health_status == FileSystemItemHealthStatus.CORRUPT + + folder.repair() + + file = folder.get_file(file_name="test_file.txt") + assert folder.health_status == FileSystemItemHealthStatus.GOOD + assert file.health_status == FileSystemItemHealthStatus.GOOD + + +def test_simulated_folder_check_hash(file_system): + folder: Folder = file_system.create_folder(folder_name="test_folder") + file_system.create_file(file_name="test_file.txt", folder_name="test_folder") + + assert folder.check_hash() is True + + # change simulated file size + file = folder.get_file(file_name="test_file.txt") + file.sim_size = 0 + assert folder.check_hash() is False + assert folder.health_status == FileSystemItemHealthStatus.CORRUPT + + +def test_real_folder_check_hash(file_system): + folder: Folder = file_system.create_folder(folder_name="test_folder") + file_system.create_file(file_name="test_file.txt", folder_name="test_folder", real=True) + + assert folder.check_hash() is True + + # change simulated file size + file = folder.get_file(file_name="test_file.txt") + + # change file content + with open(file.sim_path, "a") as f: + f.write("get hacked scrub lol xD\n") + + assert folder.check_hash() is False + assert folder.health_status == FileSystemItemHealthStatus.CORRUPT diff --git a/tests/unit_tests/_primaite/_simulator/_network/_hardware/test_node_actions.py b/tests/unit_tests/_primaite/_simulator/_network/_hardware/test_node_actions.py index 3d6eea3b..1216a0cc 100644 --- a/tests/unit_tests/_primaite/_simulator/_network/_hardware/test_node_actions.py +++ b/tests/unit_tests/_primaite/_simulator/_network/_hardware/test_node_actions.py @@ -1,6 +1,7 @@ import pytest -from primaite.simulator.file_system.file_system import File, FileSystemItemHealthStatus, Folder +from primaite.simulator.file_system.file_system import File, Folder +from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus from primaite.simulator.network.hardware.base import Node, NodeOperatingState from primaite.simulator.system.applications.application import Application from primaite.simulator.system.processes.process import Process