#1962: folder/file restore logic

This commit is contained in:
Czar.Echavez
2023-11-06 10:22:08 +00:00
parent 51713bad74
commit e70ceec716
5 changed files with 161 additions and 33 deletions

View File

@@ -163,14 +163,6 @@ class File(FileSystemItemABC):
path = self.folder.name + "/" + self.name
self.sys_log.info(f"Repaired file {self.sim_path if self.sim_path else path}")
def restore(self) -> None:
"""Restore a corrupted File by setting the status to FileSystemItemStatus.GOOD."""
if self.health_status == FileSystemItemHealthStatus.CORRUPT:
self.health_status = FileSystemItemHealthStatus.GOOD
path = self.folder.name + "/" + self.name
self.sys_log.info(f"Restored file {self.sim_path if self.sim_path else path}")
def corrupt(self) -> None:
"""Corrupt a File by setting the status to FileSystemItemStatus.CORRUPT."""
if self.deleted:
@@ -184,10 +176,17 @@ class File(FileSystemItemABC):
path = self.folder.name + "/" + self.name
self.sys_log.info(f"Corrupted file {self.sim_path if self.sim_path else path}")
def restore(self) -> bool:
def restore(self) -> None:
"""Determines if the file needs to be repaired or unmarked as deleted."""
super().restore()
return True
if self.deleted:
self.deleted = False
return
if self.health_status == FileSystemItemHealthStatus.CORRUPT:
self.health_status = FileSystemItemHealthStatus.GOOD
path = self.folder.name + "/" + self.name
self.sys_log.info(f"Restored file {self.sim_path if self.sim_path else path}")
def delete(self):
"""Marks the file as deleted."""

View File

@@ -49,12 +49,27 @@ class FileSystem(SimComponent):
name="folder",
request_type=RequestType(func=lambda request, context: self.delete_folder_by_id(folder_uuid=request[0])),
)
rm.add_request(
name="delete",
request_type=RequestType(func=self._delete_manager),
)
self._restore_manager = RequestManager()
self._restore_manager.add_request(
name="file",
request_type=RequestType(
func=lambda request, context: self.restore_file(folder_uuid=request[0], file_uuid=request[1])
),
)
self._restore_manager.add_request(
name="folder",
request_type=RequestType(func=lambda request, context: self.restore_folder(folder_uuid=request[0])),
)
rm.add_request(
name="restore",
request_type=RequestType(func=self._restore_manager),
)
self._folder_request_manager = RequestManager()
rm.add_request("folder", RequestType(func=self._folder_request_manager))
@@ -164,13 +179,19 @@ class FileSystem(SimComponent):
"""
return self._folders_by_name.get(folder_name)
def get_folder_by_id(self, folder_uuid: str) -> Optional[Folder]:
def get_folder_by_id(self, folder_uuid: str, include_deleted: bool = False) -> Optional[Folder]:
"""
Get a folder by its uuid if it exists.
:param: folder_uuid: The folder uuid.
:param: include_deleted: If true, the deleted folders will also be checked
:return: The matching Folder.
"""
if include_deleted:
folder = self.deleted_folders.get(folder_uuid)
if folder:
return folder
return self.folders.get(folder_uuid)
###############################################################
@@ -244,7 +265,7 @@ class FileSystem(SimComponent):
:param: include_deleted: If true, the deleted files will also be checked
:return: An instance of File if it exists, otherwise `None`.
"""
folder = self.folders.get(folder_uuid)
folder = self.get_folder_by_id(folder_uuid=folder_uuid, include_deleted=include_deleted)
if folder:
return folder.get_file_by_id(file_uuid=file_uuid, include_deleted=include_deleted)
@@ -291,7 +312,11 @@ class FileSystem(SimComponent):
if folder:
file = folder.get_file_by_id(file_uuid=file_uuid)
self.delete_file(folder_name=folder.name, file_name=file.name)
if file:
self.delete_file(folder_name=folder.name, file_name=file.name)
else:
self.sys_log.error(f"Unable to delete file that does not exist. (id: {file_uuid})")
def move_file(self, src_folder_name: str, src_file_name: str, dst_folder_name: str):
"""
@@ -387,20 +412,47 @@ class FileSystem(SimComponent):
for folder_id in self.folders:
self.folders[folder_id].reveal_to_red(instant_scan=instant_scan)
def restore_folder(self, folder_id: str):
"""TODO."""
pass
def restore_folder(self, folder_uuid: str):
"""
Restore a folder.
def restore_file(self, folder_id: str, file_id: str):
Checks the current folder's status and applies the correct fix for the folder.
:param: folder_uuid: id of the folder to restore
:type: folder_uuid: str
"""
folder = self.get_folder_by_id(folder_uuid=folder_uuid, include_deleted=True)
if folder is None:
self.sys_log.error(f"Unable to restore folder with uuid {folder_uuid}. Folder does not exist.")
return
folder.restore()
self.folders[folder.uuid] = folder
self._folders_by_name[folder.name] = folder
if folder.deleted:
self.deleted_folders.pop(folder.uuid)
def restore_file(self, folder_uuid: str, file_uuid: str):
"""
Restore a file.
Checks the current file's status and applies the correct fix for the file.
:param: folder_id: id of the folder where the file is stored
:type: folder_id: str
:param: folder_uuid: id of the folder where the file is stored
:type: folder_uuid: str
:param: folder_id: id of the file to restore
:type: folder_id: str
:param: file_uuid: id of the file to restore
:type: file_uuid: str
"""
pass
folder = self.get_folder_by_id(folder_uuid=folder_uuid, include_deleted=True)
if folder:
file = folder.get_file_by_id(file_uuid=file_uuid, include_deleted=True)
if file is None:
self.sys_log.error(f"Unable to restore file with uuid {file_uuid}. File does not exist.")
return
folder.restore_file(file_uuid=file_uuid)

View File

@@ -210,15 +210,24 @@ class Folder(FileSystemItemABC):
self.files = {}
self._files_by_name = {}
def restore_file(self, file: Optional[File]):
def restore_file(self, file_uuid: str):
"""
Restores a file.
The method can take a File object or a file id.
:param file: The file to restore
:param file_uuid: The id of the file to restore
"""
pass
# if the file was not deleted, run a repair
file = self.get_file_by_id(file_uuid=file_uuid, include_deleted=True)
if not file:
self.sys_log.error(f"Unable to restore file with uuid {file_uuid}. File does not exist.")
return
file.restore()
self.files[file.uuid] = file
self._files_by_name[file.name] = file
if file.deleted:
self.deleted_files.pop(file_uuid)
def quarantine(self):
"""Quarantines the File System Folder."""
@@ -338,7 +347,18 @@ class Folder(FileSystemItemABC):
If the folder is deleted, restore the folder by setting deleted status to False.
"""
pass
# repair all files
for file_id in self.files:
self.restore_file(file_uuid=file_id)
deleted_files = self.deleted_files.copy()
for file_id in deleted_files:
self.restore_file(file_uuid=file_id)
if self.deleted:
self.deleted = False
elif self.health_status == FileSystemItemHealthStatus.CORRUPT:
self.health_status = FileSystemItemHealthStatus.GOOD
def corrupt(self) -> None:
"""Corrupt a File by setting the folder and containing files status to FileSystemItemStatus.CORRUPT."""

