Merged PR 206: 1962-Folder and File restore

## Summary
- Implement the folder/file restore
- Split the FileSystem and its classes into their own files so that it is easier to maintain.
- Reimplement how deleted folders and files are stored in a deleted list, so that it is possible to restore later

## Test process
See https://dev.azure.com/ma-dev-uk/PrimAITE/_git/PrimAITE/pullrequest/206?_a=files&path=/tests/unit_tests/_primaite/_simulator/_file_system
- Unit tests written to make sure methods work as intended
- Integration tests with the apply_request method which tests that the agents can send the request to the simulation to affect it

## Checklist
- [X] PR is linked to a **work item**
- [X] **acceptance criteria** of linked ticket are met
- [X] performed **self-review** of the code
- [X] written **tests** for any new functionality added with this PR
- [ ] updated the **documentation** if this PR changes or adds functionality
- [ ] written/updated **design docs** if this PR implements new functionality
- [ ] updated the **change log**
- [X] ran **pre-commit** checks for code style
- [ ] attended to any **TO-DOs** left in the code

Related work items: #1962
This commit is contained in:
Czar Echavez
2023-11-09 11:55:53 +00:00
13 changed files with 1600 additions and 991 deletions

View File

@@ -27,7 +27,18 @@ jobs:
- script: |
pip install -e .[dev]
displayName: 'Install Yawning-Titan for docs autosummary'
displayName: 'Install PrimAITE for docs autosummary'
- script: |
GATE_WHEEL=$(ls ./GATE/arcd_gate*.whl)
python -m pip install $GATE_WHEEL[dev]
displayName: 'Install GATE'
condition: or(eq( variables['Agent.OS'], 'Linux' ), eq( variables['Agent.OS'], 'Darwin' ))
- script: |
forfiles /p GATE\ /m *.whl /c "cmd /c python -m pip install @file[dev]"
displayName: 'Install GATE'
condition: eq( variables['Agent.OS'], 'Windows_NT' )
- script: |
primaite setup

View File

