From e70ceec716e498b32bbc39d35579a82214b285a8 Mon Sep 17 00:00:00 2001 From: "Czar.Echavez" Date: Mon, 6 Nov 2023 10:22:08 +0000 Subject: [PATCH] #1962: folder/file restore logic --- src/primaite/simulator/file_system/file.py | 21 +++-- .../simulator/file_system/file_system.py | 78 +++++++++++++++---- src/primaite/simulator/file_system/folder.py | 32 ++++++-- .../_simulator/_file_system/test_file.py | 10 ++- .../_file_system/test_file_system_actions.py | 53 +++++++++++++ 5 files changed, 161 insertions(+), 33 deletions(-) diff --git a/src/primaite/simulator/file_system/file.py b/src/primaite/simulator/file_system/file.py index 04502c0f..d9b02e8e 100644 --- a/src/primaite/simulator/file_system/file.py +++ b/src/primaite/simulator/file_system/file.py @@ -163,14 +163,6 @@ class File(FileSystemItemABC): path = self.folder.name + "/" + self.name self.sys_log.info(f"Repaired file {self.sim_path if self.sim_path else path}") - def restore(self) -> None: - """Restore a corrupted File by setting the status to FileSystemItemStatus.GOOD.""" - if self.health_status == FileSystemItemHealthStatus.CORRUPT: - self.health_status = FileSystemItemHealthStatus.GOOD - - path = self.folder.name + "/" + self.name - self.sys_log.info(f"Restored file {self.sim_path if self.sim_path else path}") - def corrupt(self) -> None: """Corrupt a File by setting the status to FileSystemItemStatus.CORRUPT.""" if self.deleted: @@ -184,10 +176,17 @@ class File(FileSystemItemABC): path = self.folder.name + "/" + self.name self.sys_log.info(f"Corrupted file {self.sim_path if self.sim_path else path}") - def restore(self) -> bool: + def restore(self) -> None: """Determines if the file needs to be repaired or unmarked as deleted.""" - super().restore() - return True + if self.deleted: + self.deleted = False + return + + if self.health_status == FileSystemItemHealthStatus.CORRUPT: + self.health_status = FileSystemItemHealthStatus.GOOD + + path = self.folder.name + "/" + self.name + self.sys_log.info(f"Restored file {self.sim_path if self.sim_path else path}") def delete(self): """Marks the file as deleted.""" diff --git a/src/primaite/simulator/file_system/file_system.py b/src/primaite/simulator/file_system/file_system.py index 7f2d41e5..b1688045 100644 --- a/src/primaite/simulator/file_system/file_system.py +++ b/src/primaite/simulator/file_system/file_system.py @@ -49,12 +49,27 @@ class FileSystem(SimComponent): name="folder", request_type=RequestType(func=lambda request, context: self.delete_folder_by_id(folder_uuid=request[0])), ) - rm.add_request( name="delete", request_type=RequestType(func=self._delete_manager), ) + self._restore_manager = RequestManager() + self._restore_manager.add_request( + name="file", + request_type=RequestType( + func=lambda request, context: self.restore_file(folder_uuid=request[0], file_uuid=request[1]) + ), + ) + self._restore_manager.add_request( + name="folder", + request_type=RequestType(func=lambda request, context: self.restore_folder(folder_uuid=request[0])), + ) + rm.add_request( + name="restore", + request_type=RequestType(func=self._restore_manager), + ) + self._folder_request_manager = RequestManager() rm.add_request("folder", RequestType(func=self._folder_request_manager)) @@ -164,13 +179,19 @@ class FileSystem(SimComponent): """ return self._folders_by_name.get(folder_name) - def get_folder_by_id(self, folder_uuid: str) -> Optional[Folder]: + def get_folder_by_id(self, folder_uuid: str, include_deleted: bool = False) -> Optional[Folder]: """ Get a folder by its uuid if it exists. :param: folder_uuid: The folder uuid. + :param: include_deleted: If true, the deleted folders will also be checked :return: The matching Folder. """ + if include_deleted: + folder = self.deleted_folders.get(folder_uuid) + if folder: + return folder + return self.folders.get(folder_uuid) ############################################################### @@ -244,7 +265,7 @@ class FileSystem(SimComponent): :param: include_deleted: If true, the deleted files will also be checked :return: An instance of File if it exists, otherwise `None`. """ - folder = self.folders.get(folder_uuid) + folder = self.get_folder_by_id(folder_uuid=folder_uuid, include_deleted=include_deleted) if folder: return folder.get_file_by_id(file_uuid=file_uuid, include_deleted=include_deleted) @@ -291,7 +312,11 @@ class FileSystem(SimComponent): if folder: file = folder.get_file_by_id(file_uuid=file_uuid) - self.delete_file(folder_name=folder.name, file_name=file.name) + + if file: + self.delete_file(folder_name=folder.name, file_name=file.name) + else: + self.sys_log.error(f"Unable to delete file that does not exist. (id: {file_uuid})") def move_file(self, src_folder_name: str, src_file_name: str, dst_folder_name: str): """ @@ -387,20 +412,47 @@ class FileSystem(SimComponent): for folder_id in self.folders: self.folders[folder_id].reveal_to_red(instant_scan=instant_scan) - def restore_folder(self, folder_id: str): - """TODO.""" - pass + def restore_folder(self, folder_uuid: str): + """ + Restore a folder. - def restore_file(self, folder_id: str, file_id: str): + Checks the current folder's status and applies the correct fix for the folder. + + :param: folder_uuid: id of the folder to restore + :type: folder_uuid: str + """ + folder = self.get_folder_by_id(folder_uuid=folder_uuid, include_deleted=True) + + if folder is None: + self.sys_log.error(f"Unable to restore folder with uuid {folder_uuid}. Folder does not exist.") + return + + folder.restore() + self.folders[folder.uuid] = folder + self._folders_by_name[folder.name] = folder + + if folder.deleted: + self.deleted_folders.pop(folder.uuid) + + def restore_file(self, folder_uuid: str, file_uuid: str): """ Restore a file. Checks the current file's status and applies the correct fix for the file. - :param: folder_id: id of the folder where the file is stored - :type: folder_id: str + :param: folder_uuid: id of the folder where the file is stored + :type: folder_uuid: str - :param: folder_id: id of the file to restore - :type: folder_id: str + :param: file_uuid: id of the file to restore + :type: file_uuid: str """ - pass + folder = self.get_folder_by_id(folder_uuid=folder_uuid, include_deleted=True) + + if folder: + file = folder.get_file_by_id(file_uuid=file_uuid, include_deleted=True) + + if file is None: + self.sys_log.error(f"Unable to restore file with uuid {file_uuid}. File does not exist.") + return + + folder.restore_file(file_uuid=file_uuid) diff --git a/src/primaite/simulator/file_system/folder.py b/src/primaite/simulator/file_system/folder.py index e12d4e12..f19f4efa 100644 --- a/src/primaite/simulator/file_system/folder.py +++ b/src/primaite/simulator/file_system/folder.py @@ -210,15 +210,24 @@ class Folder(FileSystemItemABC): self.files = {} self._files_by_name = {} - def restore_file(self, file: Optional[File]): + def restore_file(self, file_uuid: str): """ Restores a file. - The method can take a File object or a file id. - - :param file: The file to restore + :param file_uuid: The id of the file to restore """ - pass + # if the file was not deleted, run a repair + file = self.get_file_by_id(file_uuid=file_uuid, include_deleted=True) + if not file: + self.sys_log.error(f"Unable to restore file with uuid {file_uuid}. File does not exist.") + return + + file.restore() + self.files[file.uuid] = file + self._files_by_name[file.name] = file + + if file.deleted: + self.deleted_files.pop(file_uuid) def quarantine(self): """Quarantines the File System Folder.""" @@ -338,7 +347,18 @@ class Folder(FileSystemItemABC): If the folder is deleted, restore the folder by setting deleted status to False. """ - pass + # repair all files + for file_id in self.files: + self.restore_file(file_uuid=file_id) + + deleted_files = self.deleted_files.copy() + for file_id in deleted_files: + self.restore_file(file_uuid=file_id) + + if self.deleted: + self.deleted = False + elif self.health_status == FileSystemItemHealthStatus.CORRUPT: + self.health_status = FileSystemItemHealthStatus.GOOD def corrupt(self) -> None: """Corrupt a File by setting the folder and containing files status to FileSystemItemStatus.CORRUPT.""" diff --git a/tests/unit_tests/_primaite/_simulator/_file_system/test_file.py b/tests/unit_tests/_primaite/_simulator/_file_system/test_file.py index 94ccde83..32efe029 100644 --- a/tests/unit_tests/_primaite/_simulator/_file_system/test_file.py +++ b/tests/unit_tests/_primaite/_simulator/_file_system/test_file.py @@ -65,14 +65,18 @@ def test_real_file_check_hash(file_system): assert file.health_status == FileSystemItemHealthStatus.CORRUPT -def test_file_corrupt_repair(file_system): +def test_file_corrupt_repair_restore(file_system): """Test the ability to corrupt and repair files.""" file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder") file.corrupt() - assert file.health_status == FileSystemItemHealthStatus.CORRUPT file.repair() - + assert file.health_status == FileSystemItemHealthStatus.GOOD + + file.corrupt() + assert file.health_status == FileSystemItemHealthStatus.CORRUPT + + file.restore() assert file.health_status == FileSystemItemHealthStatus.GOOD diff --git a/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system_actions.py b/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system_actions.py index d6bbd285..81616420 100644 --- a/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system_actions.py +++ b/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system_actions.py @@ -33,3 +33,56 @@ def test_folder_delete_request(populated_file_system): fs.apply_request(request=["delete", "folder", folder.uuid]) assert fs.get_folder_by_id(folder_uuid=folder.uuid) is None assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid) is None + + +def test_file_restore_request(populated_file_system): + """Test that an agent can request that a file can be restored.""" + fs, folder, file = populated_file_system + assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid) is not None + + fs.apply_request(request=["delete", "file", folder.uuid, file.uuid]) + assert fs.get_file(folder_name=folder.name, file_name=file.name) is None + assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid, include_deleted=True).deleted is True + + fs.apply_request(request=["restore", "file", folder.uuid, file.uuid]) + assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None + assert fs.get_file(folder_name=folder.name, file_name=file.name).deleted is False + + fs.apply_request(request=["file", file.uuid, "corrupt"]) + assert fs.get_file(folder_name=folder.name, file_name=file.name).health_status == FileSystemItemHealthStatus.CORRUPT + + fs.apply_request(request=["restore", "file", folder.uuid, file.uuid]) + assert fs.get_file(folder_name=folder.name, file_name=file.name).health_status == FileSystemItemHealthStatus.GOOD + + +def test_folder_restore_request(populated_file_system): + """Test that an agent can request that a folder can be restored.""" + fs, folder, file = populated_file_system + assert fs.get_folder_by_id(folder_uuid=folder.uuid) is not None + assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid) is not None + + # delete folder + fs.apply_request(request=["delete", "folder", folder.uuid]) + assert fs.get_folder(folder_name=folder.name) is None + assert fs.get_folder_by_id(folder_uuid=folder.uuid, include_deleted=True).deleted is True + + assert fs.get_file(folder_name=folder.name, file_name=file.name) is None + assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid, include_deleted=True).deleted is True + + # restore folder + fs.apply_request(request=["restore", "folder", folder.uuid]) + assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None + assert fs.get_file(folder_name=folder.name, file_name=file.name).deleted is False + + assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None + assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid, include_deleted=True).deleted is False + + # corrupt folder + fs.apply_request(request=["folder", folder.uuid, "corrupt"]) + assert fs.get_folder(folder_name=folder.name).health_status == FileSystemItemHealthStatus.CORRUPT + assert fs.get_file(folder_name=folder.name, file_name=file.name).health_status == FileSystemItemHealthStatus.CORRUPT + + # restore folder + fs.apply_request(request=["restore", "folder", folder.uuid]) + assert fs.get_folder(folder_name=folder.name).health_status == FileSystemItemHealthStatus.GOOD + assert fs.get_file(folder_name=folder.name, file_name=file.name).health_status == FileSystemItemHealthStatus.GOOD