diff --git a/src/primaite/simulator/file_system/file_system.py b/src/primaite/simulator/file_system/file_system.py index a821ae40..24abbf18 100644 --- a/src/primaite/simulator/file_system/file_system.py +++ b/src/primaite/simulator/file_system/file_system.py @@ -58,6 +58,9 @@ class FileSystemItemStatus(Enum): CORRUPTED = 2 """File/Folder is corrupted.""" + DELETED = 3 + """File/Folder is deleted.""" + class FileSystemItemABC(SimComponent): """ @@ -85,11 +88,10 @@ class FileSystemItemABC(SimComponent): :return: Current state of this object and child objects. """ state = super().describe_state() - state.update( - { - "name": self.name, - } - ) + state["name"] = self.name + state["status"] = self.status.name + state["visible_status"] = self.visible_status.name + state["previous_hash"] = self.previous_hash return state @property @@ -104,16 +106,6 @@ class FileSystemItemABC(SimComponent): """ return convert_size(self.size) - def _init_request_manager(self) -> RequestManager: - rm = super()._init_request_manager() - rm.add_request("scan", RequestType(func=lambda request, context: self.scan())) - rm.add_request("checkhash", RequestType(func=lambda request, context: self.checkhash())) - rm.add_request("delete", RequestType(func=lambda request, context: self.delete())) - rm.add_request("restore", RequestType(func=lambda request, context: self.restore())) - rm.add_request("repair", RequestType(func=lambda request, context: self.repair())) - rm.add_request("corrupt", RequestType(func=lambda request, context: self.corrupt())) - return rm - def scan(self) -> None: """Update the FileSystemItem states.""" super().scan() @@ -129,16 +121,42 @@ class FileSystemItemABC(SimComponent): Return False if corruption is detected, otherwise True """ - pass + # cannot check hash if deleted + if self.status == FileSystemItemStatus.DELETED: + return False @abstractmethod - def repair(self) -> None: - """Repair the FileSystemItem.""" - pass + def repair(self) -> bool: + """ + Repair the FileSystemItem. + + True if successfully repaired. False otherwise. + """ + # cannot repair if deleted + if self.status == FileSystemItemStatus.DELETED: + return False @abstractmethod - def corrupt(self) -> None: - """Corrupt the FileSystemItem.""" + def corrupt(self) -> bool: + """ + Corrupt the FileSystemItem. + + True if successfully corrupted. False otherwise. + """ + # cannot corrupt if deleted + if self.status == FileSystemItemStatus.DELETED: + return False + + def delete(self) -> None: + """ + Delete the FileSystemItem. + + True if successfully deleted. False otherwise. + """ + self.status = FileSystemItemStatus.DELETED + + def restore(self) -> None: + """Restore the file/folder to the state before it got ruined.""" pass @@ -151,6 +169,8 @@ class FileSystem(SimComponent): sys_log: SysLog sim_root: Path + deleted_folders: Dict[str, Folder] = {} + def __init__(self, **kwargs): super().__init__(**kwargs) # Ensure a default root folder @@ -160,14 +180,110 @@ class FileSystem(SimComponent): def _init_request_manager(self) -> RequestManager: rm = super()._init_request_manager() - self._folder_request_manager = RequestManager() + self._folder_request_manager = self._init_folder_request_manager() rm.add_request("folder", RequestType(func=self._folder_request_manager)) - self._file_request_manager = RequestManager() + self._file_request_manager = self._init_file_request_manager() rm.add_request("file", RequestType(func=self._file_request_manager)) return rm + def _init_folder_request_manager(self) -> RequestManager: + rm = RequestManager() + + rm.add_request( + "scan", + RequestType( + func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0], show_deleted=True).scan() + ), + ) + + rm.add_request( + "checkhash", + RequestType(func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0]).check_hash()), + ) + + rm.add_request( + "repair", RequestType(func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0]).repair()) + ) + + rm.add_request( + "corrupt", + RequestType(func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0]).corrupt()), + ) + + rm.add_request( + "delete", RequestType(func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0]).delete()) + ) + + rm.add_request( + "restore", + RequestType( + func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0], show_deleted=True).restore() + ), + ) + + return rm + + def _init_file_request_manager(self) -> RequestManager: + rm = RequestManager() + + rm.add_request( + "scan", + RequestType( + func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0], show_deleted=True) + .get_file_by_id(file_uuid=request[1], show_deleted=True) + .scan() + ), + ) + + rm.add_request( + "checkhash", + RequestType( + func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0]) + .get_file_by_id(file_uuid=request[1]) + .check_hash() + ), + ) + + rm.add_request( + "repair", + RequestType( + func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0]) + .get_file_by_id(file_uuid=request[1]) + .repair() + ), + ) + + rm.add_request( + "corrupt", + RequestType( + func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0]) + .get_file_by_id(file_uuid=request[1]) + .corrupt() + ), + ) + + rm.add_request( + "delete", + RequestType( + func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0]) + .get_file_by_id(file_uuid=request[1]) + .delete() + ), + ) + + rm.add_request( + "restore", + RequestType( + func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0], show_deleted=True) + .get_file_by_id(file_uuid=request[1], show_deleted=True) + .restore() + ), + ) + + return rm + @property def size(self) -> int: """ @@ -243,7 +359,12 @@ class FileSystem(SimComponent): folder = self._folders_by_name.get(folder_name) if folder: for file in folder.files.values(): - self.delete_file(file) + self.delete_file(folder_name=folder_name, file_name=file.name) + # add to deleted list + folder.delete() + self.deleted_folders[folder.uuid] = folder + + # remove from normal list self.folders.pop(folder.uuid) self._folders_by_name.pop(folder.name) self.sys_log.info(f"Deleted folder /{folder.name} and its contents") @@ -251,6 +372,10 @@ class FileSystem(SimComponent): else: _LOGGER.debug(f"Cannot delete folder as it does not exist: {folder_name}") + def restore_folder(self, folder_id: str): + """TODO.""" + pass + def create_file( self, file_name: str, @@ -364,6 +489,24 @@ class FileSystem(SimComponent): new_file.sim_path.parent.mkdir(exist_ok=True) shutil.copy2(file.sim_path, new_file.sim_path) + def restore_file(self, folder_id: str, file_id: str) -> bool: + """ + Restore a file. + + Checks the current file's status and applies the correct fix for the file. + + :param: folder_id: id of the folder where the file is stored + :type: folder_id: str + + :param: folder_id: id of the file to restore + :type: folder_id: str + """ + folder = self.get_folder_by_id(folder_uuid=folder_id, show_deleted=True) + + if folder: + file = folder.get_file_by_id(file_uuid=file_id, show_deleted=True) + return file.restore() + def get_folder(self, folder_name: str) -> Optional[Folder]: """ Get a folder by its name if it exists. @@ -373,13 +516,19 @@ class FileSystem(SimComponent): """ return self._folders_by_name.get(folder_name) - def get_folder_by_id(self, folder_uuid: str) -> Optional[Folder]: + def get_folder_by_id(self, folder_uuid: str, show_deleted: bool = False) -> Optional[Folder]: """ Get a folder by its uuid if it exists. - :param folder_uuid: The folder uuid. + :param: folder_uuid: The folder uuid. + :param: show_deleted: show deleted folders :return: The matching Folder. """ + deleted_folder = self.deleted_folders.get(folder_uuid) + + if show_deleted and deleted_folder: + return deleted_folder + return self.folders.get(folder_uuid) @@ -393,6 +542,8 @@ class Folder(FileSystemItemABC): _files_by_name: Dict[str, File] = {} "Files by their name as .." + deleted_files: Dict[str, File] = {} + def describe_state(self) -> Dict: """ Produce a dictionary describing the current state of this object. @@ -401,7 +552,6 @@ class Folder(FileSystemItemABC): """ state = super().describe_state() state["files"] = {file.name: file.describe_state() for uuid, file in self.files.items()} - state["is_quarantined"] = self.is_quarantined return state def show(self, markdown: bool = False): @@ -441,13 +591,19 @@ class Folder(FileSystemItemABC): # TODO: Increment read count? return self._files_by_name.get(file_name) - def get_file_by_id(self, file_uuid: str) -> File: + def get_file_by_id(self, file_uuid: str, show_deleted: bool = False) -> File: """ Get a file by its uuid. - :param file_uuid: The file uuid. + :param: file_uuid: The file uuid. + :param: show_deleted: show deleted files :return: The matching File. """ + deleted_file = self.deleted_files.get(file_uuid) + + if show_deleted and deleted_file: + return deleted_file + return self.files.get(file_uuid) def add_file(self, file: File): @@ -482,6 +638,11 @@ class Folder(FileSystemItemABC): raise Exception(f"Invalid file: {file}") if self.files.get(file.uuid): + # add to deleted list + file.delete() + self.deleted_files[file.uuid] = file + + # remove from normal file list self.files.pop(file.uuid) self._files_by_name.pop(file.name) else: @@ -503,35 +664,16 @@ class Folder(FileSystemItemABC): """Returns true if the folder is being quarantined.""" return self.status == FileSystemItemStatus.QUARANTINED - def repair(self) -> None: - """Repair a corrupted Folder by setting the folder and containing files status to FileSystemItemStatus.GOOD.""" - super().repair() + def scan(self) -> None: + """Update Folder visible status.""" + super().scan() - # iterate through the files in the folder + # update the status of files in folder for file_id in self.files: file = self.get_file_by_id(file_uuid=file_id) - file.repair() + file.scan() - # set file status to good if corrupt - if self.status == FileSystemItemStatus.CORRUPTED: - self.status = FileSystemItemStatus.GOOD - - self.fs.sys_log.info(f"Repaired folder {self.name}") - - def corrupt(self) -> None: - """Corrupt a File by setting the folder and containing files status to FileSystemItemStatus.CORRUPTED.""" - super().corrupt() - - # iterate through the files in the folder - for file_id in self.files: - file = self.get_file_by_id(file_uuid=file_id) - file.corrupt() - - # set file status to good if corrupt - if self.status == FileSystemItemStatus.GOOD: - self.status = FileSystemItemStatus.CORRUPTED - - self.fs.sys_log.info(f"Corrupted folder {self.name}") + self.fs.sys_log.info(f"Scanning folder {self.name} (id: {self.uuid})") def check_hash(self) -> bool: """ @@ -544,6 +686,8 @@ class Folder(FileSystemItemABC): Return False if corruption is detected, otherwise True """ + super().check_hash() + # iterate through the files and run a check hash no_corrupted_files = True @@ -555,8 +699,57 @@ class Folder(FileSystemItemABC): if not no_corrupted_files: self.corrupt() + self.fs.sys_log.info(f"Checking hash of folder {self.name} (id: {self.uuid})") + return no_corrupted_files + def repair(self) -> bool: + """Repair a corrupted Folder by setting the folder and containing files status to FileSystemItemStatus.GOOD.""" + super().repair() + + repaired = False + + # iterate through the files in the folder + for file_id in self.files: + file = self.get_file_by_id(file_uuid=file_id) + repaired = file.repair() + + # set file status to good if corrupt + if self.status == FileSystemItemStatus.CORRUPTED: + self.status = FileSystemItemStatus.GOOD + repaired = True + + self.fs.sys_log.info(f"Repaired folder {self.name} (id: {self.uuid})") + return repaired + + def restore(self) -> None: + """TODO.""" + pass + + def delete(self) -> bool: + """TODO.""" + super().delete() + self.fs.sys_log.info(f"Deleted folder {self.name} (id: {self.uuid})") + + def corrupt(self) -> bool: + """Corrupt a File by setting the folder and containing files status to FileSystemItemStatus.CORRUPTED.""" + super().corrupt() + + corrupted = False + + # iterate through the files in the folder + for file_id in self.files: + file = self.get_file_by_id(file_uuid=file_id) + corrupted = file.corrupt() + + # set file status to good if corrupt + if self.status == FileSystemItemStatus.GOOD: + self.status = FileSystemItemStatus.CORRUPTED + corrupted = True + + self.fs.sys_log.info(f"Corrupted folder {self.name} (id: {self.uuid})") + return corrupted + class File(FileSystemItemABC): """ @@ -648,27 +841,12 @@ class File(FileSystemItemABC): state["file_type"] = self.file_type.name return state - def repair(self) -> None: - """Repair a corrupted File by setting the status to FileSystemItemStatus.GOOD.""" - super().repair() - - # set file status to good if corrupt - if self.status == FileSystemItemStatus.CORRUPTED: - self.status = FileSystemItemStatus.GOOD + def scan(self) -> None: + """TODO.""" + super().scan() path = self.folder.name + "/" + self.name - self.folder.fs.sys_log.info(f"Repaired file {self.sim_path if self.sim_path else path}") - - def corrupt(self) -> None: - """Corrupt a File by setting the status to FileSystemItemStatus.CORRUPTED.""" - super().corrupt() - - # set file status to good if corrupt - if self.status == FileSystemItemStatus.GOOD: - self.status = FileSystemItemStatus.CORRUPTED - - path = self.folder.name + "/" + self.name - self.folder.fs.sys_log.info(f"Corrupted file {self.sim_path if self.sim_path else path}") + self.folder.fs.sys_log.info(f"Scanning file {self.sim_path if self.sim_path else path}") def check_hash(self) -> bool: """ @@ -702,3 +880,48 @@ class File(FileSystemItemABC): return False return True + + def delete(self) -> None: + """TODO.""" + super().delete() + + path = self.folder.name + "/" + self.name + self.folder.fs.sys_log.info(f"Deleting file {self.sim_path if self.sim_path else path}") + + def repair(self) -> bool: + """Repair a corrupted File by setting the status to FileSystemItemStatus.GOOD.""" + super().repair() + + # set file status to good if corrupt + if self.status == FileSystemItemStatus.CORRUPTED: + self.status = FileSystemItemStatus.GOOD + + path = self.folder.name + "/" + self.name + self.folder.fs.sys_log.info(f"Repaired file {self.sim_path if self.sim_path else path}") + return True + + def restore(self) -> None: + """Restore a corrupted File by setting the status to FileSystemItemStatus.GOOD.""" + super().restore() + + # set file status to good if deleted or corrupt + if self.status in [FileSystemItemStatus.CORRUPTED, FileSystemItemStatus.DELETED]: + self.status = FileSystemItemStatus.GOOD + + path = self.folder.name + "/" + self.name + self.folder.fs.sys_log.info(f"Repaired file {self.sim_path if self.sim_path else path}") + + def corrupt(self) -> bool: + """Corrupt a File by setting the status to FileSystemItemStatus.CORRUPTED.""" + super().corrupt() + + corrupted = False + + # set file status to good if corrupt + if self.status == FileSystemItemStatus.GOOD: + self.status = FileSystemItemStatus.CORRUPTED + corrupted = True + + path = self.folder.name + "/" + self.name + self.folder.fs.sys_log.info(f"Corrupted file {self.sim_path if self.sim_path else path}") + return corrupted diff --git a/tests/integration_tests/system/test_database_on_node.py b/tests/integration_tests/system/test_database_on_node.py index 92056981..14b579b2 100644 --- a/tests/integration_tests/system/test_database_on_node.py +++ b/tests/integration_tests/system/test_database_on_node.py @@ -1,5 +1,6 @@ from ipaddress import IPv4Address +from primaite.simulator.file_system.file_system import FileSystemItemStatus from primaite.simulator.network.hardware.nodes.server import Server from primaite.simulator.system.applications.database_client import DatabaseClient from primaite.simulator.system.services.database.database_service import DatabaseService diff --git a/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system.py b/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system.py index 5bb4ceda..4c5d5510 100644 --- a/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system.py +++ b/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system.py @@ -44,6 +44,7 @@ def test_delete_file(file_system): file_system.delete_file(folder_name="root", file_name="test_file.txt") assert len(file_system.folders) == 1 assert len(file_system.get_folder("root").files) == 0 + assert len(file_system.get_folder("root").deleted_files) == 1 def test_delete_non_existent_file(file_system): @@ -69,6 +70,7 @@ def test_delete_folder(file_system): file_system.delete_folder(folder_name="test_folder") assert len(file_system.folders) == 1 + assert len(file_system.deleted_folders) == 1 def test_deleting_a_non_existent_folder(file_system): @@ -95,11 +97,13 @@ def test_move_file(file_system): original_uuid = file.uuid assert len(file_system.get_folder("src_folder").files) == 1 + assert len(file_system.get_folder("src_folder").deleted_files) == 0 assert len(file_system.get_folder("dst_folder").files) == 0 file_system.move_file(src_folder_name="src_folder", src_file_name="test_file.txt", dst_folder_name="dst_folder") assert len(file_system.get_folder("src_folder").files) == 0 + assert len(file_system.get_folder("src_folder").deleted_files) == 1 assert len(file_system.get_folder("dst_folder").files) == 1 assert file_system.get_file("dst_folder", "test_file.txt").uuid == original_uuid @@ -169,6 +173,73 @@ def test_folder_corrupt_repair(file_system): assert file.status == FileSystemItemStatus.GOOD +def test_file_scan(file_system): + """Test the ability to update visible status.""" + folder: Folder = file_system.create_folder(folder_name="test_folder") + file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder") + + assert file.status == FileSystemItemStatus.GOOD + assert file.visible_status == FileSystemItemStatus.GOOD + + file.corrupt() + + assert file.status == FileSystemItemStatus.CORRUPTED + assert file.visible_status == FileSystemItemStatus.GOOD + + file.scan() + + assert file.status == FileSystemItemStatus.CORRUPTED + assert file.visible_status == FileSystemItemStatus.CORRUPTED + + +def test_folder_scan(file_system): + """Test the ability to update visible status.""" + folder: Folder = file_system.create_folder(folder_name="test_folder") + file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder") + + assert folder.status == FileSystemItemStatus.GOOD + assert folder.visible_status == FileSystemItemStatus.GOOD + assert file.status == FileSystemItemStatus.GOOD + assert file.visible_status == FileSystemItemStatus.GOOD + + folder.corrupt() + + assert folder.status == FileSystemItemStatus.CORRUPTED + assert folder.visible_status == FileSystemItemStatus.GOOD + assert file.status == FileSystemItemStatus.CORRUPTED + assert file.visible_status == FileSystemItemStatus.GOOD + + folder.scan() + + assert folder.status == FileSystemItemStatus.CORRUPTED + assert folder.visible_status == FileSystemItemStatus.CORRUPTED + assert file.status == FileSystemItemStatus.CORRUPTED + assert file.visible_status == FileSystemItemStatus.CORRUPTED + + +def test_file_delete_restore(file_system): + """Test the ability to delete and restore a file.""" + folder: Folder = file_system.create_folder(folder_name="test_folder") + file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder") + + assert file.status == FileSystemItemStatus.GOOD + assert file.visible_status == FileSystemItemStatus.GOOD + + file_system.delete_file(folder_name=folder.name, file_name=file.name) + + assert folder.get_file(file_name=file.name) is None + assert folder.get_file_by_id(file_uuid=file.uuid, show_deleted=True).status == FileSystemItemStatus.DELETED + + file_system.restore_file(folder_id=folder.uuid, file_id=file.uuid) + + assert file.status == FileSystemItemStatus.GOOD + assert folder.get_file(file_name=file.name) is not None + + +def test_folder_delete_restore(file_system): + pass + + def test_simulated_file_check_hash(file_system): file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")