@@ -0,0 +1,198 @@
from __future__ import annotations
import hashlib
import json
import os.path
from pathlib import Path
from typing import Dict, Optional
from primaite import getLogger
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemABC, FileSystemItemHealthStatus
from primaite.simulator.file_system.file_type import FileType, get_file_type_from_extension
_LOGGER = getLogger(__name__)
class File(FileSystemItemABC):
"""
Class representing a file in the simulation.
:ivar Folder folder: The folder in which the file resides.
:ivar FileType file_type: The type of the file.
:ivar Optional[int] sim_size: The simulated file size.
:ivar bool real: Indicates if the file is actually a real file in the Node sim fs output.
:ivar Optional[Path] sim_path: The path if the file is real.
"""
folder_id: str
"The id of the Folder the File is in."
folder_name: str
"The name of the Folder the file is in."
file_type: FileType
"The type of File."
sim_size: Optional[int] = None
"The simulated file size."
real: bool = False
"Indicates whether the File is actually a real file in the Node sim fs output."
sim_path: Optional[Path] = None
"The Path if real is True."
sim_root: Optional[Path] = None
"Root path of the simulation."
def __init__(self, **kwargs):
"""
Initialise File class.
:param name: The name of the file.
:param file_type: The FileType of the file
:param size: The size of the FileSystemItemABC
"""
has_extension = "." in kwargs["name"]
# Attempt to use the file type extension to set/override the FileType
if has_extension:
extension = kwargs["name"].split(".")[-1]
kwargs["file_type"] = get_file_type_from_extension(extension)
else:
# If the file name does not have a extension, override file type to FileType.UNKNOWN
if not kwargs["file_type"]:
kwargs["file_type"] = FileType.UNKNOWN
if kwargs["file_type"] != FileType.UNKNOWN:
kwargs["name"] = f"{kwargs['name']}.{kwargs['file_type'].name.lower()}"
# set random file size if none provided
if not kwargs.get("sim_size"):
kwargs["sim_size"] = kwargs["file_type"].default_size
super().__init__(**kwargs)
if self.real:
self.sim_path = self.sim_root / self.path
if not self.sim_path.exists():
self.sim_path.parent.mkdir(exist_ok=True, parents=True)
with open(self.sim_path, mode="a"):
pass
self.sys_log.info(f"Created file /{self.path} (id: {self.uuid})")
@property
def path(self) -> str:
"""
Get the path of the file in the file system.
:return: The full path of the file.
"""
return f"{self.folder_name}/{self.name}"
@property
def size(self) -> int:
"""
Get the size of the file in bytes.
:return: The size of the file in bytes.
"""
if self.real:
return os.path.getsize(self.sim_path)
return self.sim_size
def describe_state(self) -> Dict:
"""Produce a dictionary describing the current state of this object."""
state = super().describe_state()
state["size"] = self.size
state["file_type"] = self.file_type.name
return state
def scan(self) -> None:
"""Updates the visible statuses of the file."""
if self.deleted:
self.sys_log.error(f"Unable to scan deleted file {self.folder_name}/{self.name}")
return
path = self.folder.name + "/" + self.name
self.sys_log.info(f"Scanning file {self.sim_path if self.sim_path else path}")
self.visible_health_status = self.health_status
def reveal_to_red(self) -> None:
"""Reveals the folder/file to the red agent."""
if self.deleted:
self.sys_log.error(f"Unable to reveal deleted file {self.folder_name}/{self.name}")
return
self.revealed_to_red = True
def check_hash(self) -> None:
"""
Check if the file has been changed.
If changed, the file is considered corrupted.
Return False if corruption is detected, otherwise True
"""
if self.deleted:
self.sys_log.error(f"Unable to check hash of deleted file {self.folder_name}/{self.name}")
return
current_hash = None
# if file is real, read the file contents
if self.real:
with open(self.sim_path, "rb") as f:
file_hash = hashlib.blake2b()
while chunk := f.read(8192):
file_hash.update(chunk)
current_hash = file_hash.hexdigest()
else:
# otherwise get describe_state dict and hash that
current_hash = hashlib.blake2b(json.dumps(self.describe_state(), sort_keys=True).encode()).hexdigest()
# if the previous hash is None, set the current hash to previous
if self.previous_hash is None:
self.previous_hash = current_hash
# if the previous hash and current hash do not match, mark file as corrupted
if self.previous_hash is not current_hash:
self.corrupt()
def repair(self) -> None:
"""Repair a corrupted File by setting the status to FileSystemItemStatus.GOOD."""
if self.deleted:
self.sys_log.error(f"Unable to repair deleted file {self.folder_name}/{self.name}")
return
# set file status to good if corrupt
if self.health_status == FileSystemItemHealthStatus.CORRUPT:
self.health_status = FileSystemItemHealthStatus.GOOD
path = self.folder.name + "/" + self.name
self.sys_log.info(f"Repaired file {self.sim_path if self.sim_path else path}")
def corrupt(self) -> None:
"""Corrupt a File by setting the status to FileSystemItemStatus.CORRUPT."""
if self.deleted:
self.sys_log.error(f"Unable to corrupt deleted file {self.folder_name}/{self.name}")
return
# set file status to good if corrupt
if self.health_status == FileSystemItemHealthStatus.GOOD:
self.health_status = FileSystemItemHealthStatus.CORRUPT
path = self.folder.name + "/" + self.name
self.sys_log.info(f"Corrupted file {self.sim_path if self.sim_path else path}")
def restore(self) -> None:
"""Determines if the file needs to be repaired or unmarked as deleted."""
if self.deleted:
self.deleted = False
return
if self.health_status == FileSystemItemHealthStatus.CORRUPT:
self.health_status = FileSystemItemHealthStatus.GOOD
path = self.folder.name + "/" + self.name
self.sys_log.info(f"Restored file {self.sim_path if self.sim_path else path}")
def delete(self):
"""Marks the file as deleted."""
if self.deleted:
self.sys_log.error(f"Unable to delete an already deleted file {self.folder_name}/{self.name}")
return
self.deleted = True
self.sys_log.info(f"File deleted {self.folder_name}/{self.name}")

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,173 @@
from __future__ import annotations
import math
from abc import abstractmethod
from enum import Enum
from typing import Dict, Optional
from primaite import getLogger
from primaite.simulator.core import RequestManager, RequestType, SimComponent
from primaite.simulator.system.core.sys_log import SysLog
_LOGGER = getLogger(__name__)
def convert_size(size_bytes: int) -> str:
"""
Convert a file size from bytes to a string with a more human-readable format.
This function takes the size of a file in bytes and converts it to a string representation with appropriate size
units (B, KB, MB, GB, etc.).
:param size_bytes: The size of the file in bytes.
:return: The human-readable string representation of the file size.
"""
if size_bytes == 0:
return "0 B"
# Tuple of size units starting from Bytes up to Yottabytes
size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
# Calculate the index (i) that will be used to select the appropriate size unit from size_name
i = int(math.floor(math.log(size_bytes, 1024)))
# Calculate the adjusted size value (s) in terms of the new size unit
p = math.pow(1024, i)
s = round(size_bytes / p, 2)
return f"{s} {size_name[i]}"
class FileSystemItemHealthStatus(Enum):
"""Status of the FileSystemItem."""
GOOD = 1
"""File/Folder is OK."""
COMPROMISED = 2
"""File/Folder is quarantined."""
CORRUPT = 3
"""File/Folder is corrupted."""
RESTORING = 4
"""File/Folder is in the process of being restored."""
REPAIRING = 5
"""File/Folder is in the process of being repaired."""
class FileSystemItemABC(SimComponent):
"""
Abstract base class for file system items used in the file system simulation.
:ivar name: The name of the FileSystemItemABC.
"""
name: str
"The name of the FileSystemItemABC."
health_status: FileSystemItemHealthStatus = FileSystemItemHealthStatus.GOOD
"Actual status of the current FileSystemItem"
visible_health_status: FileSystemItemHealthStatus = FileSystemItemHealthStatus.GOOD
"Visible status of the current FileSystemItem"
previous_hash: Optional[str] = None
"Hash of the file contents or the description state"
revealed_to_red: bool = False
"If true, the folder/file has been revealed to the red agent."
sys_log: SysLog
"Used for creating system logs."
deleted: bool = False
"If true, the FileSystemItem was deleted."
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
:return: Current state of this object and child objects.
"""
state = super().describe_state()
state["name"] = self.name
state["health_status"] = self.health_status.value
state["visible_status"] = self.visible_health_status.value
state["previous_hash"] = self.previous_hash
state["revealed_to_red"] = self.revealed_to_red
return state
def _init_request_manager(self) -> RequestManager:
rm = super()._init_request_manager()
rm.add_request(name="scan", request_type=RequestType(func=lambda request, context: self.scan()))
rm.add_request(name="checkhash", request_type=RequestType(func=lambda request, context: self.check_hash()))
rm.add_request(name="repair", request_type=RequestType(func=lambda request, context: self.repair()))
rm.add_request(name="restore", request_type=RequestType(func=lambda request, context: self.restore()))
rm.add_request(name="corrupt", request_type=RequestType(func=lambda request, context: self.corrupt()))
return rm
@property
def size_str(self) -> str:
"""
Get the file size in a human-readable string format.
This property makes use of the :func:`convert_size` function to convert the `self.size` attribute to a string
that is easier to read and understand.
:return: The human-readable string representation of the file size.
"""
return convert_size(self.size)
@abstractmethod
def scan(self) -> None:
"""Scan the folder/file - updates the visible_health_status."""
pass
@abstractmethod
def reveal_to_red(self) -> None:
"""Reveal the folder/file to the red agent."""
pass
@abstractmethod
def check_hash(self) -> None:
"""
Checks the has of the file to detect any changes.
For current implementation, any change in file hash means it is compromised.
Return False if corruption is detected, otherwise True
"""
pass
@abstractmethod
def repair(self) -> None:
"""
Repair the FileSystemItem.
True if successfully repaired. False otherwise.
"""
pass
@abstractmethod
def corrupt(self) -> None:
"""
Corrupt the FileSystemItem.
True if successfully corrupted. False otherwise.
"""
pass
@abstractmethod
def restore(self) -> None:
"""Restore the file/folder to the state before it got ruined."""
pass
@abstractmethod
def delete(self) -> None:
"""Mark the file/folder as deleted."""
self.deleted = True

View File

@@ -0,0 +1,423 @@
from __future__ import annotations
from typing import Dict, Optional
from prettytable import MARKDOWN, PrettyTable
from primaite import getLogger
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.file_system.file import File
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemABC, FileSystemItemHealthStatus
_LOGGER = getLogger(__name__)
class Folder(FileSystemItemABC):
"""Simulation Folder."""
files: Dict[str, File] = {}
"Files stored in the folder."
_files_by_name: Dict[str, File] = {}
"Files by their name as <file name>.<file type>."
deleted_files: Dict[str, File] = {}
"Files that have been deleted."
scan_duration: int = 3
"How many timesteps to complete a scan. Default 3 timesteps"
scan_countdown: int = 0
"Time steps needed until scan completion."
red_scan_duration: int = 3
"How many timesteps to complete reveal to red scan. Default 3 timesteps"
red_scan_countdown: int = 0
"Time steps needed until red scan completion."
restore_duration: int = 3
"How many timesteps to complete a restore. Default 3 timesteps"
restore_countdown: int = 0
"Time steps needed until restore completion."
def __init__(self, **kwargs):
"""
Initialise Folder class.
:param name: The name of the folder.
:param sys_log: The SysLog instance to us to create system logs.
"""
super().__init__(**kwargs)
self.sys_log.info(f"Created file /{self.name} (id: {self.uuid})")
def _init_request_manager(self) -> RequestManager:
rm = super()._init_request_manager()
rm.add_request(
name="delete",
request_type=RequestType(func=lambda request, context: self.remove_file_by_id(file_uuid=request[0])),
)
return rm
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
:return: Current state of this object and child objects.
"""
state = super().describe_state()
state["files"] = {file.name: file.describe_state() for uuid, file in self.files.items()}
state["deleted_files"] = {file.name: file.describe_state() for uuid, file in self.deleted_files.items()}
return state
def show(self, markdown: bool = False):
"""
Display the contents of the Folder in tabular format.
:param markdown: Whether to display the table in Markdown format or not. Default is `False`.
"""
table = PrettyTable(["File", "Size", "Deleted"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.sys_log.hostname} File System Folder ({self.name})"
for file in self.files.values():
table.add_row([file.name, file.size_str, file.deleted])
print(table.get_string(sortby="File"))
@property
def size(self) -> int:
"""
Calculate and return the total size of all files in the folder.
:return: The total size of all files in the folder. If no files exist or all have `None`
size, returns 0.
"""
return sum(file.size for file in self.files.values() if file.size is not None)
def apply_timestep(self, timestep: int):
"""
Apply a single timestep of simulation dynamics to this folder and its files.
In this instance, if any multi-timestep processes are currently occurring (such as scanning),
then they are brought one step closer to being finished.
:param timestep: The current timestep number. (Amount of time since simulation episode began)
:type timestep: int
"""
super().apply_timestep(timestep=timestep)
self._scan_timestep()
self._reveal_to_red_timestep()
self._restoring_timestep()
# apply timestep to files in folder
for file_id in self.files:
self.files[file_id].apply_timestep(timestep=timestep)
def _scan_timestep(self) -> None:
"""Apply the scan action timestep."""
if self.scan_countdown >= 0:
self.scan_countdown -= 1
if self.scan_countdown == 0:
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
file.scan()
if file.visible_health_status == FileSystemItemHealthStatus.CORRUPT:
self.visible_health_status = FileSystemItemHealthStatus.CORRUPT
def _reveal_to_red_timestep(self) -> None:
"""Apply reveal to red timestep."""
if self.red_scan_countdown >= 0:
self.red_scan_countdown -= 1
if self.red_scan_countdown == 0:
self.revealed_to_red = True
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
file.reveal_to_red()
def _restoring_timestep(self) -> None:
"""Apply restoring timestep."""
if self.restore_countdown >= 0:
self.restore_countdown -= 1
if self.restore_countdown == 0:
# repair all files
for file_id in self.files:
self.restore_file(file_uuid=file_id)
deleted_files = self.deleted_files.copy()
for file_id in deleted_files:
self.restore_file(file_uuid=file_id)
if self.deleted:
self.deleted = False
elif self.health_status in [FileSystemItemHealthStatus.CORRUPT, FileSystemItemHealthStatus.RESTORING]:
self.health_status = FileSystemItemHealthStatus.GOOD
def get_file(self, file_name: str) -> Optional[File]:
"""
Get a file by its name.
File name must be the filename and prefix, like 'memo.docx'.
:param file_name: The file name.
:return: The matching File.
"""
# TODO: Increment read count?
return self._files_by_name.get(file_name)
def get_file_by_id(self, file_uuid: str, include_deleted: Optional[bool] = False) -> File:
"""
Get a file by its uuid.
:param: file_uuid: The file uuid.
:param: include_deleted: If true, the deleted files will also be checked
:return: The matching File.
"""
if include_deleted:
deleted_file = self.deleted_files.get(file_uuid)
if deleted_file:
return deleted_file
return self.files.get(file_uuid)
def add_file(self, file: File, force: Optional[bool] = False):
"""
Adds a file to the folder.
:param: file: The File object to be added to the folder.
:param: force: Overwrite file - do not check if uuid or name already exists in folder. Default False.
:raises Exception: If the provided `file` parameter is None or not an instance of the
`File` class.
"""
if file is None or not isinstance(file, File):
raise Exception(f"Invalid file: {file}")
# check if file with id or name already exists in folder
if (force is not True) and file.name in self._files_by_name:
raise Exception(f"File with name {file.name} already exists in folder")
if (force is not True) and file.uuid in self.files:
raise Exception(f"File with uuid {file.uuid} already exists in folder")
# add to list
self.files[file.uuid] = file
self._files_by_name[file.name] = file
file.folder = self
def remove_file(self, file: Optional[File]):
"""
Removes a file from the folder list.
The method can take a File object or a file id.
:param file: The file to remove
"""
if file is None or not isinstance(file, File):
raise Exception(f"Invalid file: {file}")
if self.files.get(file.uuid):
self.files.pop(file.uuid)
self._files_by_name.pop(file.name)
self.deleted_files[file.uuid] = file
file.delete()
self.sys_log.info(f"Removed file {file.name} (id: {file.uuid})")
else:
_LOGGER.debug(f"File with UUID {file.uuid} was not found.")
def remove_file_by_id(self, file_uuid: str):
"""
Remove a file using id.
:param: file_uuid: The UUID of the file to remove.
"""
file = self.get_file_by_id(file_uuid=file_uuid)
self.remove_file(file=file)
def remove_all_files(self):
"""Removes all the files in the folder."""
for file_id in self.files:
file = self.files.get(file_id)
file.delete()
self.deleted_files[file_id] = file
self.files = {}
self._files_by_name = {}
def restore_file(self, file_uuid: str):
"""
Restores a file.
:param file_uuid: The id of the file to restore
"""
# if the file was not deleted, run a repair
file = self.get_file_by_id(file_uuid=file_uuid, include_deleted=True)
if not file:
self.sys_log.error(f"Unable to restore file with uuid {file_uuid}. File does not exist.")
return
file.restore()
self.files[file.uuid] = file
self._files_by_name[file.name] = file
if file.deleted:
self.deleted_files.pop(file_uuid)
def quarantine(self):
"""Quarantines the File System Folder."""
pass
def unquarantine(self):
"""Unquarantine of the File System Folder."""
pass
def quarantine_status(self) -> bool:
"""Returns true if the folder is being quarantined."""
pass
def scan(self, instant_scan: bool = False) -> None:
"""
Update Folder visible status.
:param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False.
"""
if self.deleted:
self.sys_log.error(f"Unable to scan deleted folder {self.name}")
return
if instant_scan:
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
file.scan()
if file.visible_health_status == FileSystemItemHealthStatus.CORRUPT:
self.visible_health_status = FileSystemItemHealthStatus.CORRUPT
return
if self.scan_countdown <= 0:
# scan one file per timestep
self.scan_countdown = self.scan_duration
self.sys_log.info(f"Scanning folder {self.name} (id: {self.uuid})")
else:
# scan already in progress
self.sys_log.info(f"Scan is already in progress {self.name} (id: {self.uuid})")
def reveal_to_red(self, instant_scan: bool = False):
"""
Reveals the folders and files to the red agent.
:param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False.
"""
if self.deleted:
self.sys_log.error(f"Unable to reveal deleted folder {self.name}")
return
if instant_scan:
self.revealed_to_red = True
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
file.reveal_to_red()
return
if self.red_scan_countdown <= 0:
# scan one file per timestep
self.red_scan_countdown = self.red_scan_duration
self.sys_log.info(f"Folder revealed to red agent: {self.name} (id: {self.uuid})")
else:
# scan already in progress
self.sys_log.info(f"Red Agent Scan is already in progress {self.name} (id: {self.uuid})")
def check_hash(self) -> None:
"""
Runs a :func:`check_hash` on all files in the folder.
If a file under the folder is corrupted, the whole folder is considered corrupted.
TODO: For now this will just iterate through the files and run :func:`check_hash` and ignores
any other changes to the folder
Return False if corruption is detected, otherwise True
"""
if self.deleted:
self.sys_log.error(f"Unable to check hash of deleted folder {self.name}")
return
# iterate through the files and run a check hash
no_corrupted_files = True
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
file.check_hash()
if file.health_status == FileSystemItemHealthStatus.CORRUPT:
no_corrupted_files = False
# if one file in the folder is corrupted, set the folder status to corrupted
if not no_corrupted_files:
self.corrupt()
self.sys_log.info(f"Checking hash of folder {self.name} (id: {self.uuid})")
def repair(self) -> None:
"""Repair a corrupted Folder by setting the folder and containing files status to FileSystemItemStatus.GOOD."""
if self.deleted:
self.sys_log.error(f"Unable to repair deleted folder {self.name}")
return
# iterate through the files in the folder
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
file.repair()
# set file status to good if corrupt
if self.health_status == FileSystemItemHealthStatus.CORRUPT:
self.health_status = FileSystemItemHealthStatus.GOOD
self.health_status = FileSystemItemHealthStatus.GOOD
self.sys_log.info(f"Repaired folder {self.name} (id: {self.uuid})")
def restore(self) -> None:
"""
If a Folder is corrupted, run a repair on the folder and its child files.
If the folder is deleted, restore the folder by setting deleted status to False.
"""
if self.deleted:
self.deleted = False
if self.restore_countdown <= 0:
self.restore_countdown = self.restore_duration
self.health_status = FileSystemItemHealthStatus.RESTORING
self.sys_log.info(f"Restoring folder: {self.name} (id: {self.uuid})")
else:
# scan already in progress
self.sys_log.info(f"Folder restoration already in progress {self.name} (id: {self.uuid})")
def corrupt(self) -> None:
"""Corrupt a File by setting the folder and containing files status to FileSystemItemStatus.CORRUPT."""
if self.deleted:
self.sys_log.error(f"Unable to corrupt deleted folder {self.name}")
return
# iterate through the files in the folder
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
file.corrupt()
# set file status to corrupt
self.health_status = FileSystemItemHealthStatus.CORRUPT
self.sys_log.info(f"Corrupted folder {self.name} (id: {self.uuid})")
def delete(self):
"""Marks the file as deleted. Prevents agent actions from occuring."""
if self.deleted:
self.sys_log.error(f"Unable to delete an already deleted folder {self.name}")
return
self.deleted = True

View File

@@ -129,7 +129,7 @@ class DatabaseService(Service):
self._conn.close()
# replace db file
self.file_system.delete_file(folder_name=self.folder.name, file_name="downloads.db")
self.file_system.move_file(
self.file_system.copy_file(
src_folder_name="downloads", src_file_name="database.db", dst_folder_name=self.folder.name
)
self._db_file = self.file_system.get_file(folder_name=self.folder.name, file_name="database.db")

View File

@@ -0,0 +1,82 @@
from primaite.simulator.file_system.file import File
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.file_system.file_type import FileType
def test_create_file_no_extension(file_system):
"""Tests that creating a file without an extension sets the file type to FileType.UNKNOWN."""
file = file_system.create_file(file_name="test_file")
assert len(file_system.folders) is 1
assert file_system.get_folder("root").get_file("test_file") == file
assert file_system.get_folder("root").get_file("test_file").file_type == FileType.UNKNOWN
assert file_system.get_folder("root").get_file("test_file").size == 0
def test_file_scan(file_system):
"""Test the ability to update visible status."""
file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
assert file.health_status == FileSystemItemHealthStatus.GOOD
assert file.visible_health_status == FileSystemItemHealthStatus.GOOD
file.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.visible_health_status == FileSystemItemHealthStatus.GOOD
file.scan()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.visible_health_status == FileSystemItemHealthStatus.CORRUPT
def test_file_reveal_to_red_scan(file_system):
"""Test the ability to reveal files to red."""
file = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
assert file.revealed_to_red is False
file.reveal_to_red()
assert file.revealed_to_red is True
def test_simulated_file_check_hash(file_system):
file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
file.check_hash()
assert file.health_status == FileSystemItemHealthStatus.GOOD
# change simulated file size
file.sim_size = 0
file.check_hash()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
def test_real_file_check_hash(file_system):
file: File = file_system.create_file(file_name="test_file.txt", real=True)
file.check_hash()
assert file.health_status == FileSystemItemHealthStatus.GOOD
# change file content
with open(file.sim_path, "a") as f:
f.write("get hacked scrub lol xD\n")
file.check_hash()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
def test_file_corrupt_repair_restore(file_system):
"""Test the ability to corrupt and repair files."""
file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
file.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
file.repair()
assert file.health_status == FileSystemItemHealthStatus.GOOD
file.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
file.restore()
assert file.health_status == FileSystemItemHealthStatus.GOOD

View File

@@ -0,0 +1,107 @@
from typing import Tuple
import pytest
from primaite.simulator.file_system.file import File
from primaite.simulator.file_system.file_system import FileSystem
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.file_system.folder import Folder
@pytest.fixture(scope="function")
def populated_file_system(file_system) -> Tuple[FileSystem, Folder, File]:
"""Create a file system with a folder and file."""
folder = file_system.create_folder(folder_name="test_folder")
file = file_system.create_file(folder_name="test_folder", file_name="test_file.txt")
return file_system, folder, file
def test_file_scan_request(populated_file_system):
"""Test that an agent can request a file scan."""
fs, folder, file = populated_file_system
file.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.visible_health_status == FileSystemItemHealthStatus.GOOD
fs.apply_request(request=["file", file.uuid, "scan"])
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.visible_health_status == FileSystemItemHealthStatus.CORRUPT
def test_file_checkhash_request(populated_file_system):
"""Test that an agent can request a file hash check."""
fs, folder, file = populated_file_system
fs.apply_request(request=["file", file.uuid, "checkhash"])
assert file.health_status == FileSystemItemHealthStatus.GOOD
file.sim_size = 0
fs.apply_request(request=["file", file.uuid, "checkhash"])
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
def test_file_repair_request(populated_file_system):
"""Test that an agent can request a file repair."""
fs, folder, file = populated_file_system
file.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
fs.apply_request(request=["file", file.uuid, "repair"])
assert file.health_status == FileSystemItemHealthStatus.GOOD
def test_file_restore_request(populated_file_system):
"""Test that an agent can request that a file can be restored."""
fs, folder, file = populated_file_system
assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid) is not None
fs.apply_request(request=["delete", "file", folder.uuid, file.uuid])
assert fs.get_file(folder_name=folder.name, file_name=file.name) is None
assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid, include_deleted=True).deleted is True
fs.apply_request(request=["restore", "file", folder.uuid, file.uuid])
assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None
assert fs.get_file(folder_name=folder.name, file_name=file.name).deleted is False
fs.apply_request(request=["file", file.uuid, "corrupt"])
assert fs.get_file(folder_name=folder.name, file_name=file.name).health_status == FileSystemItemHealthStatus.CORRUPT
fs.apply_request(request=["restore", "file", folder.uuid, file.uuid])
assert fs.get_file(folder_name=folder.name, file_name=file.name).health_status == FileSystemItemHealthStatus.GOOD
def test_file_corrupt_request(populated_file_system):
"""Test that an agent can request a file corruption."""
fs, folder, file = populated_file_system
fs.apply_request(request=["file", file.uuid, "corrupt"])
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
def test_deleted_file_cannot_be_interacted_with(populated_file_system):
"""Test that actions cannot affect deleted files."""
fs, folder, file = populated_file_system
assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None
fs.apply_request(request=["file", file.uuid, "corrupt"])
assert fs.get_file(folder_name=folder.name, file_name=file.name).health_status == FileSystemItemHealthStatus.CORRUPT
assert (
fs.get_file(folder_name=folder.name, file_name=file.name).visible_health_status
== FileSystemItemHealthStatus.GOOD
)
fs.apply_request(request=["delete", "file", folder.uuid, file.uuid])
assert fs.get_file(folder_name=folder.name, file_name=file.name) is None
fs.apply_request(request=["file", file.uuid, "repair"])
fs.apply_request(request=["file", file.uuid, "scan"])
file = folder.deleted_files.get(file.uuid)
assert file.health_status is not FileSystemItemHealthStatus.GOOD
assert file.visible_health_status is not FileSystemItemHealthStatus.CORRUPT

