#1962: separating file system into more managable files

This commit is contained in:
Czar.Echavez
2023-11-02 15:10:51 +00:00
parent b67eb1bb34
commit b2c3e273b7
9 changed files with 841 additions and 771 deletions

View File

@@ -0,0 +1,185 @@
from __future__ import annotations
import hashlib
import json
import os.path
from pathlib import Path
from typing import Dict, Optional
from primaite import getLogger
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemABC, FileSystemItemHealthStatus
from primaite.simulator.file_system.file_type import FileType, get_file_type_from_extension
_LOGGER = getLogger(__name__)
class File(FileSystemItemABC):
"""
Class representing a file in the simulation.
:ivar Folder folder: The folder in which the file resides.
:ivar FileType file_type: The type of the file.
:ivar Optional[int] sim_size: The simulated file size.
:ivar bool real: Indicates if the file is actually a real file in the Node sim fs output.
:ivar Optional[Path] sim_path: The path if the file is real.
"""
folder_id: str
"The id of the Folder the File is in."
folder_name: str
"The name of the Folder the file is in."
file_type: FileType
"The type of File."
sim_size: Optional[int] = None
"The simulated file size."
real: bool = False
"Indicates whether the File is actually a real file in the Node sim fs output."
sim_path: Optional[Path] = None
"The Path if real is True."
sim_root: Optional[Path] = None
"Root path of the simulation."
def __init__(self, **kwargs):
"""
Initialise File class.
:param name: The name of the file.
:param file_type: The FileType of the file
:param size: The size of the FileSystemItemABC
"""
has_extension = "." in kwargs["name"]
# Attempt to use the file type extension to set/override the FileType
if has_extension:
extension = kwargs["name"].split(".")[-1]
kwargs["file_type"] = get_file_type_from_extension(extension)
else:
# If the file name does not have a extension, override file type to FileType.UNKNOWN
if not kwargs["file_type"]:
kwargs["file_type"] = FileType.UNKNOWN
if kwargs["file_type"] != FileType.UNKNOWN:
kwargs["name"] = f"{kwargs['name']}.{kwargs['file_type'].name.lower()}"
# set random file size if none provided
if not kwargs.get("sim_size"):
kwargs["sim_size"] = kwargs["file_type"].default_size
super().__init__(**kwargs)
if self.real:
self.sim_path = self.sim_root / self.path
if not self.sim_path.exists():
self.sim_path.parent.mkdir(exist_ok=True, parents=True)
with open(self.sim_path, mode="a"):
pass
self.sys_log.info(f"Created file /{self.path} (id: {self.uuid})")
@property
def path(self) -> str:
"""
Get the path of the file in the file system.
:return: The full path of the file.
"""
return f"{self.folder_name}/{self.name}"
@property
def size(self) -> int:
"""
Get the size of the file in bytes.
:return: The size of the file in bytes.
"""
if self.real:
return os.path.getsize(self.sim_path)
return self.sim_size
def describe_state(self) -> Dict:
"""Produce a dictionary describing the current state of this object."""
state = super().describe_state()
state["size"] = self.size
state["file_type"] = self.file_type.name
return state
def scan(self) -> None:
"""Updates the visible statuses of the file."""
path = self.folder.name + "/" + self.name
self.sys_log.info(f"Scanning file {self.sim_path if self.sim_path else path}")
self.visible_health_status = self.health_status
def reveal_to_red(self):
"""Reveals the folder/file to the red agent."""
self.revealed_to_red = True
def check_hash(self) -> bool:
"""
Check if the file has been changed.
If changed, the file is considered corrupted.
Return False if corruption is detected, otherwise True
"""
current_hash = None
# if file is real, read the file contents
if self.real:
with open(self.sim_path, "rb") as f:
file_hash = hashlib.blake2b()
while chunk := f.read(8192):
file_hash.update(chunk)
current_hash = file_hash.hexdigest()
else:
# otherwise get describe_state dict and hash that
current_hash = hashlib.blake2b(json.dumps(self.describe_state(), sort_keys=True).encode()).hexdigest()
# if the previous hash is None, set the current hash to previous
if self.previous_hash is None:
self.previous_hash = current_hash
# if the previous hash and current hash do not match, mark file as corrupted
if self.previous_hash is not current_hash:
self.corrupt()
return False
return True
def repair(self) -> bool:
"""Repair a corrupted File by setting the status to FileSystemItemStatus.GOOD."""
super().repair()
# set file status to good if corrupt
if self.health_status == FileSystemItemHealthStatus.CORRUPT:
self.health_status = FileSystemItemHealthStatus.GOOD
path = self.folder.name + "/" + self.name
self.sys_log.info(f"Repaired file {self.sim_path if self.sim_path else path}")
return True
def restore(self) -> bool:
"""Restore a corrupted File by setting the status to FileSystemItemStatus.GOOD."""
super().restore()
restored = False
if self.health_status == FileSystemItemHealthStatus.CORRUPT:
self.health_status = FileSystemItemHealthStatus.GOOD
restored = True
path = self.folder.name + "/" + self.name
self.sys_log.info(f"Restored file {self.sim_path if self.sim_path else path}")
return restored
def corrupt(self) -> bool:
"""Corrupt a File by setting the status to FileSystemItemStatus.CORRUPT."""
super().corrupt()
corrupted = False
# set file status to good if corrupt
if self.health_status == FileSystemItemHealthStatus.GOOD:
self.health_status = FileSystemItemHealthStatus.CORRUPT
corrupted = True
path = self.folder.name + "/" + self.name
self.sys_log.info(f"Corrupted file {self.sim_path if self.sim_path else path}")
return corrupted

View File

