diff --git a/src/primaite/simulator/file_system/file.py b/src/primaite/simulator/file_system/file.py index 0da3c9ab..04502c0f 100644 --- a/src/primaite/simulator/file_system/file.py +++ b/src/primaite/simulator/file_system/file.py @@ -102,15 +102,22 @@ class File(FileSystemItemABC): def scan(self) -> None: """Updates the visible statuses of the file.""" + if self.deleted: + self.sys_log.error(f"Unable to scan deleted file {self.folder_name}/{self.name}") + return + path = self.folder.name + "/" + self.name self.sys_log.info(f"Scanning file {self.sim_path if self.sim_path else path}") self.visible_health_status = self.health_status - def reveal_to_red(self): + def reveal_to_red(self) -> None: """Reveals the folder/file to the red agent.""" + if self.deleted: + self.sys_log.error(f"Unable to reveal deleted file {self.folder_name}/{self.name}") + return self.revealed_to_red = True - def check_hash(self) -> bool: + def check_hash(self) -> None: """ Check if the file has been changed. @@ -118,6 +125,9 @@ class File(FileSystemItemABC): Return False if corruption is detected, otherwise True """ + if self.deleted: + self.sys_log.error(f"Unable to check hash of deleted file {self.folder_name}/{self.name}") + return current_hash = None # if file is real, read the file contents @@ -139,13 +149,12 @@ class File(FileSystemItemABC): # if the previous hash and current hash do not match, mark file as corrupted if self.previous_hash is not current_hash: self.corrupt() - return False - return True - - def repair(self) -> bool: + def repair(self) -> None: """Repair a corrupted File by setting the status to FileSystemItemStatus.GOOD.""" - super().repair() + if self.deleted: + self.sys_log.error(f"Unable to repair deleted file {self.folder_name}/{self.name}") + return # set file status to good if corrupt if self.health_status == FileSystemItemHealthStatus.CORRUPT: @@ -153,33 +162,38 @@ class File(FileSystemItemABC): path = self.folder.name + "/" + self.name self.sys_log.info(f"Repaired file {self.sim_path if self.sim_path else path}") - return True - def restore(self) -> bool: + def restore(self) -> None: """Restore a corrupted File by setting the status to FileSystemItemStatus.GOOD.""" - super().restore() - - restored = False - if self.health_status == FileSystemItemHealthStatus.CORRUPT: self.health_status = FileSystemItemHealthStatus.GOOD - restored = True path = self.folder.name + "/" + self.name self.sys_log.info(f"Restored file {self.sim_path if self.sim_path else path}") - return restored - def corrupt(self) -> bool: + def corrupt(self) -> None: """Corrupt a File by setting the status to FileSystemItemStatus.CORRUPT.""" - super().corrupt() - - corrupted = False + if self.deleted: + self.sys_log.error(f"Unable to corrupt deleted file {self.folder_name}/{self.name}") + return # set file status to good if corrupt if self.health_status == FileSystemItemHealthStatus.GOOD: self.health_status = FileSystemItemHealthStatus.CORRUPT - corrupted = True path = self.folder.name + "/" + self.name self.sys_log.info(f"Corrupted file {self.sim_path if self.sim_path else path}") - return corrupted + + def restore(self) -> bool: + """Determines if the file needs to be repaired or unmarked as deleted.""" + super().restore() + return True + + def delete(self): + """Marks the file as deleted.""" + if self.deleted: + self.sys_log.error(f"Unable to delete an already deleted file {self.folder_name}/{self.name}") + return + + self.deleted = True + self.sys_log.info(f"File deleted {self.folder_name}/{self.name}") diff --git a/src/primaite/simulator/file_system/file_system.py b/src/primaite/simulator/file_system/file_system.py index 16c9992c..7f2d41e5 100644 --- a/src/primaite/simulator/file_system/file_system.py +++ b/src/primaite/simulator/file_system/file_system.py @@ -38,9 +38,21 @@ class FileSystem(SimComponent): def _init_request_manager(self) -> RequestManager: rm = super()._init_request_manager() + self._delete_manager = RequestManager() + self._delete_manager.add_request( + name="file", + request_type=RequestType( + func=lambda request, context: self.delete_file_by_id(folder_uuid=request[0], file_uuid=request[1]) + ), + ) + self._delete_manager.add_request( + name="folder", + request_type=RequestType(func=lambda request, context: self.delete_folder_by_id(folder_uuid=request[0])), + ) + rm.add_request( name="delete", - request_type=RequestType(func=lambda request, context: self.delete_folder_by_id(folder_uuid=request[0])), + request_type=RequestType(func=self._delete_manager), ) self._folder_request_manager = RequestManager() @@ -119,15 +131,18 @@ class FileSystem(SimComponent): return folder = self._folders_by_name.get(folder_name) if folder: + # set folder to deleted state + folder.delete() + # remove from folder list self.folders.pop(folder.uuid) self._folders_by_name.pop(folder.name) - self.sys_log.info(f"Deleted folder /{folder.name} and its contents") # add to deleted list folder.remove_all_files() self.deleted_folders[folder.uuid] = folder + self.sys_log.info(f"Deleted folder /{folder.name} and its contents") else: _LOGGER.debug(f"Cannot delete folder as it does not exist: {folder_name}") @@ -216,7 +231,41 @@ class FileSystem(SimComponent): folder = self.get_folder(folder_name) if folder: return folder.get_file(file_name) - self.sys_log.info(f"file not found /{folder_name}/{file_name}") + self.sys_log.info(f"File not found /{folder_name}/{file_name}") + + def get_file_by_id( + self, file_uuid: str, folder_uuid: Optional[str] = None, include_deleted: Optional[bool] = False + ) -> Optional[File]: + """ + Retrieve a file by its uuid from a specific folder. + + :param: file_uuid: The uuid of the folder where the file resides. + :param: folder_uuid: The uuid of the file to be retrieved, including its extension. + :param: include_deleted: If true, the deleted files will also be checked + :return: An instance of File if it exists, otherwise `None`. + """ + folder = self.folders.get(folder_uuid) + + if folder: + return folder.get_file_by_id(file_uuid=file_uuid, include_deleted=include_deleted) + + # iterate through every folder looking for file + file = None + + for folder_id in self.folders: + folder = self.folders.get(folder_id) + res = folder.get_file_by_id(file_uuid=file_uuid, include_deleted=True) + if res: + file = res + + if include_deleted: + for folder_id in self.deleted_folders: + folder = self.deleted_folders.get(folder_id) + res = folder.get_file_by_id(file_uuid=file_uuid, include_deleted=True) + if res: + file = res + + return file def delete_file(self, folder_name: str, file_name: str): """ @@ -231,6 +280,19 @@ class FileSystem(SimComponent): if file: folder.remove_file(file) + def delete_file_by_id(self, folder_uuid: str, file_uuid: str): + """ + Deletes a file via its uuid. + + :param: folder_uuid: UUID of the folder the file belongs to + :param: file_uuid: UUID of the file to delete + """ + folder = self.get_folder_by_id(folder_uuid=folder_uuid) + + if folder: + file = folder.get_file_by_id(file_uuid=file_uuid) + self.delete_file(folder_name=folder.name, file_name=file.name) + def move_file(self, src_folder_name: str, src_file_name: str, dst_folder_name: str): """ Move a file from one folder to another. @@ -277,7 +339,7 @@ class FileSystem(SimComponent): folder_name=dst_folder.name, **file.model_dump(exclude={"uuid", "folder_id", "folder_name", "sim_path"}), ) - dst_folder.add_file(file_copy) + dst_folder.add_file(file_copy, force=True) if file.real: file_copy.sim_path.parent.mkdir(exist_ok=True) @@ -303,6 +365,10 @@ class FileSystem(SimComponent): for folder_id in self.folders: self.folders[folder_id].apply_timestep(timestep=timestep) + ############################################################### + # Agent actions + ############################################################### + def scan(self, instant_scan: bool = False): """ Scan all the folders (and child files) in the file system. diff --git a/src/primaite/simulator/file_system/file_system_item_abc.py b/src/primaite/simulator/file_system/file_system_item_abc.py index c0d65139..543bb1b7 100644 --- a/src/primaite/simulator/file_system/file_system_item_abc.py +++ b/src/primaite/simulator/file_system/file_system_item_abc.py @@ -82,6 +82,9 @@ class FileSystemItemABC(SimComponent): sys_log: SysLog "Used for creating system logs." + deleted: bool = False + "If true, the FileSystemItem was deleted." + def describe_state(self) -> Dict: """ Produce a dictionary describing the current state of this object. @@ -121,7 +124,17 @@ class FileSystemItemABC(SimComponent): return convert_size(self.size) @abstractmethod - def check_hash(self) -> bool: + def scan(self) -> None: + """Scan the folder/file - updates the visible_health_status.""" + pass + + @abstractmethod + def reveal_to_red(self) -> None: + """Reveal the folder/file to the red agent.""" + pass + + @abstractmethod + def check_hash(self) -> None: """ Checks the has of the file to detect any changes. @@ -132,7 +145,7 @@ class FileSystemItemABC(SimComponent): pass @abstractmethod - def repair(self) -> bool: + def repair(self) -> None: """ Repair the FileSystemItem. @@ -141,7 +154,7 @@ class FileSystemItemABC(SimComponent): pass @abstractmethod - def corrupt(self) -> bool: + def corrupt(self) -> None: """ Corrupt the FileSystemItem. @@ -150,6 +163,11 @@ class FileSystemItemABC(SimComponent): pass @abstractmethod - def restore(self) -> bool: + def restore(self) -> None: """Restore the file/folder to the state before it got ruined.""" pass + + @abstractmethod + def delete(self) -> None: + """Mark the file/folder as deleted.""" + self.deleted = True diff --git a/src/primaite/simulator/file_system/folder.py b/src/primaite/simulator/file_system/folder.py index 6922cad1..e12d4e12 100644 --- a/src/primaite/simulator/file_system/folder.py +++ b/src/primaite/simulator/file_system/folder.py @@ -131,34 +131,45 @@ class Folder(FileSystemItemABC): # TODO: Increment read count? return self._files_by_name.get(file_name) - def get_file_by_id(self, file_uuid: str) -> File: + def get_file_by_id(self, file_uuid: str, include_deleted: Optional[bool] = False) -> File: """ Get a file by its uuid. :param: file_uuid: The file uuid. + :param: include_deleted: If true, the deleted files will also be checked :return: The matching File. """ + if include_deleted: + deleted_file = self.deleted_files.get(file_uuid) + + if deleted_file: + return deleted_file + return self.files.get(file_uuid) - def add_file(self, file: File): + def add_file(self, file: File, force: Optional[bool] = False): """ Adds a file to the folder. - :param File file: The File object to be added to the folder. + :param: file: The File object to be added to the folder. + :param: force: Overwrite file - do not check if uuid or name already exists in folder. Default False. :raises Exception: If the provided `file` parameter is None or not an instance of the `File` class. """ if file is None or not isinstance(file, File): raise Exception(f"Invalid file: {file}") - # check if file with id already exists in folder - if file.uuid in self.files: - _LOGGER.debug(f"File with id {file.uuid} already exists in folder") - else: - # add to list - self.files[file.uuid] = file - self._files_by_name[file.name] = file - file.folder = self + # check if file with id or name already exists in folder + if (force is not True) and file.name in self._files_by_name: + raise Exception(f"File with name {file.name} already exists in folder") + + if (force is not True) and file.uuid in self.files: + raise Exception(f"File with uuid {file.uuid} already exists in folder") + + # add to list + self.files[file.uuid] = file + self._files_by_name[file.name] = file + file.folder = self def remove_file(self, file: Optional[File]): """ @@ -175,6 +186,7 @@ class Folder(FileSystemItemABC): self.files.pop(file.uuid) self._files_by_name.pop(file.name) self.deleted_files[file.uuid] = file + file.delete() self.sys_log.info(f"Removed file {file.name} (id: {file.uuid})") else: _LOGGER.debug(f"File with UUID {file.uuid} was not found.") @@ -191,7 +203,9 @@ class Folder(FileSystemItemABC): def remove_all_files(self): """Removes all the files in the folder.""" for file_id in self.files: - self.deleted_files[file_id] = self.files[file_id] + file = self.files.get(file_id) + file.delete() + self.deleted_files[file_id] = file self.files = {} self._files_by_name = {} @@ -224,6 +238,10 @@ class Folder(FileSystemItemABC): :param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False. """ + if self.deleted: + self.sys_log.error(f"Unable to scan deleted folder {self.name}") + return + if instant_scan: for file_id in self.files: file = self.get_file_by_id(file_uuid=file_id) @@ -246,6 +264,10 @@ class Folder(FileSystemItemABC): :param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False. """ + if self.deleted: + self.sys_log.error(f"Unable to reveal deleted folder {self.name}") + return + if instant_scan: self.revealed_to_red = True for file_id in self.files: @@ -261,7 +283,7 @@ class Folder(FileSystemItemABC): # scan already in progress self.sys_log.info(f"Red Agent Scan is already in progress {self.name} (id: {self.uuid})") - def check_hash(self) -> bool: + def check_hash(self) -> None: """ Runs a :func:`check_hash` on all files in the folder. @@ -272,14 +294,18 @@ class Folder(FileSystemItemABC): Return False if corruption is detected, otherwise True """ - super().check_hash() + if self.deleted: + self.sys_log.error(f"Unable to check hash of deleted folder {self.name}") + return # iterate through the files and run a check hash no_corrupted_files = True for file_id in self.files: file = self.get_file_by_id(file_uuid=file_id) - no_corrupted_files = file.check_hash() + file.check_hash() + if file.health_status == FileSystemItemHealthStatus.CORRUPT: + no_corrupted_files = False # if one file in the folder is corrupted, set the folder status to corrupted if not no_corrupted_files: @@ -287,61 +313,53 @@ class Folder(FileSystemItemABC): self.sys_log.info(f"Checking hash of folder {self.name} (id: {self.uuid})") - return no_corrupted_files - - def repair(self) -> bool: + def repair(self) -> None: """Repair a corrupted Folder by setting the folder and containing files status to FileSystemItemStatus.GOOD.""" - super().repair() - - repaired = False + if self.deleted: + self.sys_log.error(f"Unable to repair deleted folder {self.name}") + return # iterate through the files in the folder for file_id in self.files: file = self.get_file_by_id(file_uuid=file_id) - repaired = file.repair() + file.repair() # set file status to good if corrupt if self.health_status == FileSystemItemHealthStatus.CORRUPT: self.health_status = FileSystemItemHealthStatus.GOOD - repaired = True + + self.health_status = FileSystemItemHealthStatus.GOOD self.sys_log.info(f"Repaired folder {self.name} (id: {self.uuid})") - return repaired - def restore(self) -> bool: - """Restore a File by setting the folder and containing files status to FileSystemItemStatus.GOOD.""" - super().restore() + def restore(self) -> None: + """ + If a Folder is corrupted, run a repair on the folder and its child files. - restored = False + If the folder is deleted, restore the folder by setting deleted status to False. + """ + pass - # iterate through the files in the folder - for file_id in self.files: - file = self.get_file_by_id(file_uuid=file_id) - restored = file.restore() - - # set file status to corrupt if good - if self.health_status == FileSystemItemHealthStatus.CORRUPT: - self.health_status = FileSystemItemHealthStatus.GOOD - restored = True - - self.sys_log.info(f"Restored folder {self.name} (id: {self.uuid})") - return restored - - def corrupt(self) -> bool: + def corrupt(self) -> None: """Corrupt a File by setting the folder and containing files status to FileSystemItemStatus.CORRUPT.""" - super().corrupt() - - corrupted = False + if self.deleted: + self.sys_log.error(f"Unable to corrupt deleted folder {self.name}") + return # iterate through the files in the folder for file_id in self.files: file = self.get_file_by_id(file_uuid=file_id) - corrupted = file.corrupt() + file.corrupt() - # set file status to corrupt if good - if self.health_status == FileSystemItemHealthStatus.GOOD: - self.health_status = FileSystemItemHealthStatus.CORRUPT - corrupted = True + # set file status to corrupt + self.health_status = FileSystemItemHealthStatus.CORRUPT self.sys_log.info(f"Corrupted folder {self.name} (id: {self.uuid})") - return corrupted + + def delete(self): + """Marks the file as deleted. Prevents agent actions from occuring.""" + if self.deleted: + self.sys_log.error(f"Unable to delete an already deleted folder {self.name}") + return + + self.deleted = True diff --git a/src/primaite/simulator/system/services/database/database_service.py b/src/primaite/simulator/system/services/database/database_service.py index f7333f97..b04174bf 100644 --- a/src/primaite/simulator/system/services/database/database_service.py +++ b/src/primaite/simulator/system/services/database/database_service.py @@ -129,7 +129,7 @@ class DatabaseService(Service): self._conn.close() # replace db file self.file_system.delete_file(folder_name=self.folder.name, file_name="downloads.db") - self.file_system.move_file( + self.file_system.copy_file( src_folder_name="downloads", src_file_name="database.db", dst_folder_name=self.folder.name ) self._db_file = self.file_system.get_file(folder_name=self.folder.name, file_name="database.db") diff --git a/tests/unit_tests/_primaite/_simulator/_file_system/test_file.py b/tests/unit_tests/_primaite/_simulator/_file_system/test_file.py index 174c7726..94ccde83 100644 --- a/tests/unit_tests/_primaite/_simulator/_file_system/test_file.py +++ b/tests/unit_tests/_primaite/_simulator/_file_system/test_file.py @@ -44,24 +44,24 @@ def test_file_reveal_to_red_scan(file_system): def test_simulated_file_check_hash(file_system): file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder") - assert file.check_hash() is True - + file.check_hash() + assert file.health_status == FileSystemItemHealthStatus.GOOD # change simulated file size file.sim_size = 0 - assert file.check_hash() is False + file.check_hash() assert file.health_status == FileSystemItemHealthStatus.CORRUPT def test_real_file_check_hash(file_system): file: File = file_system.create_file(file_name="test_file.txt", real=True) - assert file.check_hash() is True - + file.check_hash() + assert file.health_status == FileSystemItemHealthStatus.GOOD # change file content with open(file.sim_path, "a") as f: f.write("get hacked scrub lol xD\n") - assert file.check_hash() is False + file.check_hash() assert file.health_status == FileSystemItemHealthStatus.CORRUPT diff --git a/tests/unit_tests/_primaite/_simulator/_file_system/test_file_actions.py b/tests/unit_tests/_primaite/_simulator/_file_system/test_file_actions.py new file mode 100644 index 00000000..f43c6b22 --- /dev/null +++ b/tests/unit_tests/_primaite/_simulator/_file_system/test_file_actions.py @@ -0,0 +1,89 @@ +from typing import Tuple + +import pytest + +from primaite.simulator.file_system.file_system import File, FileSystem, Folder +from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus + + +@pytest.fixture(scope="function") +def populated_file_system(file_system) -> Tuple[FileSystem, Folder, File]: + """Create a file system with a folder and file.""" + folder = file_system.create_folder(folder_name="test_folder") + file = file_system.create_file(folder_name="test_folder", file_name="test_file.txt") + + return file_system, folder, file + + +def test_file_scan_request(populated_file_system): + """Test that an agent can request a file scan.""" + fs, folder, file = populated_file_system + + file.corrupt() + assert file.health_status == FileSystemItemHealthStatus.CORRUPT + assert file.visible_health_status == FileSystemItemHealthStatus.GOOD + + fs.apply_request(request=["file", file.uuid, "scan"]) + + assert file.health_status == FileSystemItemHealthStatus.CORRUPT + assert file.visible_health_status == FileSystemItemHealthStatus.CORRUPT + + +def test_file_checkhash_request(populated_file_system): + """Test that an agent can request a file hash check.""" + fs, folder, file = populated_file_system + + fs.apply_request(request=["file", file.uuid, "checkhash"]) + + assert file.health_status == FileSystemItemHealthStatus.GOOD + file.sim_size = 0 + + fs.apply_request(request=["file", file.uuid, "checkhash"]) + + assert file.health_status == FileSystemItemHealthStatus.CORRUPT + + +def test_file_repair_request(populated_file_system): + """Test that an agent can request a file repair.""" + fs, folder, file = populated_file_system + + file.corrupt() + assert file.health_status == FileSystemItemHealthStatus.CORRUPT + + fs.apply_request(request=["file", file.uuid, "repair"]) + assert file.health_status == FileSystemItemHealthStatus.GOOD + + +def test_file_restore_request(populated_file_system): + pass + + +def test_file_corrupt_request(populated_file_system): + """Test that an agent can request a file corruption.""" + fs, folder, file = populated_file_system + fs.apply_request(request=["file", file.uuid, "corrupt"]) + assert file.health_status == FileSystemItemHealthStatus.CORRUPT + + +def test_deleted_file_cannot_be_interacted_with(populated_file_system): + """Test that actions cannot affect deleted files.""" + fs, folder, file = populated_file_system + assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None + + fs.apply_request(request=["file", file.uuid, "corrupt"]) + assert fs.get_file(folder_name=folder.name, file_name=file.name).health_status == FileSystemItemHealthStatus.CORRUPT + assert ( + fs.get_file(folder_name=folder.name, file_name=file.name).visible_health_status + == FileSystemItemHealthStatus.GOOD + ) + + fs.apply_request(request=["delete", "file", folder.uuid, file.uuid]) + assert fs.get_file(folder_name=folder.name, file_name=file.name) is None + + fs.apply_request(request=["file", file.uuid, "repair"]) + fs.apply_request(request=["file", file.uuid, "scan"]) + + file = folder.deleted_files.get(file.uuid) + + assert file.health_status is not FileSystemItemHealthStatus.GOOD + assert file.visible_health_status is not FileSystemItemHealthStatus.CORRUPT diff --git a/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system.py b/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system.py index d26d0a4a..03c6ad12 100644 --- a/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system.py +++ b/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system.py @@ -1,7 +1,9 @@ import pytest +from primaite.simulator.file_system.file import File from primaite.simulator.file_system.file_system import FileSystem from primaite.simulator.file_system.file_type import FileType +from primaite.simulator.file_system.folder import Folder def test_create_folder_and_file(file_system): @@ -65,6 +67,34 @@ def test_delete_folder(file_system): assert len(file_system.deleted_folders) == 1 +def test_create_duplicate_folder(file_system): + """Test that creating a duplicate folder throws exception.""" + assert len(file_system.folders) == 1 + file_system.create_folder(folder_name="test_folder") + + assert len(file_system.folders) is 2 + with pytest.raises(Exception): + file_system.create_folder(folder_name="test_folder") + + assert len(file_system.folders) is 2 + + +def test_create_duplicate_file(file_system): + """Test that creating a duplicate file throws exception.""" + assert len(file_system.folders) == 1 + file_system.create_folder(folder_name="test_folder") + + assert len(file_system.folders) is 2 + file_system.create_file(file_name="test_file.txt", folder_name="test_folder") + + assert len(file_system.get_folder("test_folder").files) == 1 + + with pytest.raises(Exception): + file_system.create_file(file_name="test_file.txt", folder_name="test_folder") + + assert len(file_system.get_folder("test_folder").files) == 1 + + def test_deleting_a_non_existent_folder(file_system): file_system.create_folder(folder_name="test_folder") assert len(file_system.folders) == 2 @@ -116,6 +146,23 @@ def test_copy_file(file_system): assert file_system.get_file("dst_folder", "test_file.txt").uuid != original_uuid +def test_get_file(file_system): + """Test that files can be retrieved.""" + folder: Folder = file_system.create_folder(folder_name="test_folder") + file1: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder") + file2: File = file_system.create_file(file_name="test_file2.txt", folder_name="test_folder") + + folder.remove_file(file2) + + assert file_system.get_file_by_id(file_uuid=file1.uuid, folder_uuid=folder.uuid) is not None + assert file_system.get_file_by_id(file_uuid=file2.uuid, folder_uuid=folder.uuid) is None + assert file_system.get_file_by_id(file_uuid=file2.uuid, folder_uuid=folder.uuid, include_deleted=True) is not None + assert file_system.get_file_by_id(file_uuid=file2.uuid, include_deleted=True) is not None + + file_system.delete_folder(folder_name="test_folder") + assert file_system.get_file_by_id(file_uuid=file2.uuid, include_deleted=True) is not None + + @pytest.mark.skip(reason="Skipping until we tackle serialisation") def test_serialisation(file_system): """Test to check that the object serialisation works correctly.""" diff --git a/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system_actions.py b/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system_actions.py index d902c935..d6bbd285 100644 --- a/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system_actions.py +++ b/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system_actions.py @@ -8,139 +8,20 @@ from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHe @pytest.fixture(scope="function") def populated_file_system(file_system) -> Tuple[FileSystem, Folder, File]: - """Test that an agent can request a file scan.""" + """Create a file system with a folder and file.""" folder = file_system.create_folder(folder_name="test_folder") file = file_system.create_file(folder_name="test_folder", file_name="test_file.txt") return file_system, folder, file -def test_file_scan_request(populated_file_system): - """Test that an agent can request a file scan.""" - fs, folder, file = populated_file_system - - file.corrupt() - assert file.health_status == FileSystemItemHealthStatus.CORRUPT - assert file.visible_health_status == FileSystemItemHealthStatus.GOOD - - fs.apply_request(request=["file", file.uuid, "scan"]) - - assert file.health_status == FileSystemItemHealthStatus.CORRUPT - assert file.visible_health_status == FileSystemItemHealthStatus.CORRUPT - - -def test_folder_scan_request(populated_file_system): - """Test that an agent can request a folder scan.""" - fs, folder, file = populated_file_system - fs.create_file(file_name="test_file2.txt", folder_name="test_folder") - - file1: File = folder.get_file_by_id(file_uuid=list(folder.files)[1]) - file2: File = folder.get_file_by_id(file_uuid=list(folder.files)[0]) - - folder.corrupt() - assert folder.health_status == FileSystemItemHealthStatus.CORRUPT - assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD - assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD - assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD - - fs.apply_request(request=["folder", folder.uuid, "scan"]) - - folder.apply_timestep(timestep=0) - - assert folder.health_status == FileSystemItemHealthStatus.CORRUPT - assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD - assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD - assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD - - folder.apply_timestep(timestep=1) - - assert folder.health_status == FileSystemItemHealthStatus.CORRUPT - assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT - assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT - assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPT - - -def test_file_checkhash_request(populated_file_system): - """Test that an agent can request a file hash check.""" - fs, folder, file = populated_file_system - - fs.apply_request(request=["file", file.uuid, "checkhash"]) - - assert file.health_status == FileSystemItemHealthStatus.GOOD - file.sim_size = 0 - - fs.apply_request(request=["file", file.uuid, "checkhash"]) - - assert file.health_status == FileSystemItemHealthStatus.CORRUPT - - -def test_folder_checkhash_request(populated_file_system): - """Test that an agent can request a folder hash check.""" - fs, folder, file = populated_file_system - - fs.apply_request(request=["folder", folder.uuid, "checkhash"]) - - assert folder.health_status == FileSystemItemHealthStatus.GOOD - file.sim_size = 0 - - fs.apply_request(request=["folder", folder.uuid, "checkhash"]) - assert folder.health_status == FileSystemItemHealthStatus.CORRUPT - - -def test_file_repair_request(populated_file_system): - """Test that an agent can request a file repair.""" - fs, folder, file = populated_file_system - - file.corrupt() - assert file.health_status == FileSystemItemHealthStatus.CORRUPT - - fs.apply_request(request=["file", file.uuid, "repair"]) - assert file.health_status == FileSystemItemHealthStatus.GOOD - - -def test_folder_repair_request(populated_file_system): - """Test that an agent can request a folder repair.""" - fs, folder, file = populated_file_system - - folder.corrupt() - assert file.health_status == FileSystemItemHealthStatus.CORRUPT - assert folder.health_status == FileSystemItemHealthStatus.CORRUPT - - fs.apply_request(request=["folder", folder.uuid, "repair"]) - assert file.health_status == FileSystemItemHealthStatus.GOOD - assert folder.health_status == FileSystemItemHealthStatus.GOOD - - -def test_file_restore_request(populated_file_system): - pass - - -def test_folder_restore_request(populated_file_system): - pass - - -def test_file_corrupt_request(populated_file_system): - """Test that an agent can request a file corruption.""" - fs, folder, file = populated_file_system - fs.apply_request(request=["file", file.uuid, "corrupt"]) - assert file.health_status == FileSystemItemHealthStatus.CORRUPT - - -def test_folder_corrupt_request(populated_file_system): - """Test that an agent can request a folder corruption.""" - fs, folder, file = populated_file_system - fs.apply_request(request=["folder", folder.uuid, "corrupt"]) - assert file.health_status == FileSystemItemHealthStatus.CORRUPT - assert folder.health_status == FileSystemItemHealthStatus.CORRUPT - - def test_file_delete_request(populated_file_system): """Test that an agent can request a file deletion.""" fs, folder, file = populated_file_system - assert folder.get_file_by_id(file_uuid=file.uuid) is not None + assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None - fs.apply_request(request=["folder", folder.uuid, "delete", file.uuid]) - assert folder.get_file_by_id(file_uuid=file.uuid) is None + fs.apply_request(request=["delete", "file", folder.uuid, file.uuid]) + assert fs.get_file(folder_name=folder.name, file_name=file.name) is None def test_folder_delete_request(populated_file_system): @@ -149,6 +30,6 @@ def test_folder_delete_request(populated_file_system): assert folder.get_file_by_id(file_uuid=file.uuid) is not None assert fs.get_folder_by_id(folder_uuid=folder.uuid) is not None - fs.apply_request(request=["delete", folder.uuid]) + fs.apply_request(request=["delete", "folder", folder.uuid]) assert fs.get_folder_by_id(folder_uuid=folder.uuid) is None - assert folder.get_file_by_id(file_uuid=file.uuid) is None + assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid) is None diff --git a/tests/unit_tests/_primaite/_simulator/_file_system/test_folder.py b/tests/unit_tests/_primaite/_simulator/_file_system/test_folder.py index 9a12dbe9..56f2f6fb 100644 --- a/tests/unit_tests/_primaite/_simulator/_file_system/test_folder.py +++ b/tests/unit_tests/_primaite/_simulator/_file_system/test_folder.py @@ -19,6 +19,20 @@ def test_folder_quarantine_state(file_system): assert folder.quarantine_status() is False +def test_folder_get_file(file_system): + """Test that files can be retrieved from the folder.""" + folder: Folder = file_system.create_folder(folder_name="test_folder") + file1: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder") + file2: File = file_system.create_file(file_name="test_file2.txt", folder_name="test_folder") + + folder.remove_file(file2) + + assert folder.get_file_by_id(file_uuid=file1.uuid) is not None + assert folder.get_file_by_id(file_uuid=file2.uuid) is None + + assert folder.get_file_by_id(file_uuid=file2.uuid, include_deleted=True) is not None + + def test_folder_scan(file_system): """Test the ability to update visible status.""" folder: Folder = file_system.create_folder(folder_name="test_folder") @@ -107,12 +121,13 @@ def test_simulated_folder_check_hash(file_system): folder: Folder = file_system.create_folder(folder_name="test_folder") file_system.create_file(file_name="test_file.txt", folder_name="test_folder") - assert folder.check_hash() is True + folder.check_hash() + assert folder.health_status == FileSystemItemHealthStatus.GOOD # change simulated file size file = folder.get_file(file_name="test_file.txt") file.sim_size = 0 - assert folder.check_hash() is False + folder.check_hash() assert folder.health_status == FileSystemItemHealthStatus.CORRUPT @@ -120,8 +135,8 @@ def test_real_folder_check_hash(file_system): folder: Folder = file_system.create_folder(folder_name="test_folder") file_system.create_file(file_name="test_file.txt", folder_name="test_folder", real=True) - assert folder.check_hash() is True - + folder.check_hash() + assert folder.health_status == FileSystemItemHealthStatus.GOOD # change simulated file size file = folder.get_file(file_name="test_file.txt") @@ -129,5 +144,5 @@ def test_real_folder_check_hash(file_system): with open(file.sim_path, "a") as f: f.write("get hacked scrub lol xD\n") - assert folder.check_hash() is False + folder.check_hash() assert folder.health_status == FileSystemItemHealthStatus.CORRUPT diff --git a/tests/unit_tests/_primaite/_simulator/_file_system/test_folder_actions.py b/tests/unit_tests/_primaite/_simulator/_file_system/test_folder_actions.py new file mode 100644 index 00000000..05259320 --- /dev/null +++ b/tests/unit_tests/_primaite/_simulator/_file_system/test_folder_actions.py @@ -0,0 +1,103 @@ +from typing import Tuple + +import pytest + +from primaite.simulator.file_system.file_system import File, FileSystem, Folder +from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus + + +@pytest.fixture(scope="function") +def populated_file_system(file_system) -> Tuple[FileSystem, Folder, File]: + """Create a file system with a folder and file.""" + folder = file_system.create_folder(folder_name="test_folder") + file = file_system.create_file(folder_name="test_folder", file_name="test_file.txt") + + return file_system, folder, file + + +def test_folder_scan_request(populated_file_system): + """Test that an agent can request a folder scan.""" + fs, folder, file = populated_file_system + fs.create_file(file_name="test_file2.txt", folder_name="test_folder") + + file1: File = folder.get_file_by_id(file_uuid=list(folder.files)[1]) + file2: File = folder.get_file_by_id(file_uuid=list(folder.files)[0]) + + folder.corrupt() + assert folder.health_status == FileSystemItemHealthStatus.CORRUPT + assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD + assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD + assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD + + fs.apply_request(request=["folder", folder.uuid, "scan"]) + + folder.apply_timestep(timestep=0) + + assert folder.health_status == FileSystemItemHealthStatus.CORRUPT + assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD + assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD + assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD + + folder.apply_timestep(timestep=1) + + assert folder.health_status == FileSystemItemHealthStatus.CORRUPT + assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT + assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT + assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPT + + +def test_folder_checkhash_request(populated_file_system): + """Test that an agent can request a folder hash check.""" + fs, folder, file = populated_file_system + + fs.apply_request(request=["folder", folder.uuid, "checkhash"]) + + assert folder.health_status == FileSystemItemHealthStatus.GOOD + file.sim_size = 0 + + fs.apply_request(request=["folder", folder.uuid, "checkhash"]) + assert folder.health_status == FileSystemItemHealthStatus.CORRUPT + + +def test_folder_repair_request(populated_file_system): + """Test that an agent can request a folder repair.""" + fs, folder, file = populated_file_system + + folder.corrupt() + assert file.health_status == FileSystemItemHealthStatus.CORRUPT + assert folder.health_status == FileSystemItemHealthStatus.CORRUPT + + fs.apply_request(request=["folder", folder.uuid, "repair"]) + assert file.health_status == FileSystemItemHealthStatus.GOOD + assert folder.health_status == FileSystemItemHealthStatus.GOOD + + +def test_folder_restore_request(populated_file_system): + pass + + +def test_folder_corrupt_request(populated_file_system): + """Test that an agent can request a folder corruption.""" + fs, folder, file = populated_file_system + fs.apply_request(request=["folder", folder.uuid, "corrupt"]) + assert file.health_status == FileSystemItemHealthStatus.CORRUPT + assert folder.health_status == FileSystemItemHealthStatus.CORRUPT + + +def test_deleted_folder_and_its_files_cannot_be_interacted_with(populated_file_system): + """Test that actions cannot affect deleted folder and its child files.""" + fs, folder, file = populated_file_system + assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None + + fs.apply_request(request=["file", file.uuid, "corrupt"]) + assert fs.get_file(folder_name=folder.name, file_name=file.name).health_status == FileSystemItemHealthStatus.CORRUPT + + fs.apply_request(request=["delete", "folder", folder.uuid]) + assert fs.get_file(folder_name=folder.name, file_name=file.name) is None + + fs.apply_request(request=["file", file.uuid, "repair"]) + + deleted_folder = fs.deleted_folders.get(folder.uuid) + deleted_file = deleted_folder.deleted_files.get(file.uuid) + + assert deleted_file.health_status is not FileSystemItemHealthStatus.GOOD