View File

@@ -1,6 +1,6 @@
import pytest
from primaite.simulator.file_system.file_system import File, FileSystem, FileSystemItemHealthStatus, Folder
from primaite.simulator.file_system.file_system import FileSystem
from primaite.simulator.file_system.file_type import FileType
@@ -16,6 +16,8 @@ def test_create_folder_and_file(file_system):
assert file_system.get_folder("test_folder").get_file("test_file.txt")
file_system.show(full=True)
def test_create_file_no_folder(file_system):
"""Tests that creating a file without a folder creates a folder and sets that as the file's parent."""
@@ -25,14 +27,7 @@ def test_create_file_no_folder(file_system):
assert file_system.get_folder("root").get_file("test_file.txt").file_type == FileType.TXT
assert file_system.get_folder("root").get_file("test_file.txt").size == 10
def test_create_file_no_extension(file_system):
"""Tests that creating a file without an extension sets the file type to FileType.UNKNOWN."""
file = file_system.create_file(file_name="test_file")
assert len(file_system.folders) is 1
assert file_system.get_folder("root").get_file("test_file") == file
assert file_system.get_folder("root").get_file("test_file").file_type == FileType.UNKNOWN
assert file_system.get_folder("root").get_file("test_file").size == 0
file_system.show(full=True)
def test_delete_file(file_system):
@@ -44,6 +39,9 @@ def test_delete_file(file_system):
file_system.delete_file(folder_name="root", file_name="test_file.txt")
assert len(file_system.folders) == 1
assert len(file_system.get_folder("root").files) == 0
assert len(file_system.get_folder("root").deleted_files) == 1
file_system.show(full=True)
def test_delete_non_existent_file(file_system):
@@ -62,6 +60,8 @@ def test_delete_non_existent_file(file_system):
# The folder should still have 1 file
assert len(file_system.get_folder("root").files) == 1
file_system.show(full=True)
def test_delete_folder(file_system):
file_system.create_folder(folder_name="test_folder")
@@ -70,6 +70,42 @@ def test_delete_folder(file_system):
file_system.delete_folder(folder_name="test_folder")
assert len(file_system.folders) == 1
assert len(file_system.deleted_folders) == 1
file_system.show(full=True)
def test_create_duplicate_folder(file_system):
"""Test that creating a duplicate folder throws exception."""
assert len(file_system.folders) == 1
file_system.create_folder(folder_name="test_folder")
assert len(file_system.folders) is 2
with pytest.raises(Exception):
file_system.create_folder(folder_name="test_folder")
assert len(file_system.folders) is 2
file_system.show(full=True)
def test_create_duplicate_file(file_system):
"""Test that creating a duplicate file throws exception."""
assert len(file_system.folders) == 1
file_system.create_folder(folder_name="test_folder")
assert len(file_system.folders) is 2
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
assert len(file_system.get_folder("test_folder").files) == 1
with pytest.raises(Exception):
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
assert len(file_system.get_folder("test_folder").files) == 1
file_system.show(full=True)
def test_deleting_a_non_existent_folder(file_system):
file_system.create_folder(folder_name="test_folder")
@@ -78,6 +114,8 @@ def test_deleting_a_non_existent_folder(file_system):
file_system.delete_folder(folder_name="does not exist!")
assert len(file_system.folders) == 2
file_system.show(full=True)
def test_deleting_root_folder_fails(file_system):
assert len(file_system.folders) == 1
@@ -85,6 +123,8 @@ def test_deleting_root_folder_fails(file_system):
file_system.delete_folder(folder_name="root")
assert len(file_system.folders) == 1
file_system.show(full=True)
def test_move_file(file_system):
"""Tests the file move function."""
@@ -103,6 +143,8 @@ def test_move_file(file_system):
assert len(file_system.get_folder("dst_folder").files) == 1
assert file_system.get_file("dst_folder", "test_file.txt").uuid == original_uuid
file_system.show(full=True)
def test_copy_file(file_system):
"""Tests the file copy function."""
@@ -121,192 +163,26 @@ def test_copy_file(file_system):
assert len(file_system.get_folder("dst_folder").files) == 1
assert file_system.get_file("dst_folder", "test_file.txt").uuid != original_uuid
@pytest.mark.skip(reason="Implementation for quarantine not needed yet")
def test_folder_quarantine_state(file_system):
"""Tests the changing of folder quarantine status."""
folder = file_system.get_folder("root")
assert folder.quarantine_status() is False
folder.quarantine()
assert folder.quarantine_status() is True
folder.unquarantine()
assert folder.quarantine_status() is False
file_system.show(full=True)
def test_file_corrupt_repair(file_system):
"""Test the ability to corrupt and repair files."""
def test_get_file(file_system):
"""Test that files can be retrieved."""
folder: Folder = file_system.create_folder(folder_name="test_folder")
file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
file1: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
file2: File = file_system.create_file(file_name="test_file2.txt", folder_name="test_folder")
file.corrupt()
folder.remove_file(file2)
assert folder.health_status == FileSystemItemHealthStatus.GOOD
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert file_system.get_file_by_id(file_uuid=file1.uuid, folder_uuid=folder.uuid) is not None
assert file_system.get_file_by_id(file_uuid=file2.uuid, folder_uuid=folder.uuid) is None
assert file_system.get_file_by_id(file_uuid=file2.uuid, folder_uuid=folder.uuid, include_deleted=True) is not None
assert file_system.get_file_by_id(file_uuid=file2.uuid, include_deleted=True) is not None
file.repair()
file_system.delete_folder(folder_name="test_folder")
assert file_system.get_file_by_id(file_uuid=file2.uuid, include_deleted=True) is not None
assert folder.health_status == FileSystemItemHealthStatus.GOOD
assert file.health_status == FileSystemItemHealthStatus.GOOD
def test_folder_corrupt_repair(file_system):
"""Test the ability to corrupt and repair folders."""
folder: Folder = file_system.create_folder(folder_name="test_folder")
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
folder.corrupt()
file = folder.get_file(file_name="test_file.txt")
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
folder.repair()
file = folder.get_file(file_name="test_file.txt")
assert folder.health_status == FileSystemItemHealthStatus.GOOD
assert file.health_status == FileSystemItemHealthStatus.GOOD
def test_file_scan(file_system):
"""Test the ability to update visible status."""
folder: Folder = file_system.create_folder(folder_name="test_folder")
file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
assert file.health_status == FileSystemItemHealthStatus.GOOD
assert file.visible_health_status == FileSystemItemHealthStatus.GOOD
file.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.visible_health_status == FileSystemItemHealthStatus.GOOD
file.scan()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.visible_health_status == FileSystemItemHealthStatus.CORRUPT
def test_folder_scan(file_system):
"""Test the ability to update visible status."""
folder: Folder = file_system.create_folder(folder_name="test_folder")
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
file_system.create_file(file_name="test_file2.txt", folder_name="test_folder")
file1: File = folder.get_file_by_id(file_uuid=list(folder.files)[1])
file2: File = folder.get_file_by_id(file_uuid=list(folder.files)[0])
assert folder.health_status == FileSystemItemHealthStatus.GOOD
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
folder.corrupt()
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
folder.scan()
folder.apply_timestep(timestep=0)
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
folder.apply_timestep(timestep=1)
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT
assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPT
def test_folder_reveal_to_red_scan(file_system):
"""Test the ability to reveal files to red."""
folder: Folder = file_system.create_folder(folder_name="test_folder")
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
file_system.create_file(file_name="test_file2.txt", folder_name="test_folder")
file1: File = folder.get_file_by_id(file_uuid=list(folder.files)[1])
file2: File = folder.get_file_by_id(file_uuid=list(folder.files)[0])
assert folder.revealed_to_red is False
assert file1.revealed_to_red is False
assert file2.revealed_to_red is False
folder.reveal_to_red()
folder.apply_timestep(timestep=0)
assert folder.revealed_to_red is False
assert file1.revealed_to_red is False
assert file2.revealed_to_red is False
folder.apply_timestep(timestep=1)
assert folder.revealed_to_red is True
assert file1.revealed_to_red is True
assert file2.revealed_to_red is True
def test_simulated_file_check_hash(file_system):
file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
assert file.check_hash() is True
# change simulated file size
file.sim_size = 0
assert file.check_hash() is False
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
def test_real_file_check_hash(file_system):
file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder", real=True)
assert file.check_hash() is True
# change file content
with open(file.sim_path, "a") as f:
f.write("get hacked scrub lol xD\n")
assert file.check_hash() is False
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
def test_simulated_folder_check_hash(file_system):
folder: Folder = file_system.create_folder(folder_name="test_folder")
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
assert folder.check_hash() is True
# change simulated file size
file = folder.get_file(file_name="test_file.txt")
file.sim_size = 0
assert folder.check_hash() is False
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
def test_real_folder_check_hash(file_system):
folder: Folder = file_system.create_folder(folder_name="test_folder")
file_system.create_file(file_name="test_file.txt", folder_name="test_folder", real=True)
assert folder.check_hash() is True
# change simulated file size
file = folder.get_file(file_name="test_file.txt")
# change file content
with open(file.sim_path, "a") as f:
f.write("get hacked scrub lol xD\n")
assert folder.check_hash() is False
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
file_system.show(full=True)
@pytest.mark.skip(reason="Skipping until we tackle serialisation")
@@ -318,3 +194,5 @@ def test_serialisation(file_system):
deserialised_file_sys = FileSystem.model_validate_json(serialised_file_sys)
assert file_system.model_dump_json() == deserialised_file_sys.model_dump_json()
file_system.show(full=True)

