Merged PR 157: File System Class setup

## Summary
Skeleton of the FileSystem for the simulation. Does not have all the requirements for the FileSystem yet, since we need to start somewhere

## Test process
Created unit tests for:
- [FileSystem](https://dev.azure.com/ma-dev-uk/PrimAITE/_git/PrimAITE/pullrequest/157?_a=files&path=/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system.py)
- [FileSystemFolder](https://dev.azure.com/ma-dev-uk/PrimAITE/_git/PrimAITE/pullrequest/157?_a=files&path=/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system_folder.py)
- [FileSystemFile](https://dev.azure.com/ma-dev-uk/PrimAITE/_git/PrimAITE/pullrequest/157?_a=files&path=/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system_file.py)

## Design document
https://dev.azure.com/ma-dev-uk/PrimAITE/_wiki/wikis/PrimAITE.wiki/151/FileSystem

## Checklist
- [x] This PR is linked to a **work item**
- [x] I have performed **self-review** of the code
- [x] I have written **tests** for any new functionality added with this PR
- [ ] I have updated the **documentation** if this PR changes or adds functionality
- [x] I have written/updated **design docs** if this PR implements new functionality.
- [x] I have run **pre-commit** checks for code style

Related work items: #1714
This commit is contained in:
Czar Echavez
2023-08-08 09:29:46 +00:00
16 changed files with 720 additions and 6 deletions

View File

@@ -86,5 +86,5 @@ stages:
displayName: 'Perform PrimAITE Setup'
- script: |
pytest -n 4
pytest -n auto
displayName: 'Run tests'

View File

@@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Added
- File System - ability to emulate a node's file system during a simulation
## [2.0.0] - 2023-07-26
### Added

View File

@@ -37,7 +37,7 @@ dependencies = [
"stable-baselines3==1.6.2",
"tensorflow==2.12.0",
"typer[all]==0.9.0",
"pydantic"
"pydantic==2.1.1"
]
[tool.setuptools.dynamic]

View File

@@ -1,12 +1,21 @@
"""Core of the PrimAITE Simulator."""
from abc import abstractmethod
from typing import Callable, Dict, List
from uuid import uuid4
from pydantic import BaseModel
class SimComponent(BaseModel):
"""Extension of pydantic BaseModel with additional methods that must be defined by all classes in the simulator."""
"""Extension of pydantic BaseModel with additional methods that must be defined by all classes in the simulator."""
uuid: str
"""The component UUID."""
def __init__(self, **kwargs):
if not kwargs.get("uuid"):
kwargs["uuid"] = str(uuid4())
super().__init__(**kwargs)
@abstractmethod
def describe_state(self) -> Dict:

View File

@@ -0,0 +1,220 @@
from random import choice
from typing import Dict, Optional
from primaite import getLogger
from primaite.simulator.core import SimComponent
from primaite.simulator.file_system.file_system_file import FileSystemFile
from primaite.simulator.file_system.file_system_file_type import FileSystemFileType
from primaite.simulator.file_system.file_system_folder import FileSystemFolder
_LOGGER = getLogger(__name__)
class FileSystem(SimComponent):
"""Class that contains all the simulation File System."""
folders: Dict = {}
"""List containing all the folders in the file system."""
def describe_state(self) -> Dict:
"""
Get the current state of the FileSystem as a dict.
:return: A dict containing the current state of the FileSystemFile.
"""
pass
def get_folders(self) -> Dict:
"""Returns the list of folders."""
return self.folders
def create_file(
self,
file_name: str,
size: Optional[float] = None,
file_type: Optional[FileSystemFileType] = None,
folder: Optional[FileSystemFolder] = None,
folder_uuid: Optional[str] = None,
) -> FileSystemFile:
"""
Creates a FileSystemFile and adds it to the list of files.
If no size or file_type are provided, one will be chosen randomly.
If no folder_uuid or folder is provided, a new folder will be created.
:param: file_name: The file name
:type: file_name: str
:param: size: The size the file takes on disk.
:type: size: Optional[float]
:param: file_type: The type of the file
:type: Optional[FileSystemFileType]
:param: folder: The folder to add the file to
:type: folder: Optional[FileSystemFolder]
:param: folder_uuid: The uuid of the folder to add the file to
:type: folder_uuid: Optional[str]
"""
file = None
folder = None
if file_type is None:
file_type = self.get_random_file_type()
# if no folder uuid provided, create a folder and add file to it
if folder_uuid is not None:
# otherwise check for existence and add file
folder = self.get_folder_by_id(folder_uuid)
if folder is not None:
file = FileSystemFile(name=file_name, size=size, file_type=file_type)
folder.add_file(file=file)
else:
# check if a "root" folder exists
folder = self.get_folder_by_name("root")
if folder is None:
# create a root folder
folder = FileSystemFolder(name="root")
# add file to root folder
file = FileSystemFile(name=file_name, size=size, file_type=file_type)
folder.add_file(file)
self.folders[folder.uuid] = folder
return file
def create_folder(
self,
folder_name: str,
) -> FileSystemFolder:
"""
Creates a FileSystemFolder and adds it to the list of folders.
:param: folder_name: The name of the folder
:type: folder_name: str
"""
folder = FileSystemFolder(name=folder_name)
self.folders[folder.uuid] = folder
return folder
def delete_file(self, file: Optional[FileSystemFile] = None):
"""
Deletes a file and removes it from the files list.
:param file: The file to delete
:type file: Optional[FileSystemFile]
"""
# iterate through folders to delete the item with the matching uuid
for key in self.folders:
self.get_folder_by_id(key).remove_file(file)
def delete_folder(self, folder: FileSystemFolder):
"""
Deletes a folder, removes it from the folders list and removes any child folders and files.
:param folder: The folder to remove
:type folder: FileSystemFolder
"""
if folder is None or not isinstance(folder, FileSystemFolder):
raise Exception(f"Invalid folder: {folder}")
if self.folders.get(folder.uuid):
del self.folders[folder.uuid]
else:
_LOGGER.debug(f"File with UUID {folder.uuid} was not found.")
def move_file(self, file: FileSystemFile, src_folder: FileSystemFolder, target_folder: FileSystemFolder):
"""
Moves a file from one folder to another.
can provide
:param: file: The file to move
:type: file: FileSystemFile
:param: src_folder: The folder where the file is located
:type: FileSystemFolder
:param: target_folder: The folder where the file should be moved to
:type: FileSystemFolder
"""
# check that the folders exist
if src_folder is None:
raise Exception("Source folder not provided")
if target_folder is None:
raise Exception("Target folder not provided")
if file is None:
raise Exception("File to be moved is None")
# remove file from src
src_folder.remove_file(file)
# add file to target
target_folder.add_file(file)
def copy_file(self, file: FileSystemFile, src_folder: FileSystemFolder, target_folder: FileSystemFolder):
"""
Copies a file from one folder to another.
can provide
:param: file: The file to move
:type: file: FileSystemFile
:param: src_folder: The folder where the file is located
:type: FileSystemFolder
:param: target_folder: The folder where the file should be moved to
:type: FileSystemFolder
"""
if src_folder is None:
raise Exception("Source folder not provided")
if target_folder is None:
raise Exception("Target folder not provided")
if file is None:
raise Exception("File to be moved is None")
# add file to target
target_folder.add_file(file)
def get_file_by_id(self, file_id: str) -> FileSystemFile:
"""Checks if the file exists in any file system folders."""
for key in self.folders:
file = self.folders[key].get_file_by_id(file_id=file_id)
if file is not None:
return file
def get_folder_by_name(self, folder_name: str) -> FileSystemFolder:
"""
Returns a the first folder with a matching name.
:return: Returns the first FileSydtemFolder with a matching name
"""
matching_folder = None
for key in self.folders:
if self.folders[key].name == folder_name:
matching_folder = self.folders[key]
break
return matching_folder
def get_folder_by_id(self, folder_id: str) -> FileSystemFolder:
"""
Checks if the folder exists.
:param: folder_id: The id of the folder to find
:type: folder_id: str
"""
return self.folders[folder_id]
def get_random_file_type(self) -> FileSystemFileType:
"""
Returns a random FileSystemFileTypeEnum.
:return: A random file type Enum
"""
return choice(list(FileSystemFileType))

View File

@@ -0,0 +1,45 @@
from random import choice
from typing import Dict
from primaite.simulator.file_system.file_system_file_type import file_type_sizes_KB, FileSystemFileType
from primaite.simulator.file_system.file_system_item_abc import FileSystemItem
class FileSystemFile(FileSystemItem):
"""Class that represents a file in the simulation."""
file_type: FileSystemFileType = None
"""The type of the FileSystemFile"""
def __init__(self, **kwargs):
"""
Initialise FileSystemFile class.
:param name: The name of the file.
:type name: str
:param file_type: The FileSystemFileType of the file
:type file_type: Optional[FileSystemFileType]
:param size: The size of the FileSystemItem
:type size: Optional[float]
"""
# set random file type if none provided
# set random file type if none provided
if kwargs.get("file_type") is None:
kwargs["file_type"] = choice(list(FileSystemFileType))
# set random file size if none provided
if kwargs.get("size") is None:
kwargs["size"] = file_type_sizes_KB[kwargs["file_type"]]
super().__init__(**kwargs)
def describe_state(self) -> Dict:
"""
Get the current state of the FileSystemFile as a dict.
:return: A dict containing the current state of the FileSystemFile.
"""
pass

View File

@@ -0,0 +1,124 @@
from enum import Enum
class FileSystemFileType(str, Enum):
"""An enumeration of common file types."""
UNKNOWN = 0
"Unknown file type."
# Text formats
TXT = 1
"Plain text file."
DOC = 2
"Microsoft Word document (.doc)"
DOCX = 3
"Microsoft Word document (.docx)"
PDF = 4
"Portable Document Format."
HTML = 5
"HyperText Markup Language file."
XML = 6
"Extensible Markup Language file."
CSV = 7
"Comma-Separated Values file."
# Spreadsheet formats
XLS = 8
"Microsoft Excel file (.xls)"
XLSX = 9
"Microsoft Excel file (.xlsx)"
# Image formats
JPEG = 10
"JPEG image file."
PNG = 11
"PNG image file."
GIF = 12
"GIF image file."
BMP = 13
"Bitmap image file."
# Audio formats
MP3 = 14
"MP3 audio file."
WAV = 15
"WAV audio file."
# Video formats
MP4 = 16
"MP4 video file."
AVI = 17
"AVI video file."
MKV = 18
"MKV video file."
FLV = 19
"FLV video file."
# Presentation formats
PPT = 20
"Microsoft PowerPoint file (.ppt)"
PPTX = 21
"Microsoft PowerPoint file (.pptx)"
# Web formats
JS = 22
"JavaScript file."
CSS = 23
"Cascading Style Sheets file."
# Programming languages
PY = 24
"Python script file."
C = 25
"C source code file."
CPP = 26
"C++ source code file."
JAVA = 27
"Java source code file."
# Compressed file types
RAR = 28
"RAR archive file."
ZIP = 29
"ZIP archive file."
TAR = 30
"TAR archive file."
GZ = 31
"Gzip compressed file."
file_type_sizes_KB = {
FileSystemFileType.UNKNOWN: 0,
FileSystemFileType.TXT: 4,
FileSystemFileType.DOC: 50,
FileSystemFileType.DOCX: 30,
FileSystemFileType.PDF: 100,
FileSystemFileType.HTML: 15,
FileSystemFileType.XML: 10,
FileSystemFileType.CSV: 15,
FileSystemFileType.XLS: 100,
FileSystemFileType.XLSX: 25,
FileSystemFileType.JPEG: 100,
FileSystemFileType.PNG: 40,
FileSystemFileType.GIF: 30,
FileSystemFileType.BMP: 300,
FileSystemFileType.MP3: 5000,
FileSystemFileType.WAV: 25000,
FileSystemFileType.MP4: 25000,
FileSystemFileType.AVI: 50000,
FileSystemFileType.MKV: 50000,
FileSystemFileType.FLV: 15000,
FileSystemFileType.PPT: 200,
FileSystemFileType.PPTX: 100,
FileSystemFileType.JS: 10,
FileSystemFileType.CSS: 5,
FileSystemFileType.PY: 5,
FileSystemFileType.C: 5,
FileSystemFileType.CPP: 10,
FileSystemFileType.JAVA: 10,
FileSystemFileType.RAR: 1000,
FileSystemFileType.ZIP: 1000,
FileSystemFileType.TAR: 1000,
FileSystemFileType.GZ: 800,
}

View File

@@ -0,0 +1,69 @@
from typing import Dict, Optional
from primaite import getLogger
from primaite.simulator.file_system.file_system_file import FileSystemFile
from primaite.simulator.file_system.file_system_item_abc import FileSystemItem
_LOGGER = getLogger(__name__)
class FileSystemFolder(FileSystemItem):
"""Simulation FileSystemFolder."""
files: Dict = {}
"""List of files stored in the folder."""
is_quarantined: bool = False
"""Flag that marks the folder as quarantined if true."""
def get_file_by_id(self, file_id: str) -> FileSystemFile:
"""Return a FileSystemFile with the matching id."""
return self.files.get(file_id)
def add_file(self, file: FileSystemFile):
"""Adds a file to the folder list."""
if file is None or not isinstance(file, FileSystemFile):
raise Exception(f"Invalid file: {file}")
# add to list
self.files[file.uuid] = file
self.size += file.size
def remove_file(self, file: Optional[FileSystemFile]):
"""
Removes a file from the folder list.
The method can take a FileSystemFile object or a file id.
:param: file: The file to remove
:type: Optional[FileSystemFile]
"""
if file is None or not isinstance(file, FileSystemFile):
raise Exception(f"Invalid file: {file}")
if self.files.get(file.uuid):
del self.files[file.uuid]
self.size -= file.size
else:
_LOGGER.debug(f"File with UUID {file.uuid} was not found.")
def quarantine(self):
"""Quarantines the File System Folder."""
self.is_quarantined = True
def end_quarantine(self):
"""Ends the quarantine of the File System Folder."""
self.is_quarantined = False
def quarantine_status(self) -> bool:
"""Returns true if the folder is being quarantined."""
return self.is_quarantined
def describe_state(self) -> Dict:
"""
Get the current state of the FileSystemFolder as a dict.
:return: A dict containing the current state of the FileSystemFile.
"""
pass

View File

@@ -0,0 +1,17 @@
from typing import Dict
from primaite.simulator.core import SimComponent
class FileSystemItem(SimComponent):
"""Abstract base class for FileSystemItems used in the file system simulation."""
name: str
"""The name of the FileSystemItem."""
size: float = 0
"""The size the item takes up on disk."""
def describe_state(self) -> Dict:
"""Returns the state of the FileSystemItem."""
pass

View File

@@ -112,7 +112,7 @@ def temp_session_path() -> Path:
session_timestamp = datetime.now()
date_dir = session_timestamp.strftime("%Y-%m-%d")
session_path = session_timestamp.strftime("%Y-%m-%d_%H-%M-%S")
session_path = Path(tempfile.gettempdir()) / "primaite" / date_dir / session_path
session_path = Path(tempfile.gettempdir()) / "_primaite" / date_dir / session_path
session_path.mkdir(exist_ok=True, parents=True)
return session_path

View File

@@ -0,0 +1,129 @@
from primaite.simulator.file_system.file_system import FileSystem
from primaite.simulator.file_system.file_system_folder import FileSystemFolder
def test_create_folder_and_file():
"""Test creating a folder and a file."""
file_system = FileSystem()
folder = file_system.create_folder(folder_name="test_folder")
assert len(file_system.folders) is 1
file = file_system.create_file(file_name="test_file", size=10, folder_uuid=folder.uuid)
assert len(file_system.get_folder_by_id(folder.uuid).files) is 1
assert file_system.get_file_by_id(file.uuid).name is "test_file"
assert file_system.get_file_by_id(file.uuid).size == 10
def test_create_file():
"""Tests that creating a file without a folder creates a folder and sets that as the file's parent."""
file_system = FileSystem()
file = file_system.create_file(file_name="test_file", size=10)
assert len(file_system.folders) is 1
assert file_system.get_folder_by_name("root").get_file_by_id(file.uuid) is file
def test_delete_file():
"""Tests that a file can be deleted."""
file_system = FileSystem()
file = file_system.create_file(file_name="test_file", size=10)
assert len(file_system.folders) is 1
folder_id = list(file_system.folders.keys())[0]
folder = file_system.get_folder_by_id(folder_id)
assert folder.get_file_by_id(file.uuid) is file
file_system.delete_file(file=file)
assert len(file_system.folders) is 1
assert len(file_system.get_folder_by_id(folder.uuid).files) is 0
def test_delete_non_existent_file():
"""Tests deleting a non existent file."""
file_system = FileSystem()
file = file_system.create_file(file_name="test_file", size=10)
not_added_file = file_system.create_file(file_name="test_file", size=10)
assert len(file_system.folders) is 1
folder_id = list(file_system.folders.keys())[0]
folder = file_system.get_folder_by_id(folder_id)
assert folder.get_file_by_id(file.uuid) is file
file_system.delete_file(file=not_added_file)
assert len(file_system.folders) is 1
assert len(file_system.get_folder_by_id(folder.uuid).files) is 1
def test_delete_folder():
file_system = FileSystem()
folder = file_system.create_folder(folder_name="test_folder")
assert len(file_system.folders) is 1
file_system.delete_folder(folder)
assert len(file_system.folders) is 0
def test_deleting_a_non_existent_folder():
file_system = FileSystem()
folder = file_system.create_folder(folder_name="test_folder")
not_added_folder = FileSystemFolder(name="fake_folder")
assert len(file_system.folders) is 1
file_system.delete_folder(not_added_folder)
assert len(file_system.folders) is 1
def test_move_file():
"""Tests the file move function."""
file_system = FileSystem()
src_folder = file_system.create_folder(folder_name="test_folder_1")
assert len(file_system.folders) is 1
target_folder = file_system.create_folder(folder_name="test_folder_2")
assert len(file_system.folders) is 2
file = file_system.create_file(file_name="test_file", size=10, folder_uuid=src_folder.uuid)
assert len(file_system.get_folder_by_id(src_folder.uuid).files) is 1
assert len(file_system.get_folder_by_id(target_folder.uuid).files) is 0
file_system.move_file(file=file, src_folder=src_folder, target_folder=target_folder)
assert len(file_system.get_folder_by_id(src_folder.uuid).files) is 0
assert len(file_system.get_folder_by_id(target_folder.uuid).files) is 1
def test_copy_file():
"""Tests the file copy function."""
file_system = FileSystem()
src_folder = file_system.create_folder(folder_name="test_folder_1")
assert len(file_system.folders) is 1
target_folder = file_system.create_folder(folder_name="test_folder_2")
assert len(file_system.folders) is 2
file = file_system.create_file(file_name="test_file", size=10, folder_uuid=src_folder.uuid)
assert len(file_system.get_folder_by_id(src_folder.uuid).files) is 1
assert len(file_system.get_folder_by_id(target_folder.uuid).files) is 0
file_system.copy_file(file=file, src_folder=src_folder, target_folder=target_folder)
assert len(file_system.get_folder_by_id(src_folder.uuid).files) is 1
assert len(file_system.get_folder_by_id(target_folder.uuid).files) is 1
def test_serialisation():
"""Test to check that the object serialisation works correctly."""
file_system = FileSystem()
folder = file_system.create_folder(folder_name="test_folder")
assert len(file_system.folders) is 1
file_system.create_file(file_name="test_file", size=10, folder_uuid=folder.uuid)
assert file_system.get_folder_by_id(folder.uuid) is folder
serialised_file_sys = file_system.model_dump_json()
deserialised_file_sys = FileSystem.model_validate_json(serialised_file_sys)
assert file_system.model_dump_json() == deserialised_file_sys.model_dump_json()

View File

@@ -0,0 +1,23 @@
from primaite.simulator.file_system.file_system_file import FileSystemFile
from primaite.simulator.file_system.file_system_file_type import FileSystemFileType
def test_file_type():
"""Tests tha the FileSystemFile type is set correctly."""
file = FileSystemFile(name="test", file_type=FileSystemFileType.DOC)
assert file.file_type is FileSystemFileType.DOC
def test_get_size():
"""Tests that the file size is being returned properly."""
file = FileSystemFile(name="test", size=1.5)
assert file.size == 1.5
def test_serialisation():
"""Test to check that the object serialisation works correctly."""
file = FileSystemFile(name="test", size=1.5, file_type=FileSystemFileType.DOC)
serialised_file = file.model_dump_json()
deserialised_file = FileSystemFile.model_validate_json(serialised_file)
assert file.model_dump_json() == deserialised_file.model_dump_json()

View File

@@ -0,0 +1,75 @@
from primaite.simulator.file_system.file_system_file import FileSystemFile
from primaite.simulator.file_system.file_system_file_type import FileSystemFileType
from primaite.simulator.file_system.file_system_folder import FileSystemFolder
def test_adding_removing_file():
"""Test the adding and removing of a file from a folder."""
folder = FileSystemFolder(name="test")
file = FileSystemFile(name="test_file", size=10, file_type=FileSystemFileType.DOC)
folder.add_file(file)
assert folder.size == 10
assert len(folder.files) is 1
folder.remove_file(file)
assert folder.size == 0
assert len(folder.files) is 0
def test_remove_non_existent_file():
"""Test the removing of a file that does not exist."""
folder = FileSystemFolder(name="test")
file = FileSystemFile(name="test_file", size=10, file_type=FileSystemFileType.DOC)
not_added_file = FileSystemFile(name="fake_file", size=10, file_type=FileSystemFileType.DOC)
folder.add_file(file)
assert folder.size == 10
assert len(folder.files) is 1
folder.remove_file(not_added_file)
assert folder.size == 10
assert len(folder.files) is 1
def test_get_file_by_id():
"""Test to make sure that the correct file is returned."""
folder = FileSystemFolder(name="test")
file = FileSystemFile(name="test_file", size=10, file_type=FileSystemFileType.DOC)
file2 = FileSystemFile(name="test_file_2", size=10, file_type=FileSystemFileType.DOC)
folder.add_file(file)
folder.add_file(file2)
assert folder.size == 20
assert len(folder.files) is 2
assert folder.get_file_by_id(file_id=file.uuid) is file
def test_folder_quarantine_state():
"""Tests the changing of folder quarantine status."""
folder = FileSystemFolder(name="test")
assert folder.quarantine_status() is False
folder.quarantine()
assert folder.quarantine_status() is True
folder.end_quarantine()
assert folder.quarantine_status() is False
def test_serialisation():
"""Test to check that the object serialisation works correctly."""
folder = FileSystemFolder(name="test")
file = FileSystemFile(name="test_file", size=10, file_type=FileSystemFileType.DOC)
folder.add_file(file)
serialised_folder = folder.model_dump_json()
deserialised_folder = FileSystemFolder.model_validate_json(serialised_folder)
assert folder.model_dump_json() == deserialised_folder.model_dump_json()

View File

@@ -42,8 +42,8 @@ class TestIsolatedSimComponent:
return {}
comp = TestComponent(name="computer", size=(5, 10))
dump = comp.model_dump()
assert dump == {"name": "computer", "size": (5, 10)}
dump = comp.model_dump_json()
assert comp == TestComponent.model_validate_json(dump)
def test_apply_action(self):
"""Validate that we can override apply_action behaviour and it updates the state of the component."""