#1962: split tests into managable files + implement deletion of folders and files + tests
This commit is contained in:
@@ -102,15 +102,22 @@ class File(FileSystemItemABC):
|
||||
|
||||
def scan(self) -> None:
|
||||
"""Updates the visible statuses of the file."""
|
||||
if self.deleted:
|
||||
self.sys_log.error(f"Unable to scan deleted file {self.folder_name}/{self.name}")
|
||||
return
|
||||
|
||||
path = self.folder.name + "/" + self.name
|
||||
self.sys_log.info(f"Scanning file {self.sim_path if self.sim_path else path}")
|
||||
self.visible_health_status = self.health_status
|
||||
|
||||
def reveal_to_red(self):
|
||||
def reveal_to_red(self) -> None:
|
||||
"""Reveals the folder/file to the red agent."""
|
||||
if self.deleted:
|
||||
self.sys_log.error(f"Unable to reveal deleted file {self.folder_name}/{self.name}")
|
||||
return
|
||||
self.revealed_to_red = True
|
||||
|
||||
def check_hash(self) -> bool:
|
||||
def check_hash(self) -> None:
|
||||
"""
|
||||
Check if the file has been changed.
|
||||
|
||||
@@ -118,6 +125,9 @@ class File(FileSystemItemABC):
|
||||
|
||||
Return False if corruption is detected, otherwise True
|
||||
"""
|
||||
if self.deleted:
|
||||
self.sys_log.error(f"Unable to check hash of deleted file {self.folder_name}/{self.name}")
|
||||
return
|
||||
current_hash = None
|
||||
|
||||
# if file is real, read the file contents
|
||||
@@ -139,13 +149,12 @@ class File(FileSystemItemABC):
|
||||
# if the previous hash and current hash do not match, mark file as corrupted
|
||||
if self.previous_hash is not current_hash:
|
||||
self.corrupt()
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def repair(self) -> bool:
|
||||
def repair(self) -> None:
|
||||
"""Repair a corrupted File by setting the status to FileSystemItemStatus.GOOD."""
|
||||
super().repair()
|
||||
if self.deleted:
|
||||
self.sys_log.error(f"Unable to repair deleted file {self.folder_name}/{self.name}")
|
||||
return
|
||||
|
||||
# set file status to good if corrupt
|
||||
if self.health_status == FileSystemItemHealthStatus.CORRUPT:
|
||||
@@ -153,33 +162,38 @@ class File(FileSystemItemABC):
|
||||
|
||||
path = self.folder.name + "/" + self.name
|
||||
self.sys_log.info(f"Repaired file {self.sim_path if self.sim_path else path}")
|
||||
return True
|
||||
|
||||
def restore(self) -> bool:
|
||||
def restore(self) -> None:
|
||||
"""Restore a corrupted File by setting the status to FileSystemItemStatus.GOOD."""
|
||||
super().restore()
|
||||
|
||||
restored = False
|
||||
|
||||
if self.health_status == FileSystemItemHealthStatus.CORRUPT:
|
||||
self.health_status = FileSystemItemHealthStatus.GOOD
|
||||
restored = True
|
||||
|
||||
path = self.folder.name + "/" + self.name
|
||||
self.sys_log.info(f"Restored file {self.sim_path if self.sim_path else path}")
|
||||
return restored
|
||||
|
||||
def corrupt(self) -> bool:
|
||||
def corrupt(self) -> None:
|
||||
"""Corrupt a File by setting the status to FileSystemItemStatus.CORRUPT."""
|
||||
super().corrupt()
|
||||
|
||||
corrupted = False
|
||||
if self.deleted:
|
||||
self.sys_log.error(f"Unable to corrupt deleted file {self.folder_name}/{self.name}")
|
||||
return
|
||||
|
||||
# set file status to good if corrupt
|
||||
if self.health_status == FileSystemItemHealthStatus.GOOD:
|
||||
self.health_status = FileSystemItemHealthStatus.CORRUPT
|
||||
corrupted = True
|
||||
|
||||
path = self.folder.name + "/" + self.name
|
||||
self.sys_log.info(f"Corrupted file {self.sim_path if self.sim_path else path}")
|
||||
return corrupted
|
||||
|
||||
def restore(self) -> bool:
|
||||
"""Determines if the file needs to be repaired or unmarked as deleted."""
|
||||
super().restore()
|
||||
return True
|
||||
|
||||
def delete(self):
|
||||
"""Marks the file as deleted."""
|
||||
if self.deleted:
|
||||
self.sys_log.error(f"Unable to delete an already deleted file {self.folder_name}/{self.name}")
|
||||
return
|
||||
|
||||
self.deleted = True
|
||||
self.sys_log.info(f"File deleted {self.folder_name}/{self.name}")
|
||||
|
||||
@@ -38,9 +38,21 @@ class FileSystem(SimComponent):
|
||||
def _init_request_manager(self) -> RequestManager:
|
||||
rm = super()._init_request_manager()
|
||||
|
||||
self._delete_manager = RequestManager()
|
||||
self._delete_manager.add_request(
|
||||
name="file",
|
||||
request_type=RequestType(
|
||||
func=lambda request, context: self.delete_file_by_id(folder_uuid=request[0], file_uuid=request[1])
|
||||
),
|
||||
)
|
||||
self._delete_manager.add_request(
|
||||
name="folder",
|
||||
request_type=RequestType(func=lambda request, context: self.delete_folder_by_id(folder_uuid=request[0])),
|
||||
)
|
||||
|
||||
rm.add_request(
|
||||
name="delete",
|
||||
request_type=RequestType(func=lambda request, context: self.delete_folder_by_id(folder_uuid=request[0])),
|
||||
request_type=RequestType(func=self._delete_manager),
|
||||
)
|
||||
|
||||
self._folder_request_manager = RequestManager()
|
||||
@@ -119,15 +131,18 @@ class FileSystem(SimComponent):
|
||||
return
|
||||
folder = self._folders_by_name.get(folder_name)
|
||||
if folder:
|
||||
# set folder to deleted state
|
||||
folder.delete()
|
||||
|
||||
# remove from folder list
|
||||
self.folders.pop(folder.uuid)
|
||||
self._folders_by_name.pop(folder.name)
|
||||
self.sys_log.info(f"Deleted folder /{folder.name} and its contents")
|
||||
|
||||
# add to deleted list
|
||||
folder.remove_all_files()
|
||||
|
||||
self.deleted_folders[folder.uuid] = folder
|
||||
self.sys_log.info(f"Deleted folder /{folder.name} and its contents")
|
||||
else:
|
||||
_LOGGER.debug(f"Cannot delete folder as it does not exist: {folder_name}")
|
||||
|
||||
@@ -216,7 +231,41 @@ class FileSystem(SimComponent):
|
||||
folder = self.get_folder(folder_name)
|
||||
if folder:
|
||||
return folder.get_file(file_name)
|
||||
self.sys_log.info(f"file not found /{folder_name}/{file_name}")
|
||||
self.sys_log.info(f"File not found /{folder_name}/{file_name}")
|
||||
|
||||
def get_file_by_id(
|
||||
self, file_uuid: str, folder_uuid: Optional[str] = None, include_deleted: Optional[bool] = False
|
||||
) -> Optional[File]:
|
||||
"""
|
||||
Retrieve a file by its uuid from a specific folder.
|
||||
|
||||
:param: file_uuid: The uuid of the folder where the file resides.
|
||||
:param: folder_uuid: The uuid of the file to be retrieved, including its extension.
|
||||
:param: include_deleted: If true, the deleted files will also be checked
|
||||
:return: An instance of File if it exists, otherwise `None`.
|
||||
"""
|
||||
folder = self.folders.get(folder_uuid)
|
||||
|
||||
if folder:
|
||||
return folder.get_file_by_id(file_uuid=file_uuid, include_deleted=include_deleted)
|
||||
|
||||
# iterate through every folder looking for file
|
||||
file = None
|
||||
|
||||
for folder_id in self.folders:
|
||||
folder = self.folders.get(folder_id)
|
||||
res = folder.get_file_by_id(file_uuid=file_uuid, include_deleted=True)
|
||||
if res:
|
||||
file = res
|
||||
|
||||
if include_deleted:
|
||||
for folder_id in self.deleted_folders:
|
||||
folder = self.deleted_folders.get(folder_id)
|
||||
res = folder.get_file_by_id(file_uuid=file_uuid, include_deleted=True)
|
||||
if res:
|
||||
file = res
|
||||
|
||||
return file
|
||||
|
||||
def delete_file(self, folder_name: str, file_name: str):
|
||||
"""
|
||||
@@ -231,6 +280,19 @@ class FileSystem(SimComponent):
|
||||
if file:
|
||||
folder.remove_file(file)
|
||||
|
||||
def delete_file_by_id(self, folder_uuid: str, file_uuid: str):
|
||||
"""
|
||||
Deletes a file via its uuid.
|
||||
|
||||
:param: folder_uuid: UUID of the folder the file belongs to
|
||||
:param: file_uuid: UUID of the file to delete
|
||||
"""
|
||||
folder = self.get_folder_by_id(folder_uuid=folder_uuid)
|
||||
|
||||
if folder:
|
||||
file = folder.get_file_by_id(file_uuid=file_uuid)
|
||||
self.delete_file(folder_name=folder.name, file_name=file.name)
|
||||
|
||||
def move_file(self, src_folder_name: str, src_file_name: str, dst_folder_name: str):
|
||||
"""
|
||||
Move a file from one folder to another.
|
||||
@@ -277,7 +339,7 @@ class FileSystem(SimComponent):
|
||||
folder_name=dst_folder.name,
|
||||
**file.model_dump(exclude={"uuid", "folder_id", "folder_name", "sim_path"}),
|
||||
)
|
||||
dst_folder.add_file(file_copy)
|
||||
dst_folder.add_file(file_copy, force=True)
|
||||
|
||||
if file.real:
|
||||
file_copy.sim_path.parent.mkdir(exist_ok=True)
|
||||
@@ -303,6 +365,10 @@ class FileSystem(SimComponent):
|
||||
for folder_id in self.folders:
|
||||
self.folders[folder_id].apply_timestep(timestep=timestep)
|
||||
|
||||
###############################################################
|
||||
# Agent actions
|
||||
###############################################################
|
||||
|
||||
def scan(self, instant_scan: bool = False):
|
||||
"""
|
||||
Scan all the folders (and child files) in the file system.
|
||||
|
||||
@@ -82,6 +82,9 @@ class FileSystemItemABC(SimComponent):
|
||||
sys_log: SysLog
|
||||
"Used for creating system logs."
|
||||
|
||||
deleted: bool = False
|
||||
"If true, the FileSystemItem was deleted."
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Produce a dictionary describing the current state of this object.
|
||||
@@ -121,7 +124,17 @@ class FileSystemItemABC(SimComponent):
|
||||
return convert_size(self.size)
|
||||
|
||||
@abstractmethod
|
||||
def check_hash(self) -> bool:
|
||||
def scan(self) -> None:
|
||||
"""Scan the folder/file - updates the visible_health_status."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def reveal_to_red(self) -> None:
|
||||
"""Reveal the folder/file to the red agent."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def check_hash(self) -> None:
|
||||
"""
|
||||
Checks the has of the file to detect any changes.
|
||||
|
||||
@@ -132,7 +145,7 @@ class FileSystemItemABC(SimComponent):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def repair(self) -> bool:
|
||||
def repair(self) -> None:
|
||||
"""
|
||||
Repair the FileSystemItem.
|
||||
|
||||
@@ -141,7 +154,7 @@ class FileSystemItemABC(SimComponent):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def corrupt(self) -> bool:
|
||||
def corrupt(self) -> None:
|
||||
"""
|
||||
Corrupt the FileSystemItem.
|
||||
|
||||
@@ -150,6 +163,11 @@ class FileSystemItemABC(SimComponent):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def restore(self) -> bool:
|
||||
def restore(self) -> None:
|
||||
"""Restore the file/folder to the state before it got ruined."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def delete(self) -> None:
|
||||
"""Mark the file/folder as deleted."""
|
||||
self.deleted = True
|
||||
|
||||
@@ -131,34 +131,45 @@ class Folder(FileSystemItemABC):
|
||||
# TODO: Increment read count?
|
||||
return self._files_by_name.get(file_name)
|
||||
|
||||
def get_file_by_id(self, file_uuid: str) -> File:
|
||||
def get_file_by_id(self, file_uuid: str, include_deleted: Optional[bool] = False) -> File:
|
||||
"""
|
||||
Get a file by its uuid.
|
||||
|
||||
:param: file_uuid: The file uuid.
|
||||
:param: include_deleted: If true, the deleted files will also be checked
|
||||
:return: The matching File.
|
||||
"""
|
||||
if include_deleted:
|
||||
deleted_file = self.deleted_files.get(file_uuid)
|
||||
|
||||
if deleted_file:
|
||||
return deleted_file
|
||||
|
||||
return self.files.get(file_uuid)
|
||||
|
||||
def add_file(self, file: File):
|
||||
def add_file(self, file: File, force: Optional[bool] = False):
|
||||
"""
|
||||
Adds a file to the folder.
|
||||
|
||||
:param File file: The File object to be added to the folder.
|
||||
:param: file: The File object to be added to the folder.
|
||||
:param: force: Overwrite file - do not check if uuid or name already exists in folder. Default False.
|
||||
:raises Exception: If the provided `file` parameter is None or not an instance of the
|
||||
`File` class.
|
||||
"""
|
||||
if file is None or not isinstance(file, File):
|
||||
raise Exception(f"Invalid file: {file}")
|
||||
|
||||
# check if file with id already exists in folder
|
||||
if file.uuid in self.files:
|
||||
_LOGGER.debug(f"File with id {file.uuid} already exists in folder")
|
||||
else:
|
||||
# add to list
|
||||
self.files[file.uuid] = file
|
||||
self._files_by_name[file.name] = file
|
||||
file.folder = self
|
||||
# check if file with id or name already exists in folder
|
||||
if (force is not True) and file.name in self._files_by_name:
|
||||
raise Exception(f"File with name {file.name} already exists in folder")
|
||||
|
||||
if (force is not True) and file.uuid in self.files:
|
||||
raise Exception(f"File with uuid {file.uuid} already exists in folder")
|
||||
|
||||
# add to list
|
||||
self.files[file.uuid] = file
|
||||
self._files_by_name[file.name] = file
|
||||
file.folder = self
|
||||
|
||||
def remove_file(self, file: Optional[File]):
|
||||
"""
|
||||
@@ -175,6 +186,7 @@ class Folder(FileSystemItemABC):
|
||||
self.files.pop(file.uuid)
|
||||
self._files_by_name.pop(file.name)
|
||||
self.deleted_files[file.uuid] = file
|
||||
file.delete()
|
||||
self.sys_log.info(f"Removed file {file.name} (id: {file.uuid})")
|
||||
else:
|
||||
_LOGGER.debug(f"File with UUID {file.uuid} was not found.")
|
||||
@@ -191,7 +203,9 @@ class Folder(FileSystemItemABC):
|
||||
def remove_all_files(self):
|
||||
"""Removes all the files in the folder."""
|
||||
for file_id in self.files:
|
||||
self.deleted_files[file_id] = self.files[file_id]
|
||||
file = self.files.get(file_id)
|
||||
file.delete()
|
||||
self.deleted_files[file_id] = file
|
||||
|
||||
self.files = {}
|
||||
self._files_by_name = {}
|
||||
@@ -224,6 +238,10 @@ class Folder(FileSystemItemABC):
|
||||
|
||||
:param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False.
|
||||
"""
|
||||
if self.deleted:
|
||||
self.sys_log.error(f"Unable to scan deleted folder {self.name}")
|
||||
return
|
||||
|
||||
if instant_scan:
|
||||
for file_id in self.files:
|
||||
file = self.get_file_by_id(file_uuid=file_id)
|
||||
@@ -246,6 +264,10 @@ class Folder(FileSystemItemABC):
|
||||
|
||||
:param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False.
|
||||
"""
|
||||
if self.deleted:
|
||||
self.sys_log.error(f"Unable to reveal deleted folder {self.name}")
|
||||
return
|
||||
|
||||
if instant_scan:
|
||||
self.revealed_to_red = True
|
||||
for file_id in self.files:
|
||||
@@ -261,7 +283,7 @@ class Folder(FileSystemItemABC):
|
||||
# scan already in progress
|
||||
self.sys_log.info(f"Red Agent Scan is already in progress {self.name} (id: {self.uuid})")
|
||||
|
||||
def check_hash(self) -> bool:
|
||||
def check_hash(self) -> None:
|
||||
"""
|
||||
Runs a :func:`check_hash` on all files in the folder.
|
||||
|
||||
@@ -272,14 +294,18 @@ class Folder(FileSystemItemABC):
|
||||
|
||||
Return False if corruption is detected, otherwise True
|
||||
"""
|
||||
super().check_hash()
|
||||
if self.deleted:
|
||||
self.sys_log.error(f"Unable to check hash of deleted folder {self.name}")
|
||||
return
|
||||
|
||||
# iterate through the files and run a check hash
|
||||
no_corrupted_files = True
|
||||
|
||||
for file_id in self.files:
|
||||
file = self.get_file_by_id(file_uuid=file_id)
|
||||
no_corrupted_files = file.check_hash()
|
||||
file.check_hash()
|
||||
if file.health_status == FileSystemItemHealthStatus.CORRUPT:
|
||||
no_corrupted_files = False
|
||||
|
||||
# if one file in the folder is corrupted, set the folder status to corrupted
|
||||
if not no_corrupted_files:
|
||||
@@ -287,61 +313,53 @@ class Folder(FileSystemItemABC):
|
||||
|
||||
self.sys_log.info(f"Checking hash of folder {self.name} (id: {self.uuid})")
|
||||
|
||||
return no_corrupted_files
|
||||
|
||||
def repair(self) -> bool:
|
||||
def repair(self) -> None:
|
||||
"""Repair a corrupted Folder by setting the folder and containing files status to FileSystemItemStatus.GOOD."""
|
||||
super().repair()
|
||||
|
||||
repaired = False
|
||||
if self.deleted:
|
||||
self.sys_log.error(f"Unable to repair deleted folder {self.name}")
|
||||
return
|
||||
|
||||
# iterate through the files in the folder
|
||||
for file_id in self.files:
|
||||
file = self.get_file_by_id(file_uuid=file_id)
|
||||
repaired = file.repair()
|
||||
file.repair()
|
||||
|
||||
# set file status to good if corrupt
|
||||
if self.health_status == FileSystemItemHealthStatus.CORRUPT:
|
||||
self.health_status = FileSystemItemHealthStatus.GOOD
|
||||
repaired = True
|
||||
|
||||
self.health_status = FileSystemItemHealthStatus.GOOD
|
||||
|
||||
self.sys_log.info(f"Repaired folder {self.name} (id: {self.uuid})")
|
||||
return repaired
|
||||
|
||||
def restore(self) -> bool:
|
||||
"""Restore a File by setting the folder and containing files status to FileSystemItemStatus.GOOD."""
|
||||
super().restore()
|
||||
def restore(self) -> None:
|
||||
"""
|
||||
If a Folder is corrupted, run a repair on the folder and its child files.
|
||||
|
||||
restored = False
|
||||
If the folder is deleted, restore the folder by setting deleted status to False.
|
||||
"""
|
||||
pass
|
||||
|
||||
# iterate through the files in the folder
|
||||
for file_id in self.files:
|
||||
file = self.get_file_by_id(file_uuid=file_id)
|
||||
restored = file.restore()
|
||||
|
||||
# set file status to corrupt if good
|
||||
if self.health_status == FileSystemItemHealthStatus.CORRUPT:
|
||||
self.health_status = FileSystemItemHealthStatus.GOOD
|
||||
restored = True
|
||||
|
||||
self.sys_log.info(f"Restored folder {self.name} (id: {self.uuid})")
|
||||
return restored
|
||||
|
||||
def corrupt(self) -> bool:
|
||||
def corrupt(self) -> None:
|
||||
"""Corrupt a File by setting the folder and containing files status to FileSystemItemStatus.CORRUPT."""
|
||||
super().corrupt()
|
||||
|
||||
corrupted = False
|
||||
if self.deleted:
|
||||
self.sys_log.error(f"Unable to corrupt deleted folder {self.name}")
|
||||
return
|
||||
|
||||
# iterate through the files in the folder
|
||||
for file_id in self.files:
|
||||
file = self.get_file_by_id(file_uuid=file_id)
|
||||
corrupted = file.corrupt()
|
||||
file.corrupt()
|
||||
|
||||
# set file status to corrupt if good
|
||||
if self.health_status == FileSystemItemHealthStatus.GOOD:
|
||||
self.health_status = FileSystemItemHealthStatus.CORRUPT
|
||||
corrupted = True
|
||||
# set file status to corrupt
|
||||
self.health_status = FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
self.sys_log.info(f"Corrupted folder {self.name} (id: {self.uuid})")
|
||||
return corrupted
|
||||
|
||||
def delete(self):
|
||||
"""Marks the file as deleted. Prevents agent actions from occuring."""
|
||||
if self.deleted:
|
||||
self.sys_log.error(f"Unable to delete an already deleted folder {self.name}")
|
||||
return
|
||||
|
||||
self.deleted = True
|
||||
|
||||
@@ -129,7 +129,7 @@ class DatabaseService(Service):
|
||||
self._conn.close()
|
||||
# replace db file
|
||||
self.file_system.delete_file(folder_name=self.folder.name, file_name="downloads.db")
|
||||
self.file_system.move_file(
|
||||
self.file_system.copy_file(
|
||||
src_folder_name="downloads", src_file_name="database.db", dst_folder_name=self.folder.name
|
||||
)
|
||||
self._db_file = self.file_system.get_file(folder_name=self.folder.name, file_name="database.db")
|
||||
|
||||
@@ -44,24 +44,24 @@ def test_file_reveal_to_red_scan(file_system):
|
||||
def test_simulated_file_check_hash(file_system):
|
||||
file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
|
||||
|
||||
assert file.check_hash() is True
|
||||
|
||||
file.check_hash()
|
||||
assert file.health_status == FileSystemItemHealthStatus.GOOD
|
||||
# change simulated file size
|
||||
file.sim_size = 0
|
||||
assert file.check_hash() is False
|
||||
file.check_hash()
|
||||
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
|
||||
def test_real_file_check_hash(file_system):
|
||||
file: File = file_system.create_file(file_name="test_file.txt", real=True)
|
||||
|
||||
assert file.check_hash() is True
|
||||
|
||||
file.check_hash()
|
||||
assert file.health_status == FileSystemItemHealthStatus.GOOD
|
||||
# change file content
|
||||
with open(file.sim_path, "a") as f:
|
||||
f.write("get hacked scrub lol xD\n")
|
||||
|
||||
assert file.check_hash() is False
|
||||
file.check_hash()
|
||||
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,89 @@
|
||||
from typing import Tuple
|
||||
|
||||
import pytest
|
||||
|
||||
from primaite.simulator.file_system.file_system import File, FileSystem, Folder
|
||||
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def populated_file_system(file_system) -> Tuple[FileSystem, Folder, File]:
|
||||
"""Create a file system with a folder and file."""
|
||||
folder = file_system.create_folder(folder_name="test_folder")
|
||||
file = file_system.create_file(folder_name="test_folder", file_name="test_file.txt")
|
||||
|
||||
return file_system, folder, file
|
||||
|
||||
|
||||
def test_file_scan_request(populated_file_system):
|
||||
"""Test that an agent can request a file scan."""
|
||||
fs, folder, file = populated_file_system
|
||||
|
||||
file.corrupt()
|
||||
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
|
||||
fs.apply_request(request=["file", file.uuid, "scan"])
|
||||
|
||||
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
|
||||
def test_file_checkhash_request(populated_file_system):
|
||||
"""Test that an agent can request a file hash check."""
|
||||
fs, folder, file = populated_file_system
|
||||
|
||||
fs.apply_request(request=["file", file.uuid, "checkhash"])
|
||||
|
||||
assert file.health_status == FileSystemItemHealthStatus.GOOD
|
||||
file.sim_size = 0
|
||||
|
||||
fs.apply_request(request=["file", file.uuid, "checkhash"])
|
||||
|
||||
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
|
||||
def test_file_repair_request(populated_file_system):
|
||||
"""Test that an agent can request a file repair."""
|
||||
fs, folder, file = populated_file_system
|
||||
|
||||
file.corrupt()
|
||||
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
fs.apply_request(request=["file", file.uuid, "repair"])
|
||||
assert file.health_status == FileSystemItemHealthStatus.GOOD
|
||||
|
||||
|
||||
def test_file_restore_request(populated_file_system):
|
||||
pass
|
||||
|
||||
|
||||
def test_file_corrupt_request(populated_file_system):
|
||||
"""Test that an agent can request a file corruption."""
|
||||
fs, folder, file = populated_file_system
|
||||
fs.apply_request(request=["file", file.uuid, "corrupt"])
|
||||
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
|
||||
def test_deleted_file_cannot_be_interacted_with(populated_file_system):
|
||||
"""Test that actions cannot affect deleted files."""
|
||||
fs, folder, file = populated_file_system
|
||||
assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None
|
||||
|
||||
fs.apply_request(request=["file", file.uuid, "corrupt"])
|
||||
assert fs.get_file(folder_name=folder.name, file_name=file.name).health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert (
|
||||
fs.get_file(folder_name=folder.name, file_name=file.name).visible_health_status
|
||||
== FileSystemItemHealthStatus.GOOD
|
||||
)
|
||||
|
||||
fs.apply_request(request=["delete", "file", folder.uuid, file.uuid])
|
||||
assert fs.get_file(folder_name=folder.name, file_name=file.name) is None
|
||||
|
||||
fs.apply_request(request=["file", file.uuid, "repair"])
|
||||
fs.apply_request(request=["file", file.uuid, "scan"])
|
||||
|
||||
file = folder.deleted_files.get(file.uuid)
|
||||
|
||||
assert file.health_status is not FileSystemItemHealthStatus.GOOD
|
||||
assert file.visible_health_status is not FileSystemItemHealthStatus.CORRUPT
|
||||
@@ -1,7 +1,9 @@
|
||||
import pytest
|
||||
|
||||
from primaite.simulator.file_system.file import File
|
||||
from primaite.simulator.file_system.file_system import FileSystem
|
||||
from primaite.simulator.file_system.file_type import FileType
|
||||
from primaite.simulator.file_system.folder import Folder
|
||||
|
||||
|
||||
def test_create_folder_and_file(file_system):
|
||||
@@ -65,6 +67,34 @@ def test_delete_folder(file_system):
|
||||
assert len(file_system.deleted_folders) == 1
|
||||
|
||||
|
||||
def test_create_duplicate_folder(file_system):
|
||||
"""Test that creating a duplicate folder throws exception."""
|
||||
assert len(file_system.folders) == 1
|
||||
file_system.create_folder(folder_name="test_folder")
|
||||
|
||||
assert len(file_system.folders) is 2
|
||||
with pytest.raises(Exception):
|
||||
file_system.create_folder(folder_name="test_folder")
|
||||
|
||||
assert len(file_system.folders) is 2
|
||||
|
||||
|
||||
def test_create_duplicate_file(file_system):
|
||||
"""Test that creating a duplicate file throws exception."""
|
||||
assert len(file_system.folders) == 1
|
||||
file_system.create_folder(folder_name="test_folder")
|
||||
|
||||
assert len(file_system.folders) is 2
|
||||
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
|
||||
|
||||
assert len(file_system.get_folder("test_folder").files) == 1
|
||||
|
||||
with pytest.raises(Exception):
|
||||
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
|
||||
|
||||
assert len(file_system.get_folder("test_folder").files) == 1
|
||||
|
||||
|
||||
def test_deleting_a_non_existent_folder(file_system):
|
||||
file_system.create_folder(folder_name="test_folder")
|
||||
assert len(file_system.folders) == 2
|
||||
@@ -116,6 +146,23 @@ def test_copy_file(file_system):
|
||||
assert file_system.get_file("dst_folder", "test_file.txt").uuid != original_uuid
|
||||
|
||||
|
||||
def test_get_file(file_system):
|
||||
"""Test that files can be retrieved."""
|
||||
folder: Folder = file_system.create_folder(folder_name="test_folder")
|
||||
file1: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
|
||||
file2: File = file_system.create_file(file_name="test_file2.txt", folder_name="test_folder")
|
||||
|
||||
folder.remove_file(file2)
|
||||
|
||||
assert file_system.get_file_by_id(file_uuid=file1.uuid, folder_uuid=folder.uuid) is not None
|
||||
assert file_system.get_file_by_id(file_uuid=file2.uuid, folder_uuid=folder.uuid) is None
|
||||
assert file_system.get_file_by_id(file_uuid=file2.uuid, folder_uuid=folder.uuid, include_deleted=True) is not None
|
||||
assert file_system.get_file_by_id(file_uuid=file2.uuid, include_deleted=True) is not None
|
||||
|
||||
file_system.delete_folder(folder_name="test_folder")
|
||||
assert file_system.get_file_by_id(file_uuid=file2.uuid, include_deleted=True) is not None
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="Skipping until we tackle serialisation")
|
||||
def test_serialisation(file_system):
|
||||
"""Test to check that the object serialisation works correctly."""
|
||||
|
||||
@@ -8,139 +8,20 @@ from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHe
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def populated_file_system(file_system) -> Tuple[FileSystem, Folder, File]:
|
||||
"""Test that an agent can request a file scan."""
|
||||
"""Create a file system with a folder and file."""
|
||||
folder = file_system.create_folder(folder_name="test_folder")
|
||||
file = file_system.create_file(folder_name="test_folder", file_name="test_file.txt")
|
||||
|
||||
return file_system, folder, file
|
||||
|
||||
|
||||
def test_file_scan_request(populated_file_system):
|
||||
"""Test that an agent can request a file scan."""
|
||||
fs, folder, file = populated_file_system
|
||||
|
||||
file.corrupt()
|
||||
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
|
||||
fs.apply_request(request=["file", file.uuid, "scan"])
|
||||
|
||||
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
|
||||
def test_folder_scan_request(populated_file_system):
|
||||
"""Test that an agent can request a folder scan."""
|
||||
fs, folder, file = populated_file_system
|
||||
fs.create_file(file_name="test_file2.txt", folder_name="test_folder")
|
||||
|
||||
file1: File = folder.get_file_by_id(file_uuid=list(folder.files)[1])
|
||||
file2: File = folder.get_file_by_id(file_uuid=list(folder.files)[0])
|
||||
|
||||
folder.corrupt()
|
||||
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
|
||||
fs.apply_request(request=["folder", folder.uuid, "scan"])
|
||||
|
||||
folder.apply_timestep(timestep=0)
|
||||
|
||||
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
|
||||
folder.apply_timestep(timestep=1)
|
||||
|
||||
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
|
||||
def test_file_checkhash_request(populated_file_system):
|
||||
"""Test that an agent can request a file hash check."""
|
||||
fs, folder, file = populated_file_system
|
||||
|
||||
fs.apply_request(request=["file", file.uuid, "checkhash"])
|
||||
|
||||
assert file.health_status == FileSystemItemHealthStatus.GOOD
|
||||
file.sim_size = 0
|
||||
|
||||
fs.apply_request(request=["file", file.uuid, "checkhash"])
|
||||
|
||||
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
|
||||
def test_folder_checkhash_request(populated_file_system):
|
||||
"""Test that an agent can request a folder hash check."""
|
||||
fs, folder, file = populated_file_system
|
||||
|
||||
fs.apply_request(request=["folder", folder.uuid, "checkhash"])
|
||||
|
||||
assert folder.health_status == FileSystemItemHealthStatus.GOOD
|
||||
file.sim_size = 0
|
||||
|
||||
fs.apply_request(request=["folder", folder.uuid, "checkhash"])
|
||||
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
|
||||
def test_file_repair_request(populated_file_system):
|
||||
"""Test that an agent can request a file repair."""
|
||||
fs, folder, file = populated_file_system
|
||||
|
||||
file.corrupt()
|
||||
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
fs.apply_request(request=["file", file.uuid, "repair"])
|
||||
assert file.health_status == FileSystemItemHealthStatus.GOOD
|
||||
|
||||
|
||||
def test_folder_repair_request(populated_file_system):
|
||||
"""Test that an agent can request a folder repair."""
|
||||
fs, folder, file = populated_file_system
|
||||
|
||||
folder.corrupt()
|
||||
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
fs.apply_request(request=["folder", folder.uuid, "repair"])
|
||||
assert file.health_status == FileSystemItemHealthStatus.GOOD
|
||||
assert folder.health_status == FileSystemItemHealthStatus.GOOD
|
||||
|
||||
|
||||
def test_file_restore_request(populated_file_system):
|
||||
pass
|
||||
|
||||
|
||||
def test_folder_restore_request(populated_file_system):
|
||||
pass
|
||||
|
||||
|
||||
def test_file_corrupt_request(populated_file_system):
|
||||
"""Test that an agent can request a file corruption."""
|
||||
fs, folder, file = populated_file_system
|
||||
fs.apply_request(request=["file", file.uuid, "corrupt"])
|
||||
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
|
||||
def test_folder_corrupt_request(populated_file_system):
|
||||
"""Test that an agent can request a folder corruption."""
|
||||
fs, folder, file = populated_file_system
|
||||
fs.apply_request(request=["folder", folder.uuid, "corrupt"])
|
||||
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
|
||||
def test_file_delete_request(populated_file_system):
|
||||
"""Test that an agent can request a file deletion."""
|
||||
fs, folder, file = populated_file_system
|
||||
assert folder.get_file_by_id(file_uuid=file.uuid) is not None
|
||||
assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None
|
||||
|
||||
fs.apply_request(request=["folder", folder.uuid, "delete", file.uuid])
|
||||
assert folder.get_file_by_id(file_uuid=file.uuid) is None
|
||||
fs.apply_request(request=["delete", "file", folder.uuid, file.uuid])
|
||||
assert fs.get_file(folder_name=folder.name, file_name=file.name) is None
|
||||
|
||||
|
||||
def test_folder_delete_request(populated_file_system):
|
||||
@@ -149,6 +30,6 @@ def test_folder_delete_request(populated_file_system):
|
||||
assert folder.get_file_by_id(file_uuid=file.uuid) is not None
|
||||
assert fs.get_folder_by_id(folder_uuid=folder.uuid) is not None
|
||||
|
||||
fs.apply_request(request=["delete", folder.uuid])
|
||||
fs.apply_request(request=["delete", "folder", folder.uuid])
|
||||
assert fs.get_folder_by_id(folder_uuid=folder.uuid) is None
|
||||
assert folder.get_file_by_id(file_uuid=file.uuid) is None
|
||||
assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid) is None
|
||||
|
||||
@@ -19,6 +19,20 @@ def test_folder_quarantine_state(file_system):
|
||||
assert folder.quarantine_status() is False
|
||||
|
||||
|
||||
def test_folder_get_file(file_system):
|
||||
"""Test that files can be retrieved from the folder."""
|
||||
folder: Folder = file_system.create_folder(folder_name="test_folder")
|
||||
file1: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
|
||||
file2: File = file_system.create_file(file_name="test_file2.txt", folder_name="test_folder")
|
||||
|
||||
folder.remove_file(file2)
|
||||
|
||||
assert folder.get_file_by_id(file_uuid=file1.uuid) is not None
|
||||
assert folder.get_file_by_id(file_uuid=file2.uuid) is None
|
||||
|
||||
assert folder.get_file_by_id(file_uuid=file2.uuid, include_deleted=True) is not None
|
||||
|
||||
|
||||
def test_folder_scan(file_system):
|
||||
"""Test the ability to update visible status."""
|
||||
folder: Folder = file_system.create_folder(folder_name="test_folder")
|
||||
@@ -107,12 +121,13 @@ def test_simulated_folder_check_hash(file_system):
|
||||
folder: Folder = file_system.create_folder(folder_name="test_folder")
|
||||
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
|
||||
|
||||
assert folder.check_hash() is True
|
||||
folder.check_hash()
|
||||
assert folder.health_status == FileSystemItemHealthStatus.GOOD
|
||||
|
||||
# change simulated file size
|
||||
file = folder.get_file(file_name="test_file.txt")
|
||||
file.sim_size = 0
|
||||
assert folder.check_hash() is False
|
||||
folder.check_hash()
|
||||
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
|
||||
@@ -120,8 +135,8 @@ def test_real_folder_check_hash(file_system):
|
||||
folder: Folder = file_system.create_folder(folder_name="test_folder")
|
||||
file_system.create_file(file_name="test_file.txt", folder_name="test_folder", real=True)
|
||||
|
||||
assert folder.check_hash() is True
|
||||
|
||||
folder.check_hash()
|
||||
assert folder.health_status == FileSystemItemHealthStatus.GOOD
|
||||
# change simulated file size
|
||||
file = folder.get_file(file_name="test_file.txt")
|
||||
|
||||
@@ -129,5 +144,5 @@ def test_real_folder_check_hash(file_system):
|
||||
with open(file.sim_path, "a") as f:
|
||||
f.write("get hacked scrub lol xD\n")
|
||||
|
||||
assert folder.check_hash() is False
|
||||
folder.check_hash()
|
||||
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
@@ -0,0 +1,103 @@
|
||||
from typing import Tuple
|
||||
|
||||
import pytest
|
||||
|
||||
from primaite.simulator.file_system.file_system import File, FileSystem, Folder
|
||||
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def populated_file_system(file_system) -> Tuple[FileSystem, Folder, File]:
|
||||
"""Create a file system with a folder and file."""
|
||||
folder = file_system.create_folder(folder_name="test_folder")
|
||||
file = file_system.create_file(folder_name="test_folder", file_name="test_file.txt")
|
||||
|
||||
return file_system, folder, file
|
||||
|
||||
|
||||
def test_folder_scan_request(populated_file_system):
|
||||
"""Test that an agent can request a folder scan."""
|
||||
fs, folder, file = populated_file_system
|
||||
fs.create_file(file_name="test_file2.txt", folder_name="test_folder")
|
||||
|
||||
file1: File = folder.get_file_by_id(file_uuid=list(folder.files)[1])
|
||||
file2: File = folder.get_file_by_id(file_uuid=list(folder.files)[0])
|
||||
|
||||
folder.corrupt()
|
||||
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
|
||||
fs.apply_request(request=["folder", folder.uuid, "scan"])
|
||||
|
||||
folder.apply_timestep(timestep=0)
|
||||
|
||||
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
|
||||
folder.apply_timestep(timestep=1)
|
||||
|
||||
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
|
||||
def test_folder_checkhash_request(populated_file_system):
|
||||
"""Test that an agent can request a folder hash check."""
|
||||
fs, folder, file = populated_file_system
|
||||
|
||||
fs.apply_request(request=["folder", folder.uuid, "checkhash"])
|
||||
|
||||
assert folder.health_status == FileSystemItemHealthStatus.GOOD
|
||||
file.sim_size = 0
|
||||
|
||||
fs.apply_request(request=["folder", folder.uuid, "checkhash"])
|
||||
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
|
||||
def test_folder_repair_request(populated_file_system):
|
||||
"""Test that an agent can request a folder repair."""
|
||||
fs, folder, file = populated_file_system
|
||||
|
||||
folder.corrupt()
|
||||
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
fs.apply_request(request=["folder", folder.uuid, "repair"])
|
||||
assert file.health_status == FileSystemItemHealthStatus.GOOD
|
||||
assert folder.health_status == FileSystemItemHealthStatus.GOOD
|
||||
|
||||
|
||||
def test_folder_restore_request(populated_file_system):
|
||||
pass
|
||||
|
||||
|
||||
def test_folder_corrupt_request(populated_file_system):
|
||||
"""Test that an agent can request a folder corruption."""
|
||||
fs, folder, file = populated_file_system
|
||||
fs.apply_request(request=["folder", folder.uuid, "corrupt"])
|
||||
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
|
||||
def test_deleted_folder_and_its_files_cannot_be_interacted_with(populated_file_system):
|
||||
"""Test that actions cannot affect deleted folder and its child files."""
|
||||
fs, folder, file = populated_file_system
|
||||
assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None
|
||||
|
||||
fs.apply_request(request=["file", file.uuid, "corrupt"])
|
||||
assert fs.get_file(folder_name=folder.name, file_name=file.name).health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
fs.apply_request(request=["delete", "folder", folder.uuid])
|
||||
assert fs.get_file(folder_name=folder.name, file_name=file.name) is None
|
||||
|
||||
fs.apply_request(request=["file", file.uuid, "repair"])
|
||||
|
||||
deleted_folder = fs.deleted_folders.get(folder.uuid)
|
||||
deleted_file = deleted_folder.deleted_files.get(file.uuid)
|
||||
|
||||
assert deleted_file.health_status is not FileSystemItemHealthStatus.GOOD
|
||||
Reference in New Issue
Block a user