View File

@@ -2,144 +2,29 @@ from typing import Tuple
import pytest
from primaite.simulator.file_system.file_system import File, FileSystem, FileSystemItemHealthStatus, Folder
from primaite.simulator.file_system.file import File
from primaite.simulator.file_system.file_system import FileSystem
from primaite.simulator.file_system.folder import Folder
@pytest.fixture(scope="function")
def populated_file_system(file_system) -> Tuple[FileSystem, Folder, File]:
"""Test that an agent can request a file scan."""
"""Create a file system with a folder and file."""
folder = file_system.create_folder(folder_name="test_folder")
file = file_system.create_file(folder_name="test_folder", file_name="test_file.txt")
return file_system, folder, file
def test_file_scan_request(populated_file_system):
"""Test that an agent can request a file scan."""
fs, folder, file = populated_file_system
file.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.visible_health_status == FileSystemItemHealthStatus.GOOD
fs.apply_request(request=["file", file.uuid, "scan"])
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.visible_health_status == FileSystemItemHealthStatus.CORRUPT
def test_folder_scan_request(populated_file_system):
"""Test that an agent can request a folder scan."""
fs, folder, file = populated_file_system
fs.create_file(file_name="test_file2.txt", folder_name="test_folder")
file1: File = folder.get_file_by_id(file_uuid=list(folder.files)[1])
file2: File = folder.get_file_by_id(file_uuid=list(folder.files)[0])
folder.corrupt()
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
fs.apply_request(request=["folder", folder.uuid, "scan"])
folder.apply_timestep(timestep=0)
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
folder.apply_timestep(timestep=1)
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT
assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPT
def test_file_checkhash_request(populated_file_system):
"""Test that an agent can request a file hash check."""
fs, folder, file = populated_file_system
fs.apply_request(request=["file", file.uuid, "checkhash"])
assert file.health_status == FileSystemItemHealthStatus.GOOD
file.sim_size = 0
fs.apply_request(request=["file", file.uuid, "checkhash"])
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
def test_folder_checkhash_request(populated_file_system):
"""Test that an agent can request a folder hash check."""
fs, folder, file = populated_file_system
fs.apply_request(request=["folder", folder.uuid, "checkhash"])
assert folder.health_status == FileSystemItemHealthStatus.GOOD
file.sim_size = 0
fs.apply_request(request=["folder", folder.uuid, "checkhash"])
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
def test_file_repair_request(populated_file_system):
"""Test that an agent can request a file repair."""
fs, folder, file = populated_file_system
file.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
fs.apply_request(request=["file", file.uuid, "repair"])
assert file.health_status == FileSystemItemHealthStatus.GOOD
def test_folder_repair_request(populated_file_system):
"""Test that an agent can request a folder repair."""
fs, folder, file = populated_file_system
folder.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
fs.apply_request(request=["folder", folder.uuid, "repair"])
assert file.health_status == FileSystemItemHealthStatus.GOOD
assert folder.health_status == FileSystemItemHealthStatus.GOOD
def test_file_restore_request(populated_file_system):
pass
def test_folder_restore_request(populated_file_system):
pass
def test_file_corrupt_request(populated_file_system):
"""Test that an agent can request a file corruption."""
fs, folder, file = populated_file_system
fs.apply_request(request=["file", file.uuid, "corrupt"])
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
def test_folder_corrupt_request(populated_file_system):
"""Test that an agent can request a folder corruption."""
fs, folder, file = populated_file_system
fs.apply_request(request=["folder", folder.uuid, "corrupt"])
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
def test_file_delete_request(populated_file_system):
"""Test that an agent can request a file deletion."""
fs, folder, file = populated_file_system
assert folder.get_file_by_id(file_uuid=file.uuid) is not None
assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None
fs.apply_request(request=["folder", folder.uuid, "delete", file.uuid])
assert folder.get_file_by_id(file_uuid=file.uuid) is None
fs.apply_request(request=["delete", "file", folder.uuid, file.uuid])
assert fs.get_file(folder_name=folder.name, file_name=file.name) is None
fs.show(full=True)
def test_folder_delete_request(populated_file_system):
@@ -148,6 +33,8 @@ def test_folder_delete_request(populated_file_system):
assert folder.get_file_by_id(file_uuid=file.uuid) is not None
assert fs.get_folder_by_id(folder_uuid=folder.uuid) is not None
fs.apply_request(request=["delete", folder.uuid])
fs.apply_request(request=["delete", "folder", folder.uuid])
assert fs.get_folder_by_id(folder_uuid=folder.uuid) is None
assert folder.get_file_by_id(file_uuid=file.uuid) is None
assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid) is None
fs.show(full=True)

