#1947: temp commit what is done so far
This commit is contained in:
@@ -58,6 +58,9 @@ class FileSystemItemStatus(Enum):
|
||||
CORRUPTED = 2
|
||||
"""File/Folder is corrupted."""
|
||||
|
||||
DELETED = 3
|
||||
"""File/Folder is deleted."""
|
||||
|
||||
|
||||
class FileSystemItemABC(SimComponent):
|
||||
"""
|
||||
@@ -85,11 +88,10 @@ class FileSystemItemABC(SimComponent):
|
||||
:return: Current state of this object and child objects.
|
||||
"""
|
||||
state = super().describe_state()
|
||||
state.update(
|
||||
{
|
||||
"name": self.name,
|
||||
}
|
||||
)
|
||||
state["name"] = self.name
|
||||
state["status"] = self.status.name
|
||||
state["visible_status"] = self.visible_status.name
|
||||
state["previous_hash"] = self.previous_hash
|
||||
return state
|
||||
|
||||
@property
|
||||
@@ -104,16 +106,6 @@ class FileSystemItemABC(SimComponent):
|
||||
"""
|
||||
return convert_size(self.size)
|
||||
|
||||
def _init_request_manager(self) -> RequestManager:
|
||||
rm = super()._init_request_manager()
|
||||
rm.add_request("scan", RequestType(func=lambda request, context: self.scan()))
|
||||
rm.add_request("checkhash", RequestType(func=lambda request, context: self.checkhash()))
|
||||
rm.add_request("delete", RequestType(func=lambda request, context: self.delete()))
|
||||
rm.add_request("restore", RequestType(func=lambda request, context: self.restore()))
|
||||
rm.add_request("repair", RequestType(func=lambda request, context: self.repair()))
|
||||
rm.add_request("corrupt", RequestType(func=lambda request, context: self.corrupt()))
|
||||
return rm
|
||||
|
||||
def scan(self) -> None:
|
||||
"""Update the FileSystemItem states."""
|
||||
super().scan()
|
||||
@@ -129,16 +121,42 @@ class FileSystemItemABC(SimComponent):
|
||||
|
||||
Return False if corruption is detected, otherwise True
|
||||
"""
|
||||
pass
|
||||
# cannot check hash if deleted
|
||||
if self.status == FileSystemItemStatus.DELETED:
|
||||
return False
|
||||
|
||||
@abstractmethod
|
||||
def repair(self) -> None:
|
||||
"""Repair the FileSystemItem."""
|
||||
pass
|
||||
def repair(self) -> bool:
|
||||
"""
|
||||
Repair the FileSystemItem.
|
||||
|
||||
True if successfully repaired. False otherwise.
|
||||
"""
|
||||
# cannot repair if deleted
|
||||
if self.status == FileSystemItemStatus.DELETED:
|
||||
return False
|
||||
|
||||
@abstractmethod
|
||||
def corrupt(self) -> None:
|
||||
"""Corrupt the FileSystemItem."""
|
||||
def corrupt(self) -> bool:
|
||||
"""
|
||||
Corrupt the FileSystemItem.
|
||||
|
||||
True if successfully corrupted. False otherwise.
|
||||
"""
|
||||
# cannot corrupt if deleted
|
||||
if self.status == FileSystemItemStatus.DELETED:
|
||||
return False
|
||||
|
||||
def delete(self) -> None:
|
||||
"""
|
||||
Delete the FileSystemItem.
|
||||
|
||||
True if successfully deleted. False otherwise.
|
||||
"""
|
||||
self.status = FileSystemItemStatus.DELETED
|
||||
|
||||
def restore(self) -> None:
|
||||
"""Restore the file/folder to the state before it got ruined."""
|
||||
pass
|
||||
|
||||
|
||||
@@ -151,6 +169,8 @@ class FileSystem(SimComponent):
|
||||
sys_log: SysLog
|
||||
sim_root: Path
|
||||
|
||||
deleted_folders: Dict[str, Folder] = {}
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
# Ensure a default root folder
|
||||
@@ -160,14 +180,110 @@ class FileSystem(SimComponent):
|
||||
def _init_request_manager(self) -> RequestManager:
|
||||
rm = super()._init_request_manager()
|
||||
|
||||
self._folder_request_manager = RequestManager()
|
||||
self._folder_request_manager = self._init_folder_request_manager()
|
||||
rm.add_request("folder", RequestType(func=self._folder_request_manager))
|
||||
|
||||
self._file_request_manager = RequestManager()
|
||||
self._file_request_manager = self._init_file_request_manager()
|
||||
rm.add_request("file", RequestType(func=self._file_request_manager))
|
||||
|
||||
return rm
|
||||
|
||||
def _init_folder_request_manager(self) -> RequestManager:
|
||||
rm = RequestManager()
|
||||
|
||||
rm.add_request(
|
||||
"scan",
|
||||
RequestType(
|
||||
func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0], show_deleted=True).scan()
|
||||
),
|
||||
)
|
||||
|
||||
rm.add_request(
|
||||
"checkhash",
|
||||
RequestType(func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0]).check_hash()),
|
||||
)
|
||||
|
||||
rm.add_request(
|
||||
"repair", RequestType(func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0]).repair())
|
||||
)
|
||||
|
||||
rm.add_request(
|
||||
"corrupt",
|
||||
RequestType(func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0]).corrupt()),
|
||||
)
|
||||
|
||||
rm.add_request(
|
||||
"delete", RequestType(func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0]).delete())
|
||||
)
|
||||
|
||||
rm.add_request(
|
||||
"restore",
|
||||
RequestType(
|
||||
func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0], show_deleted=True).restore()
|
||||
),
|
||||
)
|
||||
|
||||
return rm
|
||||
|
||||
def _init_file_request_manager(self) -> RequestManager:
|
||||
rm = RequestManager()
|
||||
|
||||
rm.add_request(
|
||||
"scan",
|
||||
RequestType(
|
||||
func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0], show_deleted=True)
|
||||
.get_file_by_id(file_uuid=request[1], show_deleted=True)
|
||||
.scan()
|
||||
),
|
||||
)
|
||||
|
||||
rm.add_request(
|
||||
"checkhash",
|
||||
RequestType(
|
||||
func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0])
|
||||
.get_file_by_id(file_uuid=request[1])
|
||||
.check_hash()
|
||||
),
|
||||
)
|
||||
|
||||
rm.add_request(
|
||||
"repair",
|
||||
RequestType(
|
||||
func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0])
|
||||
.get_file_by_id(file_uuid=request[1])
|
||||
.repair()
|
||||
),
|
||||
)
|
||||
|
||||
rm.add_request(
|
||||
"corrupt",
|
||||
RequestType(
|
||||
func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0])
|
||||
.get_file_by_id(file_uuid=request[1])
|
||||
.corrupt()
|
||||
),
|
||||
)
|
||||
|
||||
rm.add_request(
|
||||
"delete",
|
||||
RequestType(
|
||||
func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0])
|
||||
.get_file_by_id(file_uuid=request[1])
|
||||
.delete()
|
||||
),
|
||||
)
|
||||
|
||||
rm.add_request(
|
||||
"restore",
|
||||
RequestType(
|
||||
func=lambda request, context: self.get_folder_by_id(folder_uuid=request[0], show_deleted=True)
|
||||
.get_file_by_id(file_uuid=request[1], show_deleted=True)
|
||||
.restore()
|
||||
),
|
||||
)
|
||||
|
||||
return rm
|
||||
|
||||
@property
|
||||
def size(self) -> int:
|
||||
"""
|
||||
@@ -243,7 +359,12 @@ class FileSystem(SimComponent):
|
||||
folder = self._folders_by_name.get(folder_name)
|
||||
if folder:
|
||||
for file in folder.files.values():
|
||||
self.delete_file(file)
|
||||
self.delete_file(folder_name=folder_name, file_name=file.name)
|
||||
# add to deleted list
|
||||
folder.delete()
|
||||
self.deleted_folders[folder.uuid] = folder
|
||||
|
||||
# remove from normal list
|
||||
self.folders.pop(folder.uuid)
|
||||
self._folders_by_name.pop(folder.name)
|
||||
self.sys_log.info(f"Deleted folder /{folder.name} and its contents")
|
||||
@@ -251,6 +372,10 @@ class FileSystem(SimComponent):
|
||||
else:
|
||||
_LOGGER.debug(f"Cannot delete folder as it does not exist: {folder_name}")
|
||||
|
||||
def restore_folder(self, folder_id: str):
|
||||
"""TODO."""
|
||||
pass
|
||||
|
||||
def create_file(
|
||||
self,
|
||||
file_name: str,
|
||||
@@ -364,6 +489,24 @@ class FileSystem(SimComponent):
|
||||
new_file.sim_path.parent.mkdir(exist_ok=True)
|
||||
shutil.copy2(file.sim_path, new_file.sim_path)
|
||||
|
||||
def restore_file(self, folder_id: str, file_id: str) -> bool:
|
||||
"""
|
||||
Restore a file.
|
||||
|
||||
Checks the current file's status and applies the correct fix for the file.
|
||||
|
||||
:param: folder_id: id of the folder where the file is stored
|
||||
:type: folder_id: str
|
||||
|
||||
:param: folder_id: id of the file to restore
|
||||
:type: folder_id: str
|
||||
"""
|
||||
folder = self.get_folder_by_id(folder_uuid=folder_id, show_deleted=True)
|
||||
|
||||
if folder:
|
||||
file = folder.get_file_by_id(file_uuid=file_id, show_deleted=True)
|
||||
return file.restore()
|
||||
|
||||
def get_folder(self, folder_name: str) -> Optional[Folder]:
|
||||
"""
|
||||
Get a folder by its name if it exists.
|
||||
@@ -373,13 +516,19 @@ class FileSystem(SimComponent):
|
||||
"""
|
||||
return self._folders_by_name.get(folder_name)
|
||||
|
||||
def get_folder_by_id(self, folder_uuid: str) -> Optional[Folder]:
|
||||
def get_folder_by_id(self, folder_uuid: str, show_deleted: bool = False) -> Optional[Folder]:
|
||||
"""
|
||||
Get a folder by its uuid if it exists.
|
||||
|
||||
:param folder_uuid: The folder uuid.
|
||||
:param: folder_uuid: The folder uuid.
|
||||
:param: show_deleted: show deleted folders
|
||||
:return: The matching Folder.
|
||||
"""
|
||||
deleted_folder = self.deleted_folders.get(folder_uuid)
|
||||
|
||||
if show_deleted and deleted_folder:
|
||||
return deleted_folder
|
||||
|
||||
return self.folders.get(folder_uuid)
|
||||
|
||||
|
||||
@@ -393,6 +542,8 @@ class Folder(FileSystemItemABC):
|
||||
_files_by_name: Dict[str, File] = {}
|
||||
"Files by their name as <file name>.<file type>."
|
||||
|
||||
deleted_files: Dict[str, File] = {}
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Produce a dictionary describing the current state of this object.
|
||||
@@ -401,7 +552,6 @@ class Folder(FileSystemItemABC):
|
||||
"""
|
||||
state = super().describe_state()
|
||||
state["files"] = {file.name: file.describe_state() for uuid, file in self.files.items()}
|
||||
state["is_quarantined"] = self.is_quarantined
|
||||
return state
|
||||
|
||||
def show(self, markdown: bool = False):
|
||||
@@ -441,13 +591,19 @@ class Folder(FileSystemItemABC):
|
||||
# TODO: Increment read count?
|
||||
return self._files_by_name.get(file_name)
|
||||
|
||||
def get_file_by_id(self, file_uuid: str) -> File:
|
||||
def get_file_by_id(self, file_uuid: str, show_deleted: bool = False) -> File:
|
||||
"""
|
||||
Get a file by its uuid.
|
||||
|
||||
:param file_uuid: The file uuid.
|
||||
:param: file_uuid: The file uuid.
|
||||
:param: show_deleted: show deleted files
|
||||
:return: The matching File.
|
||||
"""
|
||||
deleted_file = self.deleted_files.get(file_uuid)
|
||||
|
||||
if show_deleted and deleted_file:
|
||||
return deleted_file
|
||||
|
||||
return self.files.get(file_uuid)
|
||||
|
||||
def add_file(self, file: File):
|
||||
@@ -482,6 +638,11 @@ class Folder(FileSystemItemABC):
|
||||
raise Exception(f"Invalid file: {file}")
|
||||
|
||||
if self.files.get(file.uuid):
|
||||
# add to deleted list
|
||||
file.delete()
|
||||
self.deleted_files[file.uuid] = file
|
||||
|
||||
# remove from normal file list
|
||||
self.files.pop(file.uuid)
|
||||
self._files_by_name.pop(file.name)
|
||||
else:
|
||||
@@ -503,35 +664,16 @@ class Folder(FileSystemItemABC):
|
||||
"""Returns true if the folder is being quarantined."""
|
||||
return self.status == FileSystemItemStatus.QUARANTINED
|
||||
|
||||
def repair(self) -> None:
|
||||
"""Repair a corrupted Folder by setting the folder and containing files status to FileSystemItemStatus.GOOD."""
|
||||
super().repair()
|
||||
def scan(self) -> None:
|
||||
"""Update Folder visible status."""
|
||||
super().scan()
|
||||
|
||||
# iterate through the files in the folder
|
||||
# update the status of files in folder
|
||||
for file_id in self.files:
|
||||
file = self.get_file_by_id(file_uuid=file_id)
|
||||
file.repair()
|
||||
file.scan()
|
||||
|
||||
# set file status to good if corrupt
|
||||
if self.status == FileSystemItemStatus.CORRUPTED:
|
||||
self.status = FileSystemItemStatus.GOOD
|
||||
|
||||
self.fs.sys_log.info(f"Repaired folder {self.name}")
|
||||
|
||||
def corrupt(self) -> None:
|
||||
"""Corrupt a File by setting the folder and containing files status to FileSystemItemStatus.CORRUPTED."""
|
||||
super().corrupt()
|
||||
|
||||
# iterate through the files in the folder
|
||||
for file_id in self.files:
|
||||
file = self.get_file_by_id(file_uuid=file_id)
|
||||
file.corrupt()
|
||||
|
||||
# set file status to good if corrupt
|
||||
if self.status == FileSystemItemStatus.GOOD:
|
||||
self.status = FileSystemItemStatus.CORRUPTED
|
||||
|
||||
self.fs.sys_log.info(f"Corrupted folder {self.name}")
|
||||
self.fs.sys_log.info(f"Scanning folder {self.name} (id: {self.uuid})")
|
||||
|
||||
def check_hash(self) -> bool:
|
||||
"""
|
||||
@@ -544,6 +686,8 @@ class Folder(FileSystemItemABC):
|
||||
|
||||
Return False if corruption is detected, otherwise True
|
||||
"""
|
||||
super().check_hash()
|
||||
|
||||
# iterate through the files and run a check hash
|
||||
no_corrupted_files = True
|
||||
|
||||
@@ -555,8 +699,57 @@ class Folder(FileSystemItemABC):
|
||||
if not no_corrupted_files:
|
||||
self.corrupt()
|
||||
|
||||
self.fs.sys_log.info(f"Checking hash of folder {self.name} (id: {self.uuid})")
|
||||
|
||||
return no_corrupted_files
|
||||
|
||||
def repair(self) -> bool:
|
||||
"""Repair a corrupted Folder by setting the folder and containing files status to FileSystemItemStatus.GOOD."""
|
||||
super().repair()
|
||||
|
||||
repaired = False
|
||||
|
||||
# iterate through the files in the folder
|
||||
for file_id in self.files:
|
||||
file = self.get_file_by_id(file_uuid=file_id)
|
||||
repaired = file.repair()
|
||||
|
||||
# set file status to good if corrupt
|
||||
if self.status == FileSystemItemStatus.CORRUPTED:
|
||||
self.status = FileSystemItemStatus.GOOD
|
||||
repaired = True
|
||||
|
||||
self.fs.sys_log.info(f"Repaired folder {self.name} (id: {self.uuid})")
|
||||
return repaired
|
||||
|
||||
def restore(self) -> None:
|
||||
"""TODO."""
|
||||
pass
|
||||
|
||||
def delete(self) -> bool:
|
||||
"""TODO."""
|
||||
super().delete()
|
||||
self.fs.sys_log.info(f"Deleted folder {self.name} (id: {self.uuid})")
|
||||
|
||||
def corrupt(self) -> bool:
|
||||
"""Corrupt a File by setting the folder and containing files status to FileSystemItemStatus.CORRUPTED."""
|
||||
super().corrupt()
|
||||
|
||||
corrupted = False
|
||||
|
||||
# iterate through the files in the folder
|
||||
for file_id in self.files:
|
||||
file = self.get_file_by_id(file_uuid=file_id)
|
||||
corrupted = file.corrupt()
|
||||
|
||||
# set file status to good if corrupt
|
||||
if self.status == FileSystemItemStatus.GOOD:
|
||||
self.status = FileSystemItemStatus.CORRUPTED
|
||||
corrupted = True
|
||||
|
||||
self.fs.sys_log.info(f"Corrupted folder {self.name} (id: {self.uuid})")
|
||||
return corrupted
|
||||
|
||||
|
||||
class File(FileSystemItemABC):
|
||||
"""
|
||||
@@ -648,27 +841,12 @@ class File(FileSystemItemABC):
|
||||
state["file_type"] = self.file_type.name
|
||||
return state
|
||||
|
||||
def repair(self) -> None:
|
||||
"""Repair a corrupted File by setting the status to FileSystemItemStatus.GOOD."""
|
||||
super().repair()
|
||||
|
||||
# set file status to good if corrupt
|
||||
if self.status == FileSystemItemStatus.CORRUPTED:
|
||||
self.status = FileSystemItemStatus.GOOD
|
||||
def scan(self) -> None:
|
||||
"""TODO."""
|
||||
super().scan()
|
||||
|
||||
path = self.folder.name + "/" + self.name
|
||||
self.folder.fs.sys_log.info(f"Repaired file {self.sim_path if self.sim_path else path}")
|
||||
|
||||
def corrupt(self) -> None:
|
||||
"""Corrupt a File by setting the status to FileSystemItemStatus.CORRUPTED."""
|
||||
super().corrupt()
|
||||
|
||||
# set file status to good if corrupt
|
||||
if self.status == FileSystemItemStatus.GOOD:
|
||||
self.status = FileSystemItemStatus.CORRUPTED
|
||||
|
||||
path = self.folder.name + "/" + self.name
|
||||
self.folder.fs.sys_log.info(f"Corrupted file {self.sim_path if self.sim_path else path}")
|
||||
self.folder.fs.sys_log.info(f"Scanning file {self.sim_path if self.sim_path else path}")
|
||||
|
||||
def check_hash(self) -> bool:
|
||||
"""
|
||||
@@ -702,3 +880,48 @@ class File(FileSystemItemABC):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def delete(self) -> None:
|
||||
"""TODO."""
|
||||
super().delete()
|
||||
|
||||
path = self.folder.name + "/" + self.name
|
||||
self.folder.fs.sys_log.info(f"Deleting file {self.sim_path if self.sim_path else path}")
|
||||
|
||||
def repair(self) -> bool:
|
||||
"""Repair a corrupted File by setting the status to FileSystemItemStatus.GOOD."""
|
||||
super().repair()
|
||||
|
||||
# set file status to good if corrupt
|
||||
if self.status == FileSystemItemStatus.CORRUPTED:
|
||||
self.status = FileSystemItemStatus.GOOD
|
||||
|
||||
path = self.folder.name + "/" + self.name
|
||||
self.folder.fs.sys_log.info(f"Repaired file {self.sim_path if self.sim_path else path}")
|
||||
return True
|
||||
|
||||
def restore(self) -> None:
|
||||
"""Restore a corrupted File by setting the status to FileSystemItemStatus.GOOD."""
|
||||
super().restore()
|
||||
|
||||
# set file status to good if deleted or corrupt
|
||||
if self.status in [FileSystemItemStatus.CORRUPTED, FileSystemItemStatus.DELETED]:
|
||||
self.status = FileSystemItemStatus.GOOD
|
||||
|
||||
path = self.folder.name + "/" + self.name
|
||||
self.folder.fs.sys_log.info(f"Repaired file {self.sim_path if self.sim_path else path}")
|
||||
|
||||
def corrupt(self) -> bool:
|
||||
"""Corrupt a File by setting the status to FileSystemItemStatus.CORRUPTED."""
|
||||
super().corrupt()
|
||||
|
||||
corrupted = False
|
||||
|
||||
# set file status to good if corrupt
|
||||
if self.status == FileSystemItemStatus.GOOD:
|
||||
self.status = FileSystemItemStatus.CORRUPTED
|
||||
corrupted = True
|
||||
|
||||
path = self.folder.name + "/" + self.name
|
||||
self.folder.fs.sys_log.info(f"Corrupted file {self.sim_path if self.sim_path else path}")
|
||||
return corrupted
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from ipaddress import IPv4Address
|
||||
|
||||
from primaite.simulator.file_system.file_system import FileSystemItemStatus
|
||||
from primaite.simulator.network.hardware.nodes.server import Server
|
||||
from primaite.simulator.system.applications.database_client import DatabaseClient
|
||||
from primaite.simulator.system.services.database.database_service import DatabaseService
|
||||
|
||||
@@ -44,6 +44,7 @@ def test_delete_file(file_system):
|
||||
file_system.delete_file(folder_name="root", file_name="test_file.txt")
|
||||
assert len(file_system.folders) == 1
|
||||
assert len(file_system.get_folder("root").files) == 0
|
||||
assert len(file_system.get_folder("root").deleted_files) == 1
|
||||
|
||||
|
||||
def test_delete_non_existent_file(file_system):
|
||||
@@ -69,6 +70,7 @@ def test_delete_folder(file_system):
|
||||
|
||||
file_system.delete_folder(folder_name="test_folder")
|
||||
assert len(file_system.folders) == 1
|
||||
assert len(file_system.deleted_folders) == 1
|
||||
|
||||
|
||||
def test_deleting_a_non_existent_folder(file_system):
|
||||
@@ -95,11 +97,13 @@ def test_move_file(file_system):
|
||||
original_uuid = file.uuid
|
||||
|
||||
assert len(file_system.get_folder("src_folder").files) == 1
|
||||
assert len(file_system.get_folder("src_folder").deleted_files) == 0
|
||||
assert len(file_system.get_folder("dst_folder").files) == 0
|
||||
|
||||
file_system.move_file(src_folder_name="src_folder", src_file_name="test_file.txt", dst_folder_name="dst_folder")
|
||||
|
||||
assert len(file_system.get_folder("src_folder").files) == 0
|
||||
assert len(file_system.get_folder("src_folder").deleted_files) == 1
|
||||
assert len(file_system.get_folder("dst_folder").files) == 1
|
||||
assert file_system.get_file("dst_folder", "test_file.txt").uuid == original_uuid
|
||||
|
||||
@@ -169,6 +173,73 @@ def test_folder_corrupt_repair(file_system):
|
||||
assert file.status == FileSystemItemStatus.GOOD
|
||||
|
||||
|
||||
def test_file_scan(file_system):
|
||||
"""Test the ability to update visible status."""
|
||||
folder: Folder = file_system.create_folder(folder_name="test_folder")
|
||||
file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
|
||||
|
||||
assert file.status == FileSystemItemStatus.GOOD
|
||||
assert file.visible_status == FileSystemItemStatus.GOOD
|
||||
|
||||
file.corrupt()
|
||||
|
||||
assert file.status == FileSystemItemStatus.CORRUPTED
|
||||
assert file.visible_status == FileSystemItemStatus.GOOD
|
||||
|
||||
file.scan()
|
||||
|
||||
assert file.status == FileSystemItemStatus.CORRUPTED
|
||||
assert file.visible_status == FileSystemItemStatus.CORRUPTED
|
||||
|
||||
|
||||
def test_folder_scan(file_system):
|
||||
"""Test the ability to update visible status."""
|
||||
folder: Folder = file_system.create_folder(folder_name="test_folder")
|
||||
file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
|
||||
|
||||
assert folder.status == FileSystemItemStatus.GOOD
|
||||
assert folder.visible_status == FileSystemItemStatus.GOOD
|
||||
assert file.status == FileSystemItemStatus.GOOD
|
||||
assert file.visible_status == FileSystemItemStatus.GOOD
|
||||
|
||||
folder.corrupt()
|
||||
|
||||
assert folder.status == FileSystemItemStatus.CORRUPTED
|
||||
assert folder.visible_status == FileSystemItemStatus.GOOD
|
||||
assert file.status == FileSystemItemStatus.CORRUPTED
|
||||
assert file.visible_status == FileSystemItemStatus.GOOD
|
||||
|
||||
folder.scan()
|
||||
|
||||
assert folder.status == FileSystemItemStatus.CORRUPTED
|
||||
assert folder.visible_status == FileSystemItemStatus.CORRUPTED
|
||||
assert file.status == FileSystemItemStatus.CORRUPTED
|
||||
assert file.visible_status == FileSystemItemStatus.CORRUPTED
|
||||
|
||||
|
||||
def test_file_delete_restore(file_system):
|
||||
"""Test the ability to delete and restore a file."""
|
||||
folder: Folder = file_system.create_folder(folder_name="test_folder")
|
||||
file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
|
||||
|
||||
assert file.status == FileSystemItemStatus.GOOD
|
||||
assert file.visible_status == FileSystemItemStatus.GOOD
|
||||
|
||||
file_system.delete_file(folder_name=folder.name, file_name=file.name)
|
||||
|
||||
assert folder.get_file(file_name=file.name) is None
|
||||
assert folder.get_file_by_id(file_uuid=file.uuid, show_deleted=True).status == FileSystemItemStatus.DELETED
|
||||
|
||||
file_system.restore_file(folder_id=folder.uuid, file_id=file.uuid)
|
||||
|
||||
assert file.status == FileSystemItemStatus.GOOD
|
||||
assert folder.get_file(file_name=file.name) is not None
|
||||
|
||||
|
||||
def test_folder_delete_restore(file_system):
|
||||
pass
|
||||
|
||||
|
||||
def test_simulated_file_check_hash(file_system):
|
||||
file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user