Files
PrimAITE/src/primaite/simulator/file_system/file.py

211 lines
7.6 KiB
Python

from __future__ import annotations
import hashlib
import json
import os.path
from pathlib import Path
from typing import Dict, Optional
from primaite import getLogger
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemABC, FileSystemItemHealthStatus
from primaite.simulator.file_system.file_type import FileType, get_file_type_from_extension
# Module-level logger obtained from primaite's ``getLogger`` using this module's ``__name__``.
_LOGGER = getLogger(__name__)
class File(FileSystemItemABC):
    """
    Class representing a file in the simulation.

    :ivar str folder_id: The id of the folder in which the file resides.
    :ivar str folder_name: The name of the folder in which the file resides.
    :ivar FileType file_type: The type of the file.
    :ivar Optional[int] sim_size: The simulated file size.
    :ivar bool real: Indicates if the file is actually a real file in the Node sim fs output.
    :ivar Optional[Path] sim_path: The path if the file is real.
    :ivar Optional[Path] sim_root: Root path of the simulation.
    """

    folder_id: str
    "The id of the Folder the File is in."
    folder_name: str
    "The name of the Folder the file is in."
    file_type: FileType
    "The type of File."
    sim_size: Optional[int] = None
    "The simulated file size."
    real: bool = False
    "Indicates whether the File is actually a real file in the Node sim fs output."
    sim_path: Optional[Path] = None
    "The Path if real is True."
    sim_root: Optional[Path] = None
    "Root path of the simulation."

    def __init__(self, **kwargs):
        """
        Initialise File class.

        :param name: The name of the file. If it contains a ``.``, the text after the last ``.`` is
            used to look up (and override) the file type.
        :param file_type: The FileType of the file. Optional when the name has no extension, in which
            case it falls back to ``FileType.UNKNOWN``.
        :param sim_size: The simulated size of the file. When missing (or falsy), the default size of
            the file type is used.
        """
        has_extension = "." in kwargs["name"]

        if has_extension:
            # The extension in the name wins over any explicitly supplied file type.
            extension = kwargs["name"].rsplit(".", 1)[-1]
            kwargs["file_type"] = get_file_type_from_extension(extension)
        else:
            # No extension in the name: fall back to UNKNOWN when no file type was supplied
            # (``.get`` so that an omitted ``file_type`` does not raise ``KeyError``).
            if not kwargs.get("file_type"):
                kwargs["file_type"] = FileType.UNKNOWN
            # A known file type contributes its lower-cased name as the extension.
            if kwargs["file_type"] != FileType.UNKNOWN:
                kwargs["name"] = f"{kwargs['name']}.{kwargs['file_type'].name.lower()}"

        # Use the file type's default size if none was provided.
        if not kwargs.get("sim_size"):
            kwargs["sim_size"] = kwargs["file_type"].default_size

        super().__init__(**kwargs)

        if self.real:
            # Real files are materialised as empty files beneath the simulation root.
            self.sim_path = self.sim_root / self.path
            if not self.sim_path.exists():
                self.sim_path.parent.mkdir(exist_ok=True, parents=True)
                self.sim_path.touch()

        self.sys_log.info(f"Created file /{self.path} (id: {self.uuid})")

        self.set_original_state()

    def set_original_state(self):
        """Sets the original state, adding the File-specific fields to the stored snapshot."""
        super().set_original_state()
        vals_to_include = {"folder_id", "folder_name", "file_type", "sim_size", "real", "sim_path", "sim_root"}
        self._original_state.update(self.model_dump(include=vals_to_include))

    def reset_component_for_episode(self, episode: int):
        """
        Reset the original state of the SimComponent.

        :param episode: The episode number, passed through to the parent implementation.
        """
        super().reset_component_for_episode(episode)

    @property
    def path(self) -> str:
        """
        Get the path of the file in the file system.

        :return: The full path of the file, in the form ``<folder_name>/<name>``.
        """
        return f"{self.folder_name}/{self.name}"

    @property
    def size(self) -> int:
        """
        Get the size of the file in bytes.

        :return: The on-disk size if the file is real, otherwise the simulated size.
        """
        if self.real:
            return os.path.getsize(self.sim_path)
        return self.sim_size

    def describe_state(self) -> Dict:
        """
        Produce a dictionary describing the current state of this object.

        :return: The parent state extended with the ``size`` and ``file_type`` entries.
        """
        state = super().describe_state()
        state["size"] = self.size
        state["file_type"] = self.file_type.name
        return state

    def scan(self) -> None:
        """Updates the visible statuses of the file."""
        if self.deleted:
            self.sys_log.error(f"Unable to scan deleted file {self.folder_name}/{self.name}")
            return

        self.sys_log.info(f"Scanning file {self.sim_path or self.path}")
        self.visible_health_status = self.health_status

    def reveal_to_red(self) -> None:
        """Reveals the folder/file to the red agent."""
        if self.deleted:
            self.sys_log.error(f"Unable to reveal deleted file {self.folder_name}/{self.name}")
            return

        self.revealed_to_red = True

    def check_hash(self) -> None:
        """
        Check if the file has been changed.

        The first call records the current hash as the baseline. On later calls, a hash that differs
        from the recorded one causes the file to be marked as corrupted via :meth:`corrupt`.
        Nothing is returned.
        """
        if self.deleted:
            self.sys_log.error(f"Unable to check hash of deleted file {self.folder_name}/{self.name}")
            return

        if self.real:
            # Real file: hash the file contents in chunks.
            file_hash = hashlib.blake2b()
            with open(self.sim_path, "rb") as f:
                while chunk := f.read(8192):
                    file_hash.update(chunk)
            current_hash = file_hash.hexdigest()
        else:
            # Simulated file: hash the JSON form of the describe_state dict instead.
            current_hash = hashlib.blake2b(json.dumps(self.describe_state(), sort_keys=True).encode()).hexdigest()

        # First check: adopt the current hash as the baseline.
        if self.previous_hash is None:
            self.previous_hash = current_hash

        # Compare by value: two equal digests are distinct str objects, so an identity
        # comparison would report a mismatch on every call after the first.
        if self.previous_hash != current_hash:
            self.corrupt()

    def repair(self) -> None:
        """Repair a corrupted File by setting the status to FileSystemItemStatus.GOOD."""
        if self.deleted:
            self.sys_log.error(f"Unable to repair deleted file {self.folder_name}/{self.name}")
            return

        # set file status to good if corrupt
        if self.health_status == FileSystemItemHealthStatus.CORRUPT:
            self.health_status = FileSystemItemHealthStatus.GOOD

        self.sys_log.info(f"Repaired file {self.sim_path or self.path}")

    def corrupt(self) -> None:
        """Corrupt a File by setting the status to FileSystemItemStatus.CORRUPT."""
        if self.deleted:
            self.sys_log.error(f"Unable to corrupt deleted file {self.folder_name}/{self.name}")
            return

        # set file status to corrupt if good
        if self.health_status == FileSystemItemHealthStatus.GOOD:
            self.health_status = FileSystemItemHealthStatus.CORRUPT

        self.sys_log.info(f"Corrupted file {self.sim_path or self.path}")

    def restore(self) -> None:
        """Determines if the file needs to be repaired or unmarked as deleted."""
        if self.deleted:
            # A deleted file is only un-deleted; its health status is left untouched.
            self.deleted = False
            return

        if self.health_status == FileSystemItemHealthStatus.CORRUPT:
            self.health_status = FileSystemItemHealthStatus.GOOD

        self.sys_log.info(f"Restored file {self.sim_path or self.path}")

    def delete(self):
        """Marks the file as deleted."""
        if self.deleted:
            self.sys_log.error(f"Unable to delete an already deleted file {self.folder_name}/{self.name}")
            return

        self.deleted = True
        self.sys_log.info(f"File deleted {self.folder_name}/{self.name}")