View File

@@ -0,0 +1,150 @@
import pytest
from primaite.simulator.file_system.file import File
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.file_system.folder import Folder
@pytest.mark.skip(reason="Implementation for quarantine not needed yet")
def test_folder_quarantine_state(file_system):
"""Tests the changing of folder quarantine status."""
folder = file_system.get_folder("root")
assert folder.quarantine_status() is False
folder.quarantine()
assert folder.quarantine_status() is True
folder.unquarantine()
assert folder.quarantine_status() is False
def test_folder_get_file(file_system):
"""Test that files can be retrieved from the folder."""
folder: Folder = file_system.create_folder(folder_name="test_folder")
file1: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
file2: File = file_system.create_file(file_name="test_file2.txt", folder_name="test_folder")
folder.remove_file(file2)
assert folder.get_file_by_id(file_uuid=file1.uuid) is not None
assert folder.get_file_by_id(file_uuid=file2.uuid) is None
assert folder.get_file_by_id(file_uuid=file2.uuid, include_deleted=True) is not None
def test_folder_scan(file_system):
"""Test the ability to update visible status."""
folder: Folder = file_system.create_folder(folder_name="test_folder")
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
file_system.create_file(file_name="test_file2.txt", folder_name="test_folder")
file1: File = folder.get_file_by_id(file_uuid=list(folder.files)[1])
file2: File = folder.get_file_by_id(file_uuid=list(folder.files)[0])
assert folder.health_status == FileSystemItemHealthStatus.GOOD
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
folder.corrupt()
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
folder.scan()
folder.apply_timestep(timestep=0)
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
folder.apply_timestep(timestep=1)
folder.apply_timestep(timestep=2)
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT
assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPT
def test_folder_reveal_to_red_scan(file_system):
"""Test the ability to reveal files to red."""
folder: Folder = file_system.create_folder(folder_name="test_folder")
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
file_system.create_file(file_name="test_file2.txt", folder_name="test_folder")
file1: File = folder.get_file_by_id(file_uuid=list(folder.files)[1])
file2: File = folder.get_file_by_id(file_uuid=list(folder.files)[0])
assert folder.revealed_to_red is False
assert file1.revealed_to_red is False
assert file2.revealed_to_red is False
folder.reveal_to_red()
folder.apply_timestep(timestep=0)
assert folder.revealed_to_red is False
assert file1.revealed_to_red is False
assert file2.revealed_to_red is False
folder.apply_timestep(timestep=1)
folder.apply_timestep(timestep=2)
assert folder.revealed_to_red is True
assert file1.revealed_to_red is True
assert file2.revealed_to_red is True
def test_folder_corrupt_repair(file_system):
"""Test the ability to corrupt and repair folders."""
folder: Folder = file_system.create_folder(folder_name="test_folder")
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
folder.corrupt()
file = folder.get_file(file_name="test_file.txt")
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
folder.repair()
file = folder.get_file(file_name="test_file.txt")
assert folder.health_status == FileSystemItemHealthStatus.GOOD
assert file.health_status == FileSystemItemHealthStatus.GOOD
def test_simulated_folder_check_hash(file_system):
folder: Folder = file_system.create_folder(folder_name="test_folder")
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
folder.check_hash()
assert folder.health_status == FileSystemItemHealthStatus.GOOD
# change simulated file size
file = folder.get_file(file_name="test_file.txt")
file.sim_size = 0
folder.check_hash()
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
def test_real_folder_check_hash(file_system):
folder: Folder = file_system.create_folder(folder_name="test_folder")
file_system.create_file(file_name="test_file.txt", folder_name="test_folder", real=True)
folder.check_hash()
assert folder.health_status == FileSystemItemHealthStatus.GOOD
# change simulated file size
file = folder.get_file(file_name="test_file.txt")
# change file content
with open(file.sim_path, "a") as f:
f.write("get hacked scrub lol xD\n")
folder.check_hash()
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT

