Files
PrimAITE/src/primaite/simulator/file_system/file_system.py

567 lines
21 KiB
Python
Raw Normal View History

from __future__ import annotations
from pathlib import Path
2024-05-20 13:10:21 +01:00
from typing import Any, Dict, List, Optional
2023-08-03 12:14:11 +01:00
from prettytable import MARKDOWN, PrettyTable
from primaite.interface.request import RequestResponse
2023-11-08 10:48:41 +00:00
from primaite.simulator.core import RequestManager, RequestType, SimComponent
from primaite.simulator.file_system.file import File
from primaite.simulator.file_system.file_type import FileType
from primaite.simulator.file_system.folder import Folder
from primaite.simulator.system.core.sys_log import SysLog
2023-08-01 16:18:49 +01:00
class FileSystem(SimComponent):
"""Class that contains all the simulation File System."""
folders: Dict[str, Folder] = {}
"List containing all the folders in the file system."
2023-10-31 15:52:44 +00:00
deleted_folders: Dict[str, Folder] = {}
"List containing all the folders that have been deleted."
sys_log: SysLog
"Instance of SysLog used to create system logs."
sim_root: Path
"Root path of the simulation."
num_file_creations: int = 0
"Number of file creations in the current step."
num_file_deletions: int = 0
"Number of file deletions in the current step."
def __init__(self, **kwargs):
super().__init__(**kwargs)
# Ensure a default root folder
if not self.folders:
self.create_folder("root")
2023-10-09 13:24:08 +01:00
def _init_request_manager(self) -> RequestManager:
2024-03-11 10:20:47 +00:00
"""
Initialise the request manager.
More information in user guide and docstring for SimComponent._init_request_manager.
"""
rm = super()._init_request_manager()
2023-09-19 11:24:42 +01:00
self._delete_manager = RequestManager()
self._delete_manager.add_request(
name="file",
request_type=RequestType(
func=lambda request, context: RequestResponse.from_bool(
self.delete_file(folder_name=request[0], file_name=request[1])
)
),
)
self._delete_manager.add_request(
name="folder",
request_type=RequestType(
func=lambda request, context: RequestResponse.from_bool(self.delete_folder(folder_name=request[0]))
),
)
rm.add_request(
name="delete",
request_type=RequestType(func=self._delete_manager),
)
self._create_manager = RequestManager()
2024-05-20 13:10:21 +01:00
def _create_file_action(request: List[Any], context: Any) -> RequestResponse:
file = self.create_file(folder_name=request[0], file_name=request[1])
if not file:
return RequestResponse.from_bool(False)
return RequestResponse(
status="success",
data={
"file_name": file.name,
"folder_name": file.folder_name,
"file_type": file.file_type,
"file_size": file.size,
},
)
self._create_manager.add_request(
name="file",
2024-05-20 13:10:21 +01:00
request_type=RequestType(func=_create_file_action),
)
2024-05-20 13:10:21 +01:00
def _create_folder_action(request: List[Any], context: Any) -> RequestResponse:
folder = self.create_folder(folder_name=request[0])
if not folder:
return RequestResponse.from_bool(False)
return RequestResponse(status="success", data={"folder_name": folder.name})
self._create_manager.add_request(
name="folder",
2024-05-20 13:10:21 +01:00
request_type=RequestType(func=_create_folder_action),
)
rm.add_request(
name="create",
request_type=RequestType(func=self._create_manager),
)
rm.add_request(
name="access",
request_type=RequestType(
func=lambda request, context: RequestResponse.from_bool(
self.access_file(folder_name=request[0], file_name=request[1])
)
),
)
2023-11-06 10:22:08 +00:00
self._restore_manager = RequestManager()
self._restore_manager.add_request(
name="file",
request_type=RequestType(
func=lambda request, context: RequestResponse.from_bool(
self.restore_file(folder_name=request[0], file_name=request[1])
)
2023-11-06 10:22:08 +00:00
),
)
self._restore_manager.add_request(
name="folder",
request_type=RequestType(
func=lambda request, context: RequestResponse.from_bool(self.restore_folder(folder_name=request[0]))
),
2023-11-06 10:22:08 +00:00
)
rm.add_request(
name="restore",
request_type=RequestType(func=self._restore_manager),
)
self._folder_request_manager = RequestManager()
rm.add_request("folder", RequestType(func=self._folder_request_manager))
2023-09-19 11:24:42 +01:00
self._file_request_manager = RequestManager()
rm.add_request("file", RequestType(func=self._file_request_manager))
2023-09-19 11:24:42 +01:00
return rm
2023-09-19 11:24:42 +01:00
@property
def size(self) -> int:
"""
Calculate and return the total size of all folders in the file system.
:return: The sum of the sizes of all folders in the file system.
"""
return sum(folder.size for folder in self.folders.values())
def show(self, markdown: bool = False, full: bool = False):
"""
Prints a table of the FileSystem, displaying either just folders or full files.
:param markdown: Flag indicating if output should be in markdown format.
:param full: Flag indicating if to show full files.
"""
headers = ["Folder", "Size", "Deleted"]
if full:
headers[0] = "File Path"
table = PrettyTable(headers)
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.sys_log.hostname} File System"
folders = {**self.folders, **self.deleted_folders}
for folder in folders.values():
if not full:
table.add_row([folder.name, folder.size_str, folder.deleted])
else:
files = {**folder.files, **folder.deleted_files}
if not files:
table.add_row([folder.name, folder.size_str, folder.deleted])
else:
for file in files.values():
table.add_row([file.path, file.size_str, file.deleted])
if full:
print(table.get_string(sortby="File Path"))
else:
print(table.get_string(sortby="Folder"))
2023-08-03 12:14:11 +01:00
###############################################################
# Folder methods
###############################################################
def create_folder(self, folder_name: str) -> Folder:
"""
Creates a Folder and adds it to the list of folders.
:param folder_name: The name of the folder.
"""
# check if folder with name already exists
if self.get_folder(folder_name):
raise Exception(f"Cannot create folder as it already exists: {folder_name}")
folder = Folder(name=folder_name, sys_log=self.sys_log)
self.folders[folder.uuid] = folder
self._folder_request_manager.add_request(
name=folder.name, request_type=RequestType(func=folder._request_manager)
)
2023-08-03 12:14:11 +01:00
return folder
2023-08-01 16:18:49 +01:00
def delete_folder(self, folder_name: str) -> bool:
2023-08-01 16:18:49 +01:00
"""
Deletes a folder, removes it from the folders list and removes any child folders and files.
2023-08-01 16:18:49 +01:00
:param folder_name: The name of the folder.
2023-08-01 16:18:49 +01:00
"""
if folder_name == "root":
self.sys_log.error("Cannot delete the root folder.")
return False
folder = self.get_folder(folder_name)
if not folder:
self.sys_log.error(f"Cannot delete folder as it does not exist: {folder_name}")
return False
# set folder to deleted state
folder.delete()
2023-10-31 15:52:44 +00:00
# remove from folder list
self.folders.pop(folder.uuid)
# add to deleted list
folder.remove_all_files()
2023-08-01 16:18:49 +01:00
self.deleted_folders[folder.uuid] = folder
self.sys_log.warning(f"Deleted folder /{folder.name} and its contents")
return True
2023-08-01 16:18:49 +01:00
def delete_folder_by_id(self, folder_uuid: str) -> None:
"""
Deletes a folder via its uuid.
:param: folder_uuid: UUID of the folder to delete
"""
folder = self.get_folder_by_id(folder_uuid=folder_uuid)
self.delete_folder(folder_name=folder.name)
def get_folder(self, folder_name: str, include_deleted: bool = False) -> Optional[Folder]:
"""
Get a folder by its name if it exists.
:param folder_name: The folder name.
:return: The matching Folder.
"""
for folder in self.folders.values():
if folder.name == folder_name:
return folder
if include_deleted:
for folder in self.deleted_folders.values():
if folder.name == folder_name:
return folder
return None
def get_folder_by_id(self, folder_uuid: str, include_deleted: Optional[bool] = False) -> Optional[Folder]:
"""
Get a folder by its uuid if it exists.
:param: folder_uuid: The folder uuid.
2023-11-06 10:22:08 +00:00
:param: include_deleted: If true, the deleted folders will also be checked
:return: The matching Folder.
"""
2023-11-06 10:22:08 +00:00
if include_deleted:
folder = self.deleted_folders.get(folder_uuid)
if folder:
return folder
return self.folders.get(folder_uuid)
###############################################################
# File methods
###############################################################
2023-10-16 11:42:56 +01:00
def create_file(
self,
file_name: str,
size: Optional[int] = None,
file_type: Optional[FileType] = None,
folder_name: Optional[str] = None,
) -> File:
2023-08-01 16:18:49 +01:00
"""
Creates a File and adds it to the list of files.
2023-08-01 16:18:49 +01:00
:param file_name: The file name.
:param size: The size the file takes on disk in bytes.
:param file_type: The type of the file.
:param folder_name: The folder to add the file to.
"""
if folder_name:
# check if file with name already exists
folder = self.get_folder(folder_name)
# If not then create it
if not folder:
folder = self.create_folder(folder_name)
else:
# Use root folder if folder_name not supplied
folder = self.get_folder("root")
# Create the file and add it to the folder
file = File(
name=file_name,
sim_size=size,
file_type=file_type,
folder_id=folder.uuid,
folder_name=folder.name,
sim_root=self.sim_root,
sys_log=self.sys_log,
)
folder.add_file(file)
self._file_request_manager.add_request(name=file.name, request_type=RequestType(func=file._request_manager))
# increment file creation
self.num_file_creations += 1
return file
def get_file(self, folder_name: str, file_name: str, include_deleted: Optional[bool] = False) -> Optional[File]:
"""
Retrieve a file by its name from a specific folder.
:param folder_name: The name of the folder where the file resides.
:param file_name: The name of the file to be retrieved, including its extension.
:return: An instance of File if it exists, otherwise `None`.
"""
folder = self.get_folder(folder_name, include_deleted=include_deleted)
if folder:
return folder.get_file(file_name, include_deleted=include_deleted)
self.sys_log.warning(f"File not found /{folder_name}/{file_name}")
def get_file_by_id(
self, file_uuid: str, folder_uuid: Optional[str] = None, include_deleted: Optional[bool] = False
) -> Optional[File]:
"""
Retrieve a file by its uuid from a specific folder.
:param: file_uuid: The uuid of the folder where the file resides.
:param: folder_uuid: The uuid of the file to be retrieved, including its extension.
:param: include_deleted: If true, the deleted files will also be checked
:return: An instance of File if it exists, otherwise `None`.
"""
2023-11-06 10:22:08 +00:00
folder = self.get_folder_by_id(folder_uuid=folder_uuid, include_deleted=include_deleted)
if folder:
return folder.get_file_by_id(file_uuid=file_uuid, include_deleted=include_deleted)
# iterate through every folder looking for file
file = None
for folder_id in self.folders:
folder = self.folders.get(folder_id)
res = folder.get_file_by_id(file_uuid=file_uuid, include_deleted=True)
if res:
file = res
if include_deleted:
for folder_id in self.deleted_folders:
folder = self.deleted_folders.get(folder_id)
res = folder.get_file_by_id(file_uuid=file_uuid, include_deleted=True)
if res:
file = res
return file
def delete_file(self, folder_name: str, file_name: str) -> bool:
"""
Delete a file by its name from a specific folder.
:param folder_name: The name of the folder containing the file.
:param file_name: The name of the file to be deleted, including its extension.
"""
folder = self.get_folder(folder_name)
if folder:
file = folder.get_file(file_name)
if file:
# increment file creation
self.num_file_deletions += 1
folder.remove_file(file)
return True
return False
def delete_file_by_id(self, folder_uuid: str, file_uuid: str) -> None:
"""
Deletes a file via its uuid.
:param: folder_uuid: UUID of the folder the file belongs to
:param: file_uuid: UUID of the file to delete
"""
folder = self.get_folder_by_id(folder_uuid=folder_uuid)
if folder:
file = folder.get_file_by_id(file_uuid=file_uuid)
2023-11-06 10:22:08 +00:00
if file:
self.delete_file(folder_name=folder.name, file_name=file.name)
else:
self.sys_log.error(f"Unable to delete file that does not exist. (id: {file_uuid})")
def move_file(self, src_folder_name: str, src_file_name: str, dst_folder_name: str) -> None:
"""
Move a file from one folder to another.
:param src_folder_name: The name of the source folder containing the file.
:param src_file_name: The name of the file to be moved.
:param dst_folder_name: The name of the destination folder.
"""
file = self.get_file(folder_name=src_folder_name, file_name=src_file_name)
if file:
# remove file from src
self.delete_file(folder_name=file.folder_name, file_name=file.name)
dst_folder = self.get_folder(folder_name=dst_folder_name)
if not dst_folder:
dst_folder = self.create_folder(dst_folder_name)
# add file to dst
dst_folder.add_file(file)
self.num_file_creations += 1
def copy_file(self, src_folder_name: str, src_file_name: str, dst_folder_name: str):
"""
Copy a file from one folder to another.
2023-08-01 16:18:49 +01:00
:param src_folder_name: The name of the source folder containing the file.
:param src_file_name: The name of the file to be copied.
:param dst_folder_name: The name of the destination folder.
"""
file = self.get_file(folder_name=src_folder_name, file_name=src_file_name)
if file:
# check that dest folder exists
dst_folder = self.get_folder(folder_name=dst_folder_name)
if not dst_folder:
# create dest folder
dst_folder = self.create_folder(dst_folder_name)
2023-10-16 11:42:56 +01:00
file_copy = File(
folder_id=dst_folder.uuid,
folder_name=dst_folder.name,
**file.model_dump(exclude={"uuid", "folder_id", "folder_name", "sim_path"}),
)
self.num_file_creations += 1
# increment access counter
file.num_access += 1
dst_folder.add_file(file_copy, force=True)
2023-08-03 12:14:11 +01:00
else:
self.sys_log.error(f"Unable to copy file. {src_file_name} does not exist.")
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
:return: Current state of this object and child objects.
"""
state = super().describe_state()
state["folders"] = {folder.name: folder.describe_state() for folder in self.folders.values()}
state["deleted_folders"] = {folder.name: folder.describe_state() for folder in self.deleted_folders.values()}
state["num_file_creations"] = self.num_file_creations
state["num_file_deletions"] = self.num_file_deletions
return state
2023-08-03 12:14:11 +01:00
def apply_timestep(self, timestep: int) -> None:
"""Apply time step to FileSystem and its child folders and files."""
super().apply_timestep(timestep=timestep)
2024-04-15 11:50:08 +01:00
# apply timestep to folders
for folder_id in self.folders:
self.folders[folder_id].apply_timestep(timestep=timestep)
def pre_timestep(self, timestep: int) -> None:
"""Apply pre-timestep logic."""
super().pre_timestep(timestep)
# reset number of file creations
self.num_file_creations = 0
# reset number of file deletions
self.num_file_deletions = 0
2024-04-15 11:50:08 +01:00
for folder in self.folders.values():
folder.pre_timestep(timestep)
###############################################################
# Agent actions
###############################################################
def scan(self, instant_scan: bool = False) -> None:
"""
Scan all the folders (and child files) in the file system.
:param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False.
"""
for folder_id in self.folders:
self.folders[folder_id].scan(instant_scan=instant_scan)
def reveal_to_red(self, instant_scan: bool = False) -> None:
"""
Reveals all the folders (and child files) in the file system to the red agent.
:param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False.
"""
for folder_id in self.folders:
self.folders[folder_id].reveal_to_red(instant_scan=instant_scan)
def restore_folder(self, folder_name: str) -> bool:
2023-11-06 10:22:08 +00:00
"""
Restore a folder.
Checks the current folder's status and applies the correct fix for the folder.
:param: folder_name: name of the folder to restore
2023-11-06 10:22:08 +00:00
:type: folder_uuid: str
"""
folder = self.get_folder(folder_name=folder_name, include_deleted=True)
2023-11-06 10:22:08 +00:00
if folder is None:
self.sys_log.error(f"Unable to restore folder {folder_name}. Folder is not in deleted folder list.")
return False
2023-11-06 10:22:08 +00:00
self.deleted_folders.pop(folder.uuid, None)
2023-11-06 10:22:08 +00:00
folder.restore()
self.folders[folder.uuid] = folder
return True
def restore_file(self, folder_name: str, file_name: str) -> bool:
"""
Restore a file.
Checks the current file's status and applies the correct fix for the file.
2023-10-12 11:16:25 +01:00
:param: folder_name: name of the folder where the file is stored
:type: folder_name: str
2023-10-12 11:16:25 +01:00
:param: file_name: name of the file to restore
:type: file_name: str
2023-10-12 11:16:25 +01:00
"""
folder = self.get_folder(folder_name=folder_name)
if not folder:
self.sys_log.error(f"Cannot restore file {file_name} in folder {folder_name} as the folder does not exist.")
return False
2023-11-06 10:22:08 +00:00
file = folder.get_file(file_name=file_name, include_deleted=True)
2023-11-06 10:22:08 +00:00
if not file:
msg = f"Unable to restore file {file_name}. File was not found."
self.sys_log.error(msg)
return False
2023-11-06 10:22:08 +00:00
return folder.restore_file(file_name=file_name)
def access_file(self, folder_name: str, file_name: str) -> bool:
"""
Access a file.
Used by agents to simulate a file being accessed.
:param: folder_name: name of the folder where the file is stored
:type: folder_name: str
:param: file_name: name of the file to access
:type: file_name: str
"""
folder = self.get_folder(folder_name=folder_name)
if folder:
file = folder.get_file(file_name=file_name)
if file:
file.num_access += 1
return True
else:
self.sys_log.error(f"Unable to access file that does not exist. (file name: {file_name})")
return False