@@ -1,8 +1,5 @@
from __future__ import annotations
import hashlib
import json
import os.path
import shutil
from pathlib import Path
from typing import Dict, Optional
@@ -11,8 +8,9 @@ from prettytable import MARKDOWN, PrettyTable
from primaite import getLogger
from primaite.simulator.core import RequestManager, RequestType, SimComponent
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemABC, FileSystemItemHealthStatus
from primaite.simulator.file_system.file_type import FileType, get_file_type_from_extension
from primaite.simulator.file_system.file import File
from primaite.simulator.file_system.file_type import FileType
from primaite.simulator.file_system.folder import Folder
from primaite.simulator.system.core.sys_log import SysLog
_LOGGER = getLogger(__name__)
@@ -27,7 +25,9 @@ class FileSystem(SimComponent):
"List containing all the folders that have been deleted."
_folders_by_name: Dict[str, Folder] = {}
sys_log: SysLog
"Instance of SysLog used to create system logs."
sim_root: Path
"Root path of the simulation."
def __init__(self, **kwargs):
super().__init__(**kwargs)
@@ -86,42 +86,9 @@ class FileSystem(SimComponent):
else:
print(table.get_string(sortby="Folder"))
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
:return: Current state of this object and child objects.
"""
state = super().describe_state()
state["folders"] = {folder.name: folder.describe_state() for folder in self.folders.values()}
return state
def apply_timestep(self, timestep: int) -> None:
"""Apply time step to FileSystem and its child folders and files."""
super().apply_timestep(timestep=timestep)
# apply timestep to folders
for folder_id in self.folders:
self.folders[folder_id].apply_timestep(timestep=timestep)
def scan(self, instant_scan: bool = False):
"""
Scan all the folders (and child files) in the file system.
:param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False.
"""
for folder_id in self.folders:
self.folders[folder_id].scan(instant_scan=instant_scan)
def reveal_to_red(self, instant_scan: bool = False):
"""
Reveals all the folders (and child files) in the file system to the red agent.
:param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False.
"""
for folder_id in self.folders:
self.folders[folder_id].reveal_to_red(instant_scan=instant_scan)
###############################################################
# Folder methods
###############################################################
def create_folder(self, folder_name: str) -> Folder:
"""
Creates a Folder and adds it to the list of folders.
@@ -132,11 +99,10 @@ class FileSystem(SimComponent):
if self.get_folder(folder_name):
raise Exception(f"Cannot create folder as it already exists: {folder_name}")
folder = Folder(name=folder_name, fs=self)
folder = Folder(name=folder_name, sys_log=self.sys_log)
self.folders[folder.uuid] = folder
self._folders_by_name[folder.name] = folder
self.sys_log.info(f"Created folder /{folder.name}")
self._folder_request_manager.add_request(
name=folder.uuid, request_type=RequestType(func=folder._request_manager)
)
@@ -174,9 +140,27 @@ class FileSystem(SimComponent):
folder = self.get_folder_by_id(folder_uuid=folder_uuid)
self.delete_folder(folder_name=folder.name)
def restore_folder(self, folder_id: str):
"""TODO."""
pass
def get_folder(self, folder_name: str) -> Optional[Folder]:
"""
Get a folder by its name if it exists.
:param folder_name: The folder name.
:return: The matching Folder.
"""
return self._folders_by_name.get(folder_name)
def get_folder_by_id(self, folder_uuid: str) -> Optional[Folder]:
"""
Get a folder by its uuid if it exists.
:param: folder_uuid: The folder uuid.
:return: The matching Folder.
"""
return self.folders.get(folder_uuid)
###############################################################
# File methods
###############################################################
def create_file(
self,
@@ -210,12 +194,14 @@ class FileSystem(SimComponent):
name=file_name,
sim_size=size,
file_type=file_type,
folder=folder,
folder_id=folder.uuid,
folder_name=folder.name,
real=real,
sim_path=self.sim_root if real else None,
sim_root=self.sim_root,
sys_log=self.sys_log,
)
folder.add_file(file)
self.sys_log.info(f"Created file /{file.path}")
self._file_request_manager.add_request(name=file.uuid, request_type=RequestType(func=file._request_manager))
return file
@@ -266,7 +252,7 @@ class FileSystem(SimComponent):
dst_folder.add_file(file)
if file.real:
old_sim_path = file.sim_path
file.sim_path = file.folder.fs.sim_root / file.path
file.sim_path = file.sim_root / file.path
file.sim_path.parent.mkdir(exist_ok=True)
shutil.move(old_sim_path, file.sim_path)
@@ -280,14 +266,64 @@ class FileSystem(SimComponent):
"""
file = self.get_file(folder_name=src_folder_name, file_name=src_file_name)
if file:
# check that dest folder exists
dst_folder = self.get_folder(folder_name=dst_folder_name)
if not dst_folder:
# create dest folder
dst_folder = self.create_folder(dst_folder_name)
new_file = file.make_copy(dst_folder=dst_folder)
dst_folder.add_file(new_file)
file_copy = File(
folder_id=dst_folder.uuid,
folder_name=dst_folder.name,
**file.model_dump(exclude={"uuid", "folder_id", "folder_name", "sim_path"}),
)
dst_folder.add_file(file_copy)
if file.real:
new_file.sim_path.parent.mkdir(exist_ok=True)
shutil.copy2(file.sim_path, new_file.sim_path)
file_copy.sim_path.parent.mkdir(exist_ok=True)
shutil.copy2(file.sim_path, file_copy.sim_path)
else:
self.sys_log.error(f"Unable to copy file. {src_file_name} does not exist.")
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
:return: Current state of this object and child objects.
"""
state = super().describe_state()
state["folders"] = {folder.name: folder.describe_state() for folder in self.folders.values()}
return state
def apply_timestep(self, timestep: int) -> None:
"""Apply time step to FileSystem and its child folders and files."""
super().apply_timestep(timestep=timestep)
# apply timestep to folders
for folder_id in self.folders:
self.folders[folder_id].apply_timestep(timestep=timestep)
def scan(self, instant_scan: bool = False):
"""
Scan all the folders (and child files) in the file system.
:param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False.
"""
for folder_id in self.folders:
self.folders[folder_id].scan(instant_scan=instant_scan)
def reveal_to_red(self, instant_scan: bool = False):
"""
Reveals all the folders (and child files) in the file system to the red agent.
:param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False.
"""
for folder_id in self.folders:
self.folders[folder_id].reveal_to_red(instant_scan=instant_scan)
def restore_folder(self, folder_id: str):
"""TODO."""
pass
def restore_file(self, folder_id: str, file_id: str):
"""
@@ -302,522 +338,3 @@ class FileSystem(SimComponent):
:type: folder_id: str
"""
pass
def get_folder(self, folder_name: str) -> Optional[Folder]:
"""
Get a folder by its name if it exists.
:param folder_name: The folder name.
:return: The matching Folder.
"""
return self._folders_by_name.get(folder_name)
def get_folder_by_id(self, folder_uuid: str) -> Optional[Folder]:
"""
Get a folder by its uuid if it exists.
:param: folder_uuid: The folder uuid.
:return: The matching Folder.
"""
return self.folders.get(folder_uuid)
class Folder(FileSystemItemABC):
"""Simulation Folder."""
fs: FileSystem
"The FileSystem the Folder is in."
files: Dict[str, File] = {}
"Files stored in the folder."
_files_by_name: Dict[str, File] = {}
"Files by their name as <file name>.<file type>."
deleted_files: Dict[str, File] = {}
"Files that have been deleted."
scan_duration: int = -1
"How many timesteps to complete a scan."
red_scan_duration: int = -1
"How many timesteps to complete reveal to red scan."
def _init_request_manager(self) -> RequestManager:
rm = super()._init_request_manager()
rm.add_request(
name="delete",
request_type=RequestType(func=lambda request, context: self.remove_file_by_id(file_uuid=request[0])),
)
return rm
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
:return: Current state of this object and child objects.
"""
state = super().describe_state()
state["files"] = {file.name: file.describe_state() for uuid, file in self.files.items()}
return state
def show(self, markdown: bool = False):
"""
Display the contents of the Folder in tabular format.
:param markdown: Whether to display the table in Markdown format or not. Default is `False`.
"""
table = PrettyTable(["File", "Size"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.fs.sys_log.hostname} File System Folder ({self.name})"
for file in self.files.values():
table.add_row([file.name, file.size_str])
print(table.get_string(sortby="File"))
@property
def size(self) -> int:
"""
Calculate and return the total size of all files in the folder.
:return: The total size of all files in the folder. If no files exist or all have `None`
size, returns 0.
"""
return sum(file.size for file in self.files.values() if file.size is not None)
def apply_timestep(self, timestep: int):
"""
Apply a single timestep of simulation dynamics to this folder and its files.
In this instance, if any multi-timestep processes are currently occurring (such as scanning),
then they are brought one step closer to being finished.
:param timestep: The current timestep number. (Amount of time since simulation episode began)
:type timestep: int
"""
super().apply_timestep(timestep=timestep)
# scan files each timestep
if self.scan_duration >= 0:
self.scan_duration -= 1
if self.scan_duration == 0:
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
file.scan()
if file.visible_health_status == FileSystemItemHealthStatus.CORRUPT:
self.visible_health_status = FileSystemItemHealthStatus.CORRUPT
# red scan file at each step
if self.red_scan_duration >= 0:
self.red_scan_duration -= 1
if self.red_scan_duration == 0:
self.revealed_to_red = True
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
file.reveal_to_red()
# apply timestep to files in folder
for file_id in self.files:
self.files[file_id].apply_timestep(timestep=timestep)
def get_file(self, file_name: str) -> Optional[File]:
"""
Get a file by its name.
File name must be the filename and prefix, like 'memo.docx'.
:param file_name: The file name.
:return: The matching File.
"""
# TODO: Increment read count?
return self._files_by_name.get(file_name)
def get_file_by_id(self, file_uuid: str) -> File:
"""
Get a file by its uuid.
:param: file_uuid: The file uuid.
:return: The matching File.
"""
return self.files.get(file_uuid)
def add_file(self, file: File):
"""
Adds a file to the folder.
:param File file: The File object to be added to the folder.
:raises Exception: If the provided `file` parameter is None or not an instance of the
`File` class.
"""
if file is None or not isinstance(file, File):
raise Exception(f"Invalid file: {file}")
# check if file with id already exists in folder
if file.uuid in self.files:
_LOGGER.debug(f"File with id {file.uuid} already exists in folder")
else:
# add to list
self.files[file.uuid] = file
self._files_by_name[file.name] = file
file.folder = self
def remove_file(self, file: Optional[File]):
"""
Removes a file from the folder list.
The method can take a File object or a file id.
:param file: The file to remove
"""
if file is None or not isinstance(file, File):
raise Exception(f"Invalid file: {file}")
if self.files.get(file.uuid):
self.files.pop(file.uuid)
self._files_by_name.pop(file.name)
self.deleted_files[file.uuid] = file
self.fs.sys_log.info(f"Removed file {file.name} (id: {file.uuid})")
else:
_LOGGER.debug(f"File with UUID {file.uuid} was not found.")
def remove_file_by_id(self, file_uuid: str):
"""
Remove a file using id.
:param: file_uuid: The UUID of the file to remove.
"""
file = self.get_file_by_id(file_uuid=file_uuid)
self.remove_file(file=file)
def remove_all_files(self):
"""Removes all the files in the folder."""
for file_id in self.files:
self.deleted_files[file_id] = self.files[file_id]
self.files = {}
self._files_by_name = {}
def restore_file(self, file: Optional[File]):
"""
Restores a file.
The method can take a File object or a file id.
:param file: The file to restore
"""
pass
def quarantine(self):
"""Quarantines the File System Folder."""
pass
def unquarantine(self):
"""Unquarantine of the File System Folder."""
pass
def quarantine_status(self) -> bool:
"""Returns true if the folder is being quarantined."""
pass
def scan(self, instant_scan: bool = False) -> None:
"""
Update Folder visible status.
:param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False.
"""
if instant_scan:
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
file.scan()
if file.visible_health_status == FileSystemItemHealthStatus.CORRUPT:
self.visible_health_status = FileSystemItemHealthStatus.CORRUPT
return
if self.scan_duration <= 0:
# scan one file per timestep
self.scan_duration = len(self.files)
self.fs.sys_log.info(f"Scanning folder {self.name} (id: {self.uuid})")
else:
# scan already in progress
self.fs.sys_log.info(f"Scan is already in progress {self.name} (id: {self.uuid})")
def reveal_to_red(self, instant_scan: bool = False):
"""
Reveals the folders and files to the red agent.
:param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False.
"""
if instant_scan:
self.revealed_to_red = True
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
file.reveal_to_red()
return
if self.red_scan_duration <= 0:
# scan one file per timestep
self.red_scan_duration = len(self.files)
self.fs.sys_log.info(f"Folder revealed to red agent: {self.name} (id: {self.uuid})")
else:
# scan already in progress
self.fs.sys_log.info(f"Red Agent Scan is already in progress {self.name} (id: {self.uuid})")
def check_hash(self) -> bool:
"""
Runs a :func:`check_hash` on all files in the folder.
If a file under the folder is corrupted, the whole folder is considered corrupted.
TODO: For now this will just iterate through the files and run :func:`check_hash` and ignores
any other changes to the folder
Return False if corruption is detected, otherwise True
"""
super().check_hash()
# iterate through the files and run a check hash
no_corrupted_files = True
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
no_corrupted_files = file.check_hash()
# if one file in the folder is corrupted, set the folder status to corrupted
if not no_corrupted_files:
self.corrupt()
self.fs.sys_log.info(f"Checking hash of folder {self.name} (id: {self.uuid})")
return no_corrupted_files
def repair(self) -> bool:
"""Repair a corrupted Folder by setting the folder and containing files status to FileSystemItemStatus.GOOD."""
super().repair()
repaired = False
# iterate through the files in the folder
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
repaired = file.repair()
# set file status to good if corrupt
if self.health_status == FileSystemItemHealthStatus.CORRUPT:
self.health_status = FileSystemItemHealthStatus.GOOD
repaired = True
self.fs.sys_log.info(f"Repaired folder {self.name} (id: {self.uuid})")
return repaired
def restore(self) -> bool:
"""Restore a File by setting the folder and containing files status to FileSystemItemStatus.GOOD."""
super().restore()
restored = False
# iterate through the files in the folder
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
restored = file.restore()
# set file status to corrupt if good
if self.health_status == FileSystemItemHealthStatus.CORRUPT:
self.health_status = FileSystemItemHealthStatus.GOOD
restored = True
self.fs.sys_log.info(f"Restored folder {self.name} (id: {self.uuid})")
return restored
def corrupt(self) -> bool:
"""Corrupt a File by setting the folder and containing files status to FileSystemItemStatus.CORRUPT."""
super().corrupt()
corrupted = False
# iterate through the files in the folder
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
corrupted = file.corrupt()
# set file status to corrupt if good
if self.health_status == FileSystemItemHealthStatus.GOOD:
self.health_status = FileSystemItemHealthStatus.CORRUPT
corrupted = True
self.fs.sys_log.info(f"Corrupted folder {self.name} (id: {self.uuid})")
return corrupted
class File(FileSystemItemABC):
"""
Class representing a file in the simulation.
:ivar Folder folder: The folder in which the file resides.
:ivar FileType file_type: The type of the file.
:ivar Optional[int] sim_size: The simulated file size.
:ivar bool real: Indicates if the file is actually a real file in the Node sim fs output.
:ivar Optional[Path] sim_path: The path if the file is real.
"""
folder: Folder
"The Folder the File is in."
file_type: FileType
"The type of File."
sim_size: Optional[int] = None
"The simulated file size."
real: bool = False
"Indicates whether the File is actually a real file in the Node sim fs output."
sim_path: Optional[Path] = None
"The Path if real is True."
def __init__(self, **kwargs):
"""
Initialise File class.
:param name: The name of the file.
:param file_type: The FileType of the file
:param size: The size of the FileSystemItemABC
"""
has_extension = "." in kwargs["name"]
# Attempt to use the file type extension to set/override the FileType
if has_extension:
extension = kwargs["name"].split(".")[-1]
kwargs["file_type"] = get_file_type_from_extension(extension)
else:
# If the file name does not have a extension, override file type to FileType.UNKNOWN
if not kwargs["file_type"]:
kwargs["file_type"] = FileType.UNKNOWN
if kwargs["file_type"] != FileType.UNKNOWN:
kwargs["name"] = f"{kwargs['name']}.{kwargs['file_type'].name.lower()}"
# set random file size if none provided
if not kwargs.get("sim_size"):
kwargs["sim_size"] = kwargs["file_type"].default_size
super().__init__(**kwargs)
if self.real:
self.sim_path = self.folder.fs.sim_root / self.path
if not self.sim_path.exists():
self.sim_path.parent.mkdir(exist_ok=True, parents=True)
with open(self.sim_path, mode="a"):
pass
def make_copy(self, dst_folder: Folder) -> File:
"""
Create a copy of the current File object in the given destination folder.
:param Folder dst_folder: The destination folder for the copied file.
:return: A new File object that is a copy of the current file.
"""
return File(folder=dst_folder, **self.model_dump(exclude={"uuid", "folder", "sim_path"}))
@property
def path(self) -> str:
"""
Get the path of the file in the file system.
:return: The full path of the file.
"""
return f"{self.folder.name}/{self.name}"
@property
def size(self) -> int:
"""
Get the size of the file in bytes.
:return: The size of the file in bytes.
"""
if self.real:
return os.path.getsize(self.sim_path)
return self.sim_size
def describe_state(self) -> Dict:
"""Produce a dictionary describing the current state of this object."""
state = super().describe_state()
state["size"] = self.size
state["file_type"] = self.file_type.name
return state
def scan(self) -> None:
"""Updates the visible statuses of the file."""
path = self.folder.name + "/" + self.name
self.folder.fs.sys_log.info(f"Scanning file {self.sim_path if self.sim_path else path}")
self.visible_health_status = self.health_status
def reveal_to_red(self):
"""Reveals the folder/file to the red agent."""
self.revealed_to_red = True
def check_hash(self) -> bool:
"""
Check if the file has been changed.
If changed, the file is considered corrupted.
Return False if corruption is detected, otherwise True
"""
current_hash = None
# if file is real, read the file contents
if self.real:
with open(self.sim_path, "rb") as f:
file_hash = hashlib.blake2b()
while chunk := f.read(8192):
file_hash.update(chunk)
current_hash = file_hash.hexdigest()
else:
# otherwise get describe_state dict and hash that
current_hash = hashlib.blake2b(json.dumps(self.describe_state(), sort_keys=True).encode()).hexdigest()
# if the previous hash is None, set the current hash to previous
if self.previous_hash is None:
self.previous_hash = current_hash
# if the previous hash and current hash do not match, mark file as corrupted
if self.previous_hash is not current_hash:
self.corrupt()
return False
return True
def repair(self) -> bool:
"""Repair a corrupted File by setting the status to FileSystemItemStatus.GOOD."""
super().repair()
# set file status to good if corrupt
if self.health_status == FileSystemItemHealthStatus.CORRUPT:
self.health_status = FileSystemItemHealthStatus.GOOD
path = self.folder.name + "/" + self.name
self.folder.fs.sys_log.info(f"Repaired file {self.sim_path if self.sim_path else path}")
return True
def restore(self) -> bool:
"""Restore a corrupted File by setting the status to FileSystemItemStatus.GOOD."""
super().restore()
restored = False
if self.health_status == FileSystemItemHealthStatus.CORRUPT:
self.health_status = FileSystemItemHealthStatus.GOOD
restored = True
path = self.folder.name + "/" + self.name
self.folder.fs.sys_log.info(f"Restored file {self.sim_path if self.sim_path else path}")
return restored
def corrupt(self) -> bool:
"""Corrupt a File by setting the status to FileSystemItemStatus.CORRUPT."""
super().corrupt()
corrupted = False
# set file status to good if corrupt
if self.health_status == FileSystemItemHealthStatus.GOOD:
self.health_status = FileSystemItemHealthStatus.CORRUPT
corrupted = True
path = self.folder.name + "/" + self.name
self.folder.fs.sys_log.info(f"Corrupted file {self.sim_path if self.sim_path else path}")
return corrupted

View File

@@ -7,6 +7,7 @@ from typing import Dict, Optional
from primaite import getLogger
from primaite.simulator.core import RequestManager, RequestType, SimComponent
from primaite.simulator.system.core.sys_log import SysLog
_LOGGER = getLogger(__name__)
@@ -78,6 +79,9 @@ class FileSystemItemABC(SimComponent):
revealed_to_red: bool = False
"If true, the folder/file has been revealed to the red agent."
sys_log: SysLog
"Used for creating system logs."
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.

View File

@@ -0,0 +1,347 @@
from __future__ import annotations
from typing import Dict, Optional
from prettytable import MARKDOWN, PrettyTable
from primaite import getLogger
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.file_system.file import File
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemABC, FileSystemItemHealthStatus
_LOGGER = getLogger(__name__)
class Folder(FileSystemItemABC):
"""Simulation Folder."""
files: Dict[str, File] = {}
"Files stored in the folder."
_files_by_name: Dict[str, File] = {}
"Files by their name as <file name>.<file type>."
deleted_files: Dict[str, File] = {}
"Files that have been deleted."
scan_duration: int = -1
"How many timesteps to complete a scan."
red_scan_duration: int = -1
"How many timesteps to complete reveal to red scan."
def __init__(self, **kwargs):
"""
Initialise Folder class.
:param name: The name of the folder.
:param sys_log: The SysLog instance to us to create system logs.
"""
super().__init__(**kwargs)
self.sys_log.info(f"Created file /{self.name} (id: {self.uuid})")
def _init_request_manager(self) -> RequestManager:
rm = super()._init_request_manager()
rm.add_request(
name="delete",
request_type=RequestType(func=lambda request, context: self.remove_file_by_id(file_uuid=request[0])),
)
return rm
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
:return: Current state of this object and child objects.
"""
state = super().describe_state()
state["files"] = {file.name: file.describe_state() for uuid, file in self.files.items()}
return state
def show(self, markdown: bool = False):
"""
Display the contents of the Folder in tabular format.
:param markdown: Whether to display the table in Markdown format or not. Default is `False`.
"""
table = PrettyTable(["File", "Size"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.sys_log.hostname} File System Folder ({self.name})"
for file in self.files.values():
table.add_row([file.name, file.size_str])
print(table.get_string(sortby="File"))
@property
def size(self) -> int:
"""
Calculate and return the total size of all files in the folder.
:return: The total size of all files in the folder. If no files exist or all have `None`
size, returns 0.
"""
return sum(file.size for file in self.files.values() if file.size is not None)
def apply_timestep(self, timestep: int):
"""
Apply a single timestep of simulation dynamics to this folder and its files.
In this instance, if any multi-timestep processes are currently occurring (such as scanning),
then they are brought one step closer to being finished.
:param timestep: The current timestep number. (Amount of time since simulation episode began)
:type timestep: int
"""
super().apply_timestep(timestep=timestep)
# scan files each timestep
if self.scan_duration >= 0:
self.scan_duration -= 1
if self.scan_duration == 0:
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
file.scan()
if file.visible_health_status == FileSystemItemHealthStatus.CORRUPT:
self.visible_health_status = FileSystemItemHealthStatus.CORRUPT
# red scan file at each step
if self.red_scan_duration >= 0:
self.red_scan_duration -= 1
if self.red_scan_duration == 0:
self.revealed_to_red = True
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
file.reveal_to_red()
# apply timestep to files in folder
for file_id in self.files:
self.files[file_id].apply_timestep(timestep=timestep)
def get_file(self, file_name: str) -> Optional[File]:
"""
Get a file by its name.
File name must be the filename and prefix, like 'memo.docx'.
:param file_name: The file name.
:return: The matching File.
"""
# TODO: Increment read count?
return self._files_by_name.get(file_name)
def get_file_by_id(self, file_uuid: str) -> File:
"""
Get a file by its uuid.
:param: file_uuid: The file uuid.
:return: The matching File.
"""
return self.files.get(file_uuid)
def add_file(self, file: File):
"""
Adds a file to the folder.
:param File file: The File object to be added to the folder.
:raises Exception: If the provided `file` parameter is None or not an instance of the
`File` class.
"""
if file is None or not isinstance(file, File):
raise Exception(f"Invalid file: {file}")
# check if file with id already exists in folder
if file.uuid in self.files:
_LOGGER.debug(f"File with id {file.uuid} already exists in folder")
else:
# add to list
self.files[file.uuid] = file
self._files_by_name[file.name] = file
file.folder = self
def remove_file(self, file: Optional[File]):
"""
Removes a file from the folder list.
The method can take a File object or a file id.
:param file: The file to remove
"""
if file is None or not isinstance(file, File):
raise Exception(f"Invalid file: {file}")
if self.files.get(file.uuid):
self.files.pop(file.uuid)
self._files_by_name.pop(file.name)
self.deleted_files[file.uuid] = file
self.sys_log.info(f"Removed file {file.name} (id: {file.uuid})")
else:
_LOGGER.debug(f"File with UUID {file.uuid} was not found.")
def remove_file_by_id(self, file_uuid: str):
"""
Remove a file using id.
:param: file_uuid: The UUID of the file to remove.
"""
file = self.get_file_by_id(file_uuid=file_uuid)
self.remove_file(file=file)
def remove_all_files(self):
"""Removes all the files in the folder."""
for file_id in self.files:
self.deleted_files[file_id] = self.files[file_id]
self.files = {}
self._files_by_name = {}
def restore_file(self, file: Optional[File]):
"""
Restores a file.
The method can take a File object or a file id.
:param file: The file to restore
"""
pass
def quarantine(self):
"""Quarantines the File System Folder."""
pass
def unquarantine(self):
"""Unquarantine of the File System Folder."""
pass
def quarantine_status(self) -> bool:
"""Returns true if the folder is being quarantined."""
pass
def scan(self, instant_scan: bool = False) -> None:
"""
Update Folder visible status.
:param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False.
"""
if instant_scan:
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
file.scan()
if file.visible_health_status == FileSystemItemHealthStatus.CORRUPT:
self.visible_health_status = FileSystemItemHealthStatus.CORRUPT
return
if self.scan_duration <= 0:
# scan one file per timestep
self.scan_duration = len(self.files)
self.sys_log.info(f"Scanning folder {self.name} (id: {self.uuid})")
else:
# scan already in progress
self.sys_log.info(f"Scan is already in progress {self.name} (id: {self.uuid})")
def reveal_to_red(self, instant_scan: bool = False):
"""
Reveals the folders and files to the red agent.
:param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False.
"""
if instant_scan:
self.revealed_to_red = True
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
file.reveal_to_red()
return
if self.red_scan_duration <= 0:
# scan one file per timestep
self.red_scan_duration = len(self.files)
self.sys_log.info(f"Folder revealed to red agent: {self.name} (id: {self.uuid})")
else:
# scan already in progress
self.sys_log.info(f"Red Agent Scan is already in progress {self.name} (id: {self.uuid})")
def check_hash(self) -> bool:
"""
Runs a :func:`check_hash` on all files in the folder.
If a file under the folder is corrupted, the whole folder is considered corrupted.
TODO: For now this will just iterate through the files and run :func:`check_hash` and ignores
any other changes to the folder
Return False if corruption is detected, otherwise True
"""
super().check_hash()
# iterate through the files and run a check hash
no_corrupted_files = True
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
no_corrupted_files = file.check_hash()
# if one file in the folder is corrupted, set the folder status to corrupted
if not no_corrupted_files:
self.corrupt()
self.sys_log.info(f"Checking hash of folder {self.name} (id: {self.uuid})")
return no_corrupted_files
def repair(self) -> bool:
"""Repair a corrupted Folder by setting the folder and containing files status to FileSystemItemStatus.GOOD."""
super().repair()
repaired = False
# iterate through the files in the folder
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
repaired = file.repair()
# set file status to good if corrupt
if self.health_status == FileSystemItemHealthStatus.CORRUPT:
self.health_status = FileSystemItemHealthStatus.GOOD
repaired = True
self.sys_log.info(f"Repaired folder {self.name} (id: {self.uuid})")
return repaired
def restore(self) -> bool:
"""Restore a File by setting the folder and containing files status to FileSystemItemStatus.GOOD."""
super().restore()
restored = False
# iterate through the files in the folder
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
restored = file.restore()
# set file status to corrupt if good
if self.health_status == FileSystemItemHealthStatus.CORRUPT:
self.health_status = FileSystemItemHealthStatus.GOOD
restored = True
self.sys_log.info(f"Restored folder {self.name} (id: {self.uuid})")
return restored
def corrupt(self) -> bool:
"""Corrupt a File by setting the folder and containing files status to FileSystemItemStatus.CORRUPT."""
super().corrupt()
corrupted = False
# iterate through the files in the folder
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
corrupted = file.corrupt()
# set file status to corrupt if good
if self.health_status == FileSystemItemHealthStatus.GOOD:
self.health_status = FileSystemItemHealthStatus.CORRUPT
corrupted = True
self.sys_log.info(f"Corrupted folder {self.name} (id: {self.uuid})")
return corrupted

View File

@@ -0,0 +1,78 @@
from primaite.simulator.file_system.file import File
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.file_system.file_type import FileType
def test_create_file_no_extension(file_system):
"""Tests that creating a file without an extension sets the file type to FileType.UNKNOWN."""
file = file_system.create_file(file_name="test_file")
assert len(file_system.folders) is 1
assert file_system.get_folder("root").get_file("test_file") == file
assert file_system.get_folder("root").get_file("test_file").file_type == FileType.UNKNOWN
assert file_system.get_folder("root").get_file("test_file").size == 0
def test_file_scan(file_system):
"""Test the ability to update visible status."""
file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
assert file.health_status == FileSystemItemHealthStatus.GOOD
assert file.visible_health_status == FileSystemItemHealthStatus.GOOD
file.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.visible_health_status == FileSystemItemHealthStatus.GOOD
file.scan()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.visible_health_status == FileSystemItemHealthStatus.CORRUPT
def test_file_reveal_to_red_scan(file_system):
"""Test the ability to reveal files to red."""
file = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
assert file.revealed_to_red is False
file.reveal_to_red()
assert file.revealed_to_red is True
def test_simulated_file_check_hash(file_system):
file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
assert file.check_hash() is True
# change simulated file size
file.sim_size = 0
assert file.check_hash() is False
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
def test_real_file_check_hash(file_system):
file: File = file_system.create_file(file_name="test_file.txt", real=True)
assert file.check_hash() is True
# change file content
with open(file.sim_path, "a") as f:
f.write("get hacked scrub lol xD\n")
assert file.check_hash() is False
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
def test_file_corrupt_repair(file_system):
"""Test the ability to corrupt and repair files."""
file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
file.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
file.repair()
assert file.health_status == FileSystemItemHealthStatus.GOOD

View File

@@ -1,6 +1,6 @@
import pytest
from primaite.simulator.file_system.file_system import File, FileSystem, FileSystemItemHealthStatus, Folder
from primaite.simulator.file_system.file_system import FileSystem
from primaite.simulator.file_system.file_type import FileType
@@ -26,15 +26,6 @@ def test_create_file_no_folder(file_system):
assert file_system.get_folder("root").get_file("test_file.txt").size == 10
def test_create_file_no_extension(file_system):
"""Tests that creating a file without an extension sets the file type to FileType.UNKNOWN."""
file = file_system.create_file(file_name="test_file")
assert len(file_system.folders) is 1
assert file_system.get_folder("root").get_file("test_file") == file
assert file_system.get_folder("root").get_file("test_file").file_type == FileType.UNKNOWN
assert file_system.get_folder("root").get_file("test_file").size == 0
def test_delete_file(file_system):
"""Tests that a file can be deleted."""
file_system.create_file(file_name="test_file.txt")
@@ -125,193 +116,6 @@ def test_copy_file(file_system):
assert file_system.get_file("dst_folder", "test_file.txt").uuid != original_uuid
@pytest.mark.skip(reason="Implementation for quarantine not needed yet")
def test_folder_quarantine_state(file_system):
"""Tests the changing of folder quarantine status."""
folder = file_system.get_folder("root")
assert folder.quarantine_status() is False
folder.quarantine()
assert folder.quarantine_status() is True
folder.unquarantine()
assert folder.quarantine_status() is False
def test_file_corrupt_repair(file_system):
"""Test the ability to corrupt and repair files."""
folder: Folder = file_system.create_folder(folder_name="test_folder")
file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
file.corrupt()
assert folder.health_status == FileSystemItemHealthStatus.GOOD
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
file.repair()
assert folder.health_status == FileSystemItemHealthStatus.GOOD
assert file.health_status == FileSystemItemHealthStatus.GOOD
def test_folder_corrupt_repair(file_system):
"""Test the ability to corrupt and repair folders."""
folder: Folder = file_system.create_folder(folder_name="test_folder")
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
folder.corrupt()
file = folder.get_file(file_name="test_file.txt")
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
folder.repair()
file = folder.get_file(file_name="test_file.txt")
assert folder.health_status == FileSystemItemHealthStatus.GOOD
assert file.health_status == FileSystemItemHealthStatus.GOOD
def test_file_scan(file_system):
"""Test the ability to update visible status."""
folder: Folder = file_system.create_folder(folder_name="test_folder")
file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
assert file.health_status == FileSystemItemHealthStatus.GOOD
assert file.visible_health_status == FileSystemItemHealthStatus.GOOD
file.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.visible_health_status == FileSystemItemHealthStatus.GOOD
file.scan()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.visible_health_status == FileSystemItemHealthStatus.CORRUPT
def test_folder_scan(file_system):
"""Test the ability to update visible status."""
folder: Folder = file_system.create_folder(folder_name="test_folder")
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
file_system.create_file(file_name="test_file2.txt", folder_name="test_folder")
file1: File = folder.get_file_by_id(file_uuid=list(folder.files)[1])
file2: File = folder.get_file_by_id(file_uuid=list(folder.files)[0])
assert folder.health_status == FileSystemItemHealthStatus.GOOD
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
folder.corrupt()
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
folder.scan()
folder.apply_timestep(timestep=0)
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
folder.apply_timestep(timestep=1)
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT
assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPT
def test_folder_reveal_to_red_scan(file_system):
"""Test the ability to reveal files to red."""
folder: Folder = file_system.create_folder(folder_name="test_folder")
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
file_system.create_file(file_name="test_file2.txt", folder_name="test_folder")
file1: File = folder.get_file_by_id(file_uuid=list(folder.files)[1])
file2: File = folder.get_file_by_id(file_uuid=list(folder.files)[0])
assert folder.revealed_to_red is False
assert file1.revealed_to_red is False
assert file2.revealed_to_red is False
folder.reveal_to_red()
folder.apply_timestep(timestep=0)
assert folder.revealed_to_red is False
assert file1.revealed_to_red is False
assert file2.revealed_to_red is False
folder.apply_timestep(timestep=1)
assert folder.revealed_to_red is True
assert file1.revealed_to_red is True
assert file2.revealed_to_red is True
def test_simulated_file_check_hash(file_system):
file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
assert file.check_hash() is True
# change simulated file size
file.sim_size = 0
assert file.check_hash() is False
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
def test_real_file_check_hash(file_system):
file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder", real=True)
assert file.check_hash() is True
# change file content
with open(file.sim_path, "a") as f:
f.write("get hacked scrub lol xD\n")
assert file.check_hash() is False
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
def test_simulated_folder_check_hash(file_system):
folder: Folder = file_system.create_folder(folder_name="test_folder")
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
assert folder.check_hash() is True
# change simulated file size
file = folder.get_file(file_name="test_file.txt")
file.sim_size = 0
assert folder.check_hash() is False
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
def test_real_folder_check_hash(file_system):
folder: Folder = file_system.create_folder(folder_name="test_folder")
file_system.create_file(file_name="test_file.txt", folder_name="test_folder", real=True)
assert folder.check_hash() is True
# change simulated file size
file = folder.get_file(file_name="test_file.txt")
# change file content
with open(file.sim_path, "a") as f:
f.write("get hacked scrub lol xD\n")
assert folder.check_hash() is False
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
@pytest.mark.skip(reason="Skipping until we tackle serialisation")
def test_serialisation(file_system):
"""Test to check that the object serialisation works correctly."""

View File

@@ -2,7 +2,8 @@ from typing import Tuple
import pytest
from primaite.simulator.file_system.file_system import File, FileSystem, FileSystemItemHealthStatus, Folder
from primaite.simulator.file_system.file_system import File, FileSystem, Folder
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
@pytest.fixture(scope="function")

View File

@@ -0,0 +1,133 @@
import pytest
from primaite.simulator.file_system.file import File
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.file_system.folder import Folder
@pytest.mark.skip(reason="Implementation for quarantine not needed yet")
def test_folder_quarantine_state(file_system):
"""Tests the changing of folder quarantine status."""
folder = file_system.get_folder("root")
assert folder.quarantine_status() is False
folder.quarantine()
assert folder.quarantine_status() is True
folder.unquarantine()
assert folder.quarantine_status() is False
def test_folder_scan(file_system):
"""Test the ability to update visible status."""
folder: Folder = file_system.create_folder(folder_name="test_folder")
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
file_system.create_file(file_name="test_file2.txt", folder_name="test_folder")
file1: File = folder.get_file_by_id(file_uuid=list(folder.files)[1])
file2: File = folder.get_file_by_id(file_uuid=list(folder.files)[0])
assert folder.health_status == FileSystemItemHealthStatus.GOOD
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
folder.corrupt()
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
folder.scan()
folder.apply_timestep(timestep=0)
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
folder.apply_timestep(timestep=1)
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT
assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPT
def test_folder_reveal_to_red_scan(file_system):
"""Test the ability to reveal files to red."""
folder: Folder = file_system.create_folder(folder_name="test_folder")
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
file_system.create_file(file_name="test_file2.txt", folder_name="test_folder")
file1: File = folder.get_file_by_id(file_uuid=list(folder.files)[1])
file2: File = folder.get_file_by_id(file_uuid=list(folder.files)[0])
assert folder.revealed_to_red is False
assert file1.revealed_to_red is False
assert file2.revealed_to_red is False
folder.reveal_to_red()
folder.apply_timestep(timestep=0)
assert folder.revealed_to_red is False
assert file1.revealed_to_red is False
assert file2.revealed_to_red is False
folder.apply_timestep(timestep=1)
assert folder.revealed_to_red is True
assert file1.revealed_to_red is True
assert file2.revealed_to_red is True
def test_folder_corrupt_repair(file_system):
"""Test the ability to corrupt and repair folders."""
folder: Folder = file_system.create_folder(folder_name="test_folder")
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
folder.corrupt()
file = folder.get_file(file_name="test_file.txt")
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
folder.repair()
file = folder.get_file(file_name="test_file.txt")
assert folder.health_status == FileSystemItemHealthStatus.GOOD
assert file.health_status == FileSystemItemHealthStatus.GOOD
def test_simulated_folder_check_hash(file_system):
folder: Folder = file_system.create_folder(folder_name="test_folder")
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
assert folder.check_hash() is True
# change simulated file size
file = folder.get_file(file_name="test_file.txt")
file.sim_size = 0
assert folder.check_hash() is False
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
def test_real_folder_check_hash(file_system):
folder: Folder = file_system.create_folder(folder_name="test_folder")
file_system.create_file(file_name="test_file.txt", folder_name="test_folder", real=True)
assert folder.check_hash() is True
# change simulated file size
file = folder.get_file(file_name="test_file.txt")
# change file content
with open(file.sim_path, "a") as f:
f.write("get hacked scrub lol xD\n")
assert folder.check_hash() is False
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT

View File

@@ -1,6 +1,7 @@
import pytest
from primaite.simulator.file_system.file_system import File, FileSystemItemHealthStatus, Folder
from primaite.simulator.file_system.file_system import File, Folder
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.network.hardware.base import Node, NodeOperatingState
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.processes.process import Process