View File

@@ -65,14 +65,18 @@ def test_real_file_check_hash(file_system):
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
def test_file_corrupt_repair(file_system):
def test_file_corrupt_repair_restore(file_system):
"""Test the ability to corrupt and repair files."""
file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
file.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
file.repair()
assert file.health_status == FileSystemItemHealthStatus.GOOD
file.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
file.restore()
assert file.health_status == FileSystemItemHealthStatus.GOOD

View File

@@ -33,3 +33,56 @@ def test_folder_delete_request(populated_file_system):
fs.apply_request(request=["delete", "folder", folder.uuid])
assert fs.get_folder_by_id(folder_uuid=folder.uuid) is None
assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid) is None
def test_file_restore_request(populated_file_system):
"""Test that an agent can request that a file can be restored."""
fs, folder, file = populated_file_system
assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid) is not None
fs.apply_request(request=["delete", "file", folder.uuid, file.uuid])
assert fs.get_file(folder_name=folder.name, file_name=file.name) is None
assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid, include_deleted=True).deleted is True
fs.apply_request(request=["restore", "file", folder.uuid, file.uuid])
assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None
assert fs.get_file(folder_name=folder.name, file_name=file.name).deleted is False
fs.apply_request(request=["file", file.uuid, "corrupt"])
assert fs.get_file(folder_name=folder.name, file_name=file.name).health_status == FileSystemItemHealthStatus.CORRUPT
fs.apply_request(request=["restore", "file", folder.uuid, file.uuid])
assert fs.get_file(folder_name=folder.name, file_name=file.name).health_status == FileSystemItemHealthStatus.GOOD
def test_folder_restore_request(populated_file_system):
"""Test that an agent can request that a folder can be restored."""
fs, folder, file = populated_file_system
assert fs.get_folder_by_id(folder_uuid=folder.uuid) is not None
assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid) is not None
# delete folder
fs.apply_request(request=["delete", "folder", folder.uuid])
assert fs.get_folder(folder_name=folder.name) is None
assert fs.get_folder_by_id(folder_uuid=folder.uuid, include_deleted=True).deleted is True
assert fs.get_file(folder_name=folder.name, file_name=file.name) is None
assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid, include_deleted=True).deleted is True
# restore folder
fs.apply_request(request=["restore", "folder", folder.uuid])
assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None
assert fs.get_file(folder_name=folder.name, file_name=file.name).deleted is False
assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None
assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid, include_deleted=True).deleted is False
# corrupt folder
fs.apply_request(request=["folder", folder.uuid, "corrupt"])
assert fs.get_folder(folder_name=folder.name).health_status == FileSystemItemHealthStatus.CORRUPT
assert fs.get_file(folder_name=folder.name, file_name=file.name).health_status == FileSystemItemHealthStatus.CORRUPT
# restore folder
fs.apply_request(request=["restore", "folder", folder.uuid])
assert fs.get_folder(folder_name=folder.name).health_status == FileSystemItemHealthStatus.GOOD
assert fs.get_file(folder_name=folder.name, file_name=file.name).health_status == FileSystemItemHealthStatus.GOOD