View File

@@ -0,0 +1,167 @@
from typing import Tuple
import pytest
from primaite.simulator.file_system.file import File
from primaite.simulator.file_system.file_system import FileSystem
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.file_system.folder import Folder
@pytest.fixture(scope="function")
def populated_file_system(file_system) -> Tuple[FileSystem, Folder, File]:
"""Create a file system with a folder and file."""
folder = file_system.create_folder(folder_name="test_folder")
file = file_system.create_file(folder_name="test_folder", file_name="test_file.txt")
return file_system, folder, file
def test_folder_scan_request(populated_file_system):
"""Test that an agent can request a folder scan."""
fs, folder, file = populated_file_system
fs.create_file(file_name="test_file2.txt", folder_name="test_folder")
file1: File = folder.get_file_by_id(file_uuid=list(folder.files)[1])
file2: File = folder.get_file_by_id(file_uuid=list(folder.files)[0])
folder.corrupt()
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
fs.apply_request(request=["folder", folder.uuid, "scan"])
folder.apply_timestep(timestep=0)
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
folder.apply_timestep(timestep=1)
folder.apply_timestep(timestep=2)
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT
assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPT
def test_folder_checkhash_request(populated_file_system):
"""Test that an agent can request a folder hash check."""
fs, folder, file = populated_file_system
fs.apply_request(request=["folder", folder.uuid, "checkhash"])
assert folder.health_status == FileSystemItemHealthStatus.GOOD
file.sim_size = 0
fs.apply_request(request=["folder", folder.uuid, "checkhash"])
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
def test_folder_repair_request(populated_file_system):
"""Test that an agent can request a folder repair."""
fs, folder, file = populated_file_system
folder.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
fs.apply_request(request=["folder", folder.uuid, "repair"])
assert file.health_status == FileSystemItemHealthStatus.GOOD
assert folder.health_status == FileSystemItemHealthStatus.GOOD
def test_folder_restore_request(populated_file_system):
"""Test that an agent can request that a folder can be restored."""
fs, folder, file = populated_file_system
assert fs.get_folder_by_id(folder_uuid=folder.uuid) is not None
assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid) is not None
# delete folder
fs.apply_request(request=["delete", "folder", folder.uuid])
assert fs.get_folder(folder_name=folder.name) is None
assert fs.get_folder_by_id(folder_uuid=folder.uuid, include_deleted=True).deleted is True
assert fs.get_file(folder_name=folder.name, file_name=file.name) is None
assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid, include_deleted=True).deleted is True
# restore folder
fs.apply_request(request=["restore", "folder", folder.uuid])
fs.apply_timestep(timestep=0)
assert fs.get_folder(folder_name=folder.name) is not None
assert (
fs.get_folder_by_id(folder_uuid=folder.uuid, include_deleted=True).health_status
== FileSystemItemHealthStatus.RESTORING
)
assert fs.get_folder_by_id(folder_uuid=folder.uuid, include_deleted=True).deleted is False
assert fs.get_file(folder_name=folder.name, file_name=file.name) is None
assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid, include_deleted=True).deleted is True
fs.apply_timestep(timestep=1)
fs.apply_timestep(timestep=2)
assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None
assert (
fs.get_file(folder_name=folder.name, file_name=file.name).health_status
is not FileSystemItemHealthStatus.RESTORING
)
assert fs.get_file(folder_name=folder.name, file_name=file.name).deleted is False
assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None
assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid, include_deleted=True).deleted is False
# corrupt folder
fs.apply_request(request=["folder", folder.uuid, "corrupt"])
assert fs.get_folder(folder_name=folder.name).health_status == FileSystemItemHealthStatus.CORRUPT
assert fs.get_file(folder_name=folder.name, file_name=file.name).health_status == FileSystemItemHealthStatus.CORRUPT
# restore folder
fs.apply_request(request=["restore", "folder", folder.uuid])
fs.apply_timestep(timestep=0)
assert fs.get_folder(folder_name=folder.name).health_status == FileSystemItemHealthStatus.RESTORING
assert fs.get_file(folder_name=folder.name, file_name=file.name).health_status == FileSystemItemHealthStatus.CORRUPT
fs.apply_timestep(timestep=1)
fs.apply_timestep(timestep=2)
assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None
assert (
fs.get_file(folder_name=folder.name, file_name=file.name).health_status
is not FileSystemItemHealthStatus.RESTORING
)
assert fs.get_file(folder_name=folder.name, file_name=file.name).deleted is False
assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None
assert fs.get_file_by_id(folder_uuid=folder.uuid, file_uuid=file.uuid, include_deleted=True).deleted is False
def test_folder_corrupt_request(populated_file_system):
"""Test that an agent can request a folder corruption."""
fs, folder, file = populated_file_system
fs.apply_request(request=["folder", folder.uuid, "corrupt"])
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
def test_deleted_folder_and_its_files_cannot_be_interacted_with(populated_file_system):
"""Test that actions cannot affect deleted folder and its child files."""
fs, folder, file = populated_file_system
assert fs.get_file(folder_name=folder.name, file_name=file.name) is not None
fs.apply_request(request=["file", file.uuid, "corrupt"])
assert fs.get_file(folder_name=folder.name, file_name=file.name).health_status == FileSystemItemHealthStatus.CORRUPT
fs.apply_request(request=["delete", "folder", folder.uuid])
assert fs.get_file(folder_name=folder.name, file_name=file.name) is None
fs.apply_request(request=["file", file.uuid, "repair"])
deleted_folder = fs.deleted_folders.get(folder.uuid)
deleted_file = deleted_folder.deleted_files.get(file.uuid)
assert deleted_file.health_status is not FileSystemItemHealthStatus.GOOD

View File

@@ -1,10 +1,9 @@
import pytest
from primaite.simulator.file_system.file_system import File, FileSystemItemHealthStatus, Folder
from primaite.simulator.file_system.file import File
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.file_system.folder import Folder
from primaite.simulator.network.hardware.base import Node, NodeOperatingState
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.processes.process import Process
from primaite.simulator.system.services.service import Service
from primaite.simulator.system.software import SoftwareHealthState