Merge branch 'feature/1816_Database-Service-(Network-and-User-Interaction)' into feature/1752-dns-server-and-client

This commit is contained in:
Czar Echavez
2023-09-08 10:41:03 +01:00
20 changed files with 831 additions and 788 deletions

View File

@@ -1,5 +1,14 @@
from datetime import datetime
from primaite import _PRIMAITE_ROOT
TEMP_SIM_OUTPUT = _PRIMAITE_ROOT.parent.parent / "simulation_output"
SIM_OUTPUT = None
"A path at the repo root dir to use temporarily for sim output testing while in dev."
# TODO: Remove once we integrate the simulation into PrimAITE and it uses the primaite session path
if not SIM_OUTPUT:
session_timestamp = datetime.now()
date_dir = session_timestamp.strftime("%Y-%m-%d")
sim_path = session_timestamp.strftime("%Y-%m-%d_%H-%M-%S")
SIM_OUTPUT = _PRIMAITE_ROOT.parent.parent / "simulation_output" / date_dir / sim_path
SIM_OUTPUT.mkdir(exist_ok=True, parents=True)

View File

@@ -149,8 +149,8 @@
"metadata": {},
"outputs": [],
"source": [
"from primaite.simulator.file_system.file_system_file_type import FileSystemFileType\n",
"from primaite.simulator.file_system.file_system_file import FileSystemFile"
"from primaite.simulator.file_system.file_type import FileType\n",
"from primaite.simulator.file_system.file_system import File"
]
},
{
@@ -160,7 +160,7 @@
"outputs": [],
"source": [
"my_pc_downloads_folder = my_pc.file_system.create_folder(\"downloads\")\n",
"my_pc_downloads_folder.add_file(FileSystemFile(name=\"firefox_installer.zip\",file_type=FileSystemFileType.ZIP))"
"my_pc_downloads_folder.add_file(File(name=\"firefox_installer.zip\",file_type=FileType.ZIP))"
]
},
{
@@ -171,7 +171,7 @@
{
"data": {
"text/plain": [
"FileSystemFile(uuid='7d56a563-ecc0-4011-8c97-240dd6c885c0', name='favicon.ico', size=40.0, file_type=<FileSystemFileType.PNG: '11'>, action_manager=None)"
"File(uuid='7d56a563-ecc0-4011-8c97-240dd6c885c0', name='favicon.ico', size=40.0, file_type=<FileType.PNG: '11'>, action_manager=None)"
]
},
"execution_count": 9,
@@ -181,7 +181,7 @@
],
"source": [
"my_server_folder = my_server.file_system.create_folder(\"static\")\n",
"my_server.file_system.create_file(\"favicon.ico\", file_type=FileSystemFileType.PNG)"
"my_server.file_system.create_file(\"favicon.ico\", file_type=FileType.PNG)"
]
},
{

View File

@@ -1,242 +1,519 @@
from random import choice
from __future__ import annotations
import math
import os.path
import shutil
from pathlib import Path
from typing import Dict, Optional
from prettytable import MARKDOWN, PrettyTable
from primaite import getLogger
from primaite.simulator.core import SimComponent
from primaite.simulator.file_system.file_system_file import FileSystemFile
from primaite.simulator.file_system.file_system_file_type import FileSystemFileType
from primaite.simulator.file_system.file_system_folder import FileSystemFolder
from primaite.simulator.file_system.file_type import FileType, get_file_type_from_extension
from primaite.simulator.system.core.sys_log import SysLog
_LOGGER = getLogger(__name__)
class FileSystem(SimComponent):
"""Class that contains all the simulation File System."""
def convert_size(size_bytes: int) -> str:
"""
Convert a file size from bytes to a string with a more human-readable format.
folders: Dict[str, FileSystemFolder] = {}
"""List containing all the folders in the file system."""
This function takes the size of a file in bytes and converts it to a string representation with appropriate size
units (B, KB, MB, GB, etc.).
:param size_bytes: The size of the file in bytes.
:return: The human-readable string representation of the file size.
"""
if size_bytes == 0:
return "0 B"
# Tuple of size units starting from Bytes up to Yottabytes
size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
# Calculate the index (i) that will be used to select the appropriate size unit from size_name
i = int(math.floor(math.log(size_bytes, 1024)))
# Calculate the adjusted size value (s) in terms of the new size unit
p = math.pow(1024, i)
s = round(size_bytes / p, 2)
return f"{s} {size_name[i]}"
class FileSystemItemABC(SimComponent):
"""
Abstract base class for file system items used in the file system simulation.
:ivar name: The name of the FileSystemItemABC.
"""
name: str
"The name of the FileSystemItemABC."
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update({"folders": {uuid: folder.describe_state() for uuid, folder in self.folders.items()}})
state.update(
{
"name": self.name,
}
)
return state
def get_folders(self) -> Dict:
"""Returns the list of folders."""
return self.folders
@property
def size_str(self) -> str:
"""
Get the file size in a human-readable string format.
This property makes use of the :func:`convert_size` function to convert the `self.size` attribute to a string
that is easier to read and understand.
:return: The human-readable string representation of the file size.
"""
return convert_size(self.size)
class FileSystem(SimComponent):
"""Class that contains all the simulation File System."""
folders: Dict[str, Folder] = {}
"List containing all the folders in the file system."
_folders_by_name: Dict[str, Folder] = {}
sys_log: SysLog
sim_root: Path
def __init__(self, **kwargs):
super().__init__(**kwargs)
# Ensure a default root folder
if not self.folders:
self.create_folder("root")
@property
def size(self) -> int:
"""
Calculate and return the total size of all folders in the file system.
:return: The sum of the sizes of all folders in the file system.
"""
return sum(folder.size for folder in self.folders.values())
def show(self, markdown: bool = False, full: bool = False):
"""
Prints a table of the FileSystem, displaying either just folders or full files.
:param markdown: Flag indicating if output should be in markdown format.
:param full: Flag indicating if to show full files.
"""
headers = ["Folder", "Size"]
if full:
headers[0] = "File Path"
table = PrettyTable(headers)
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.sys_log.hostname} File System"
for folder in self.folders.values():
if not full:
table.add_row([folder.name, folder.size_str])
else:
for file in folder.files.values():
table.add_row([file.path, file.size_str])
if full:
print(table.get_string(sortby="File Path"))
else:
print(table.get_string(sortby="Folder"))
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
:return: Current state of this object and child objects.
"""
state = super().describe_state()
state["folders"] = {folder.name: folder.describe_state() for folder in self.folders.values()}
return state
def create_folder(self, folder_name: str) -> Folder:
"""
Creates a Folder and adds it to the list of folders.
:param folder_name: The name of the folder.
"""
# check if folder with name already exists
if self.get_folder(folder_name):
raise Exception(f"Cannot create folder as it already exists: {folder_name}")
folder = Folder(name=folder_name, fs=self)
self.folders[folder.uuid] = folder
self._folders_by_name[folder.name] = folder
self.sys_log.info(f"Created folder /{folder.name}")
return folder
def delete_folder(self, folder_name: str):
"""
Deletes a folder, removes it from the folders list and removes any child folders and files.
:param folder_name: The name of the folder.
"""
if folder_name == "root":
self.sys_log.warning("Cannot delete the root folder.")
return
folder = self._folders_by_name.get(folder_name)
if folder:
for file in folder.files.values():
self.delete_file(file)
self.folders.pop(folder.uuid)
self._folders_by_name.pop(folder.name)
self.sys_log.info(f"Deleted folder /{folder.name} and its contents")
else:
_LOGGER.debug(f"Cannot delete folder as it does not exist: {folder_name}")
def create_file(
self,
file_name: str,
size: Optional[float] = None,
file_type: Optional[FileSystemFileType] = None,
folder: Optional[FileSystemFolder] = None,
folder_uuid: Optional[str] = None,
) -> FileSystemFile:
size: Optional[int] = None,
file_type: Optional[FileType] = None,
folder_name: Optional[str] = None,
real: bool = False,
) -> File:
"""
Creates a FileSystemFile and adds it to the list of files.
Creates a File and adds it to the list of files.
If no size or file_type are provided, one will be chosen randomly.
If no folder_uuid or folder is provided, a new folder will be created.
:param: file_name: The file name
:type: file_name: str
:param: size: The size the file takes on disk.
:type: size: Optional[float]
:param: file_type: The type of the file
:type: Optional[FileSystemFileType]
:param: folder: The folder to add the file to
:type: folder: Optional[FileSystemFolder]
:param: folder_uuid: The uuid of the folder to add the file to
:type: folder_uuid: Optional[str]
:param file_name: The file name.
:param size: The size the file takes on disk in bytes.
:param file_type: The type of the file.
:param folder_name: The folder to add the file to.
:param real: "Indicates whether the File is actually a real file in the Node sim fs output."
"""
file = None
folder = None
if file_type is None:
file_type = self.get_random_file_type()
# if no folder uuid provided, create a folder and add file to it
if folder_uuid is not None:
# otherwise check for existence and add file
folder = self.get_folder_by_id(folder_uuid)
if folder is not None:
if folder_name:
# check if file with name already exists
if folder.get_file_by_name(file_name):
raise Exception(f'File with name "{file_name}" already exists.')
file = FileSystemFile(name=file_name, size=size, file_type=file_type)
folder.add_file(file=file)
folder = self._folders_by_name.get(folder_name)
# If not then create it
if not folder:
folder = self.create_folder(folder_name)
else:
# check if a "root" folder exists
folder = self.get_folder_by_name("root")
if folder is None:
# create a root folder
folder = FileSystemFolder(name="root")
# Use root folder if folder_name not supplied
folder = self._folders_by_name["root"]
# add file to root folder
file = FileSystemFile(name=file_name, size=size, file_type=file_type)
folder.add_file(file)
self.folders[folder.uuid] = folder
# Create the file and add it to the folder
file = File(
name=file_name,
sim_size=size,
file_type=file_type,
folder=folder,
real=real,
sim_path=self.sim_root if real else None,
)
folder.add_file(file)
self.sys_log.info(f"Created file /{file.path}")
return file
def create_folder(
self,
folder_name: str,
) -> FileSystemFolder:
def get_file(self, folder_name: str, file_name: str) -> Optional[File]:
"""
Creates a FileSystemFolder and adds it to the list of folders.
Retrieve a file by its name from a specific folder.
:param: folder_name: The name of the folder
:type: folder_name: str
:param folder_name: The name of the folder where the file resides.
:param file_name: The name of the file to be retrieved, including its extension.
:return: An instance of File if it exists, otherwise `None`.
"""
# check if folder with name already exists
if self.get_folder_by_name(folder_name):
raise Exception(f'Folder with name "{folder_name}" already exists.')
folder = self.get_folder(folder_name)
if folder:
return folder.get_file(file_name)
self.fs.sys_log.info(f"file not found /{folder_name}/{file_name}")
folder = FileSystemFolder(name=folder_name)
self.folders[folder.uuid] = folder
return folder
def delete_file(self, file: Optional[FileSystemFile] = None):
def delete_file(self, folder_name: str, file_name: str):
"""
Deletes a file and removes it from the files list.
Delete a file by its name from a specific folder.
:param file: The file to delete
:type file: Optional[FileSystemFile]
:param folder_name: The name of the folder containing the file.
:param file_name: The name of the file to be deleted, including its extension.
"""
# iterate through folders to delete the item with the matching uuid
for key in self.folders:
self.get_folder_by_id(key).remove_file(file)
folder = self.get_folder(folder_name)
if folder:
file = folder.get_file(file_name)
if file:
folder.remove_file(file)
self.sys_log.info(f"Deleted file /{file.path}")
def delete_folder(self, folder: FileSystemFolder):
def move_file(self, src_folder_name: str, src_file_name: str, dst_folder_name: str):
"""
Deletes a folder, removes it from the folders list and removes any child folders and files.
Move a file from one folder to another.
:param folder: The folder to remove
:type folder: FileSystemFolder
:param src_folder_name: The name of the source folder containing the file.
:param src_file_name: The name of the file to be moved.
:param dst_folder_name: The name of the destination folder.
"""
if folder is None or not isinstance(folder, FileSystemFolder):
raise Exception(f"Invalid folder: {folder}")
file = self.get_file(folder_name=src_folder_name, file_name=src_file_name)
if file:
src_folder = file.folder
if self.folders.get(folder.uuid):
del self.folders[folder.uuid]
# remove file from src
src_folder.remove_file(file)
dst_folder = self.get_folder(folder_name=dst_folder_name)
if not dst_folder:
dst_folder = self.create_folder(dst_folder_name)
# add file to dst
dst_folder.add_file(file)
if file.real:
old_sim_path = file.sim_path
file.sim_path = file.folder.fs.sim_root / file.path
file.sim_path.parent.mkdir(exist_ok=True)
shutil.move(old_sim_path, file.sim_path)
def copy_file(self, src_folder_name: str, src_file_name: str, dst_folder_name: str):
"""
Copy a file from one folder to another.
:param src_folder_name: The name of the source folder containing the file.
:param src_file_name: The name of the file to be copied.
:param dst_folder_name: The name of the destination folder.
"""
file = self.get_file(folder_name=src_folder_name, file_name=src_file_name)
if file:
dst_folder = self.get_folder(folder_name=dst_folder_name)
if not dst_folder:
dst_folder = self.create_folder(dst_folder_name)
new_file = file.make_copy(dst_folder=dst_folder)
dst_folder.add_file(new_file)
if file.real:
new_file.sim_path.parent.mkdir(exist_ok=True)
shutil.copy2(file.sim_path, new_file.sim_path)
def get_folder(self, folder_name: str) -> Optional[Folder]:
"""
Get a folder by its name if it exists.
:param folder_name: The folder name.
:return: The matching Folder.
"""
return self._folders_by_name.get(folder_name)
def get_folder_by_id(self, folder_uuid: str) -> Optional[Folder]:
"""
Get a folder by its uuid if it exists.
:param folder_uuid: The folder uuid.
:return: The matching Folder.
"""
return self.folders.get(folder_uuid)
class Folder(FileSystemItemABC):
"""Simulation Folder."""
fs: FileSystem
"The FileSystem the Folder is in."
files: Dict[str, File] = {}
"Files stored in the folder."
_files_by_name: Dict[str, File] = {}
"Files by their name as <file name>.<file type>."
is_quarantined: bool = False
"Flag that marks the folder as quarantined if true."
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
:return: Current state of this object and child objects.
"""
state = super().describe_state()
state["files"] = {file.name: file.describe_state() for uuid, file in self.files.items()}
state["is_quarantined"] = self.is_quarantined
return state
def show(self, markdown: bool = False):
"""
Display the contents of the Folder in tabular format.
:param markdown: Whether to display the table in Markdown format or not. Default is `False`.
"""
table = PrettyTable(["File", "Size"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.fs.sys_log.hostname} File System Folder ({self.name})"
for file in self.files.values():
table.add_row([file.name, file.size_str])
print(table.get_string(sortby="File"))
@property
def size(self) -> int:
"""
Calculate and return the total size of all files in the folder.
:return: The total size of all files in the folder. If no files exist or all have `None`
size, returns 0.
"""
return sum(file.size for file in self.files.values() if file.size is not None)
def get_file(self, file_name: str) -> Optional[File]:
"""
Get a file by its name.
File name must be the filename and prefix, like 'memo.docx'.
:param file_name: The file name.
:return: The matching File.
"""
# TODO: Increment read count?
return self._files_by_name.get(file_name)
def get_file_by_id(self, file_uuid: str) -> File:
"""
Get a file by its uuid.
:param file_uuid: The file uuid.
:return: The matching File.
"""
return self.files.get(file_uuid)
def add_file(self, file: File):
"""
Adds a file to the folder.
:param File file: The File object to be added to the folder.
:raises Exception: If the provided `file` parameter is None or not an instance of the
`File` class.
"""
if file is None or not isinstance(file, File):
raise Exception(f"Invalid file: {file}")
# check if file with id already exists in folder
if file.uuid in self.files:
_LOGGER.debug(f"File with id {file.uuid} already exists in folder")
else:
_LOGGER.debug(f"File with UUID {folder.uuid} was not found.")
# add to list
self.files[file.uuid] = file
self._files_by_name[file.name] = file
file.folder = self
def move_file(self, file: FileSystemFile, src_folder: FileSystemFolder, target_folder: FileSystemFolder):
def remove_file(self, file: Optional[File]):
"""
Moves a file from one folder to another.
Removes a file from the folder list.
can provide
The method can take a File object or a file id.
:param: file: The file to move
:type: file: FileSystemFile
:param: src_folder: The folder where the file is located
:type: FileSystemFolder
:param: target_folder: The folder where the file should be moved to
:type: FileSystemFolder
:param file: The file to remove
"""
# check that the folders exist
if src_folder is None:
raise Exception("Source folder not provided")
if file is None or not isinstance(file, File):
raise Exception(f"Invalid file: {file}")
if target_folder is None:
raise Exception("Target folder not provided")
if self.files.get(file.uuid):
self.files.pop(file.uuid)
self._files_by_name.pop(file.name)
else:
_LOGGER.debug(f"File with UUID {file.uuid} was not found.")
if file is None:
raise Exception("File to be moved is None")
def quarantine(self):
"""Quarantines the File System Folder."""
if not self.is_quarantined:
self.is_quarantined = True
self.fs.sys_log.info(f"Quarantined folder ./{self.name}")
# check if file with name already exists
if target_folder.get_file_by_name(file.name):
raise Exception(f'Folder with name "{file.name}" already exists.')
def unquarantine(self):
"""Unquarantine of the File System Folder."""
if self.is_quarantined:
self.is_quarantined = False
self.fs.sys_log.info(f"Quarantined folder ./{self.name}")
# remove file from src
src_folder.remove_file(file)
def quarantine_status(self) -> bool:
"""Returns true if the folder is being quarantined."""
return self.is_quarantined
# add file to target
target_folder.add_file(file)
def copy_file(self, file: FileSystemFile, src_folder: FileSystemFolder, target_folder: FileSystemFolder):
class File(FileSystemItemABC):
"""
Class representing a file in the simulation.
:ivar Folder folder: The folder in which the file resides.
:ivar FileType file_type: The type of the file.
:ivar Optional[int] sim_size: The simulated file size.
:ivar bool real: Indicates if the file is actually a real file in the Node sim fs output.
:ivar Optional[Path] sim_path: The path if the file is real.
"""
folder: Folder
"The Folder the File is in."
file_type: FileType
"The type of File."
sim_size: Optional[int] = None
"The simulated file size."
real: bool = False
"Indicates whether the File is actually a real file in the Node sim fs output."
sim_path: Optional[Path] = None
"The Path if real is True."
def __init__(self, **kwargs):
"""
Copies a file from one folder to another.
Initialise File class.
can provide
:param: file: The file to move
:type: file: FileSystemFile
:param: src_folder: The folder where the file is located
:type: FileSystemFolder
:param: target_folder: The folder where the file should be moved to
:type: FileSystemFolder
:param name: The name of the file.
:param file_type: The FileType of the file
:param size: The size of the FileSystemItemABC
"""
if src_folder is None:
raise Exception("Source folder not provided")
has_extension = "." in kwargs["name"]
if target_folder is None:
raise Exception("Target folder not provided")
# Attempt to use the file type extension to set/override the FileType
if has_extension:
extension = kwargs["name"].split(".")[-1]
kwargs["file_type"] = get_file_type_from_extension(extension)
else:
# If the file name does not have a extension, override file type to FileType.UNKNOWN
if not kwargs["file_type"]:
kwargs["file_type"] = FileType.UNKNOWN
if kwargs["file_type"] != FileType.UNKNOWN:
kwargs["name"] = f"{kwargs['name']}.{kwargs['file_type'].name.lower()}"
if file is None:
raise Exception("File to be moved is None")
# set random file size if none provided
if not kwargs.get("sim_size"):
kwargs["sim_size"] = kwargs["file_type"].default_size
super().__init__(**kwargs)
if self.real:
self.sim_path = self.folder.fs.sim_root / self.path
if not self.sim_path.exists():
self.sim_path.parent.mkdir(exist_ok=True, parents=True)
with open(self.sim_path, mode="a"):
pass
# check if file with name already exists
if target_folder.get_file_by_name(file.name):
raise Exception(f'Folder with name "{file.name}" already exists.')
# add file to target
target_folder.add_file(file)
def get_file_by_id(self, file_id: str) -> FileSystemFile:
"""Checks if the file exists in any file system folders."""
for key in self.folders:
file = self.folders[key].get_file_by_id(file_id=file_id)
if file is not None:
return file
def get_folder_by_name(self, folder_name: str) -> Optional[FileSystemFolder]:
def make_copy(self, dst_folder: Folder) -> File:
"""
Returns a the first folder with a matching name.
Create a copy of the current File object in the given destination folder.
:return: Returns the first FileSydtemFolder with a matching name
:param Folder dst_folder: The destination folder for the copied file.
:return: A new File object that is a copy of the current file.
"""
matching_folder = None
for key in self.folders:
if self.folders[key].name == folder_name:
matching_folder = self.folders[key]
break
return matching_folder
return File(folder=dst_folder, **self.model_dump(exclude={"uuid", "folder", "sim_path"}))
def get_folder_by_id(self, folder_id: str) -> FileSystemFolder:
@property
def path(self) -> str:
"""
Checks if the folder exists.
Get the path of the file in the file system.
:param: folder_id: The id of the folder to find
:type: folder_id: str
:return: The full path of the file.
"""
return self.folders[folder_id]
return f"{self.folder.name}/{self.name}"
def get_random_file_type(self) -> FileSystemFileType:
@property
def size(self) -> int:
"""
Returns a random FileSystemFileTypeEnum.
Get the size of the file in bytes.
:return: A random file type Enum
:return: The size of the file in bytes.
"""
return choice(list(FileSystemFileType))
if self.real:
return os.path.getsize(self.sim_path)
return self.sim_size
def describe_state(self) -> Dict:
"""Produce a dictionary describing the current state of this object."""
state = super().describe_state()
state["size"] = self.size
state["file_type"] = self.file_type.name
return state

View File

@@ -1,55 +0,0 @@
from random import choice
from typing import Dict
from primaite.simulator.file_system.file_system_file_type import file_type_sizes_KB, FileSystemFileType
from primaite.simulator.file_system.file_system_item_abc import FileSystemItem
class FileSystemFile(FileSystemItem):
"""Class that represents a file in the simulation."""
file_type: FileSystemFileType = None
"""The type of the FileSystemFile"""
def __init__(self, **kwargs):
"""
Initialise FileSystemFile class.
:param name: The name of the file.
:type name: str
:param file_type: The FileSystemFileType of the file
:type file_type: Optional[FileSystemFileType]
:param size: The size of the FileSystemItem
:type size: Optional[float]
"""
# set random file type if none provided
# set random file type if none provided
if kwargs.get("file_type") is None:
kwargs["file_type"] = choice(list(FileSystemFileType))
# set random file size if none provided
if kwargs.get("size") is None:
kwargs["size"] = file_type_sizes_KB[kwargs["file_type"]]
super().__init__(**kwargs)
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update(
{
"uuid": self.uuid,
"file_type": self.file_type.name,
}
)
return state

View File

@@ -1,132 +0,0 @@
from enum import Enum
class FileSystemFileType(str, Enum):
"""An enumeration of common file types."""
UNKNOWN = 0
"Unknown file type."
# Text formats
TXT = 1
"Plain text file."
DOC = 2
"Microsoft Word document (.doc)"
DOCX = 3
"Microsoft Word document (.docx)"
PDF = 4
"Portable Document Format."
HTML = 5
"HyperText Markup Language file."
XML = 6
"Extensible Markup Language file."
CSV = 7
"Comma-Separated Values file."
# Spreadsheet formats
XLS = 8
"Microsoft Excel file (.xls)"
XLSX = 9
"Microsoft Excel file (.xlsx)"
# Image formats
JPEG = 10
"JPEG image file."
PNG = 11
"PNG image file."
GIF = 12
"GIF image file."
BMP = 13
"Bitmap image file."
# Audio formats
MP3 = 14
"MP3 audio file."
WAV = 15
"WAV audio file."
# Video formats
MP4 = 16
"MP4 video file."
AVI = 17
"AVI video file."
MKV = 18
"MKV video file."
FLV = 19
"FLV video file."
# Presentation formats
PPT = 20
"Microsoft PowerPoint file (.ppt)"
PPTX = 21
"Microsoft PowerPoint file (.pptx)"
# Web formats
JS = 22
"JavaScript file."
CSS = 23
"Cascading Style Sheets file."
# Programming languages
PY = 24
"Python script file."
C = 25
"C source code file."
CPP = 26
"C++ source code file."
JAVA = 27
"Java source code file."
# Compressed file types
RAR = 28
"RAR archive file."
ZIP = 29
"ZIP archive file."
TAR = 30
"TAR archive file."
GZ = 31
"Gzip compressed file."
# Database file types
MDF = 32
"MS SQL Server primary database file"
NDF = 33
"MS SQL Server secondary database file"
LDF = 34
"MS SQL Server transaction log"
file_type_sizes_KB = {
FileSystemFileType.UNKNOWN: 0,
FileSystemFileType.TXT: 4,
FileSystemFileType.DOC: 50,
FileSystemFileType.DOCX: 30,
FileSystemFileType.PDF: 100,
FileSystemFileType.HTML: 15,
FileSystemFileType.XML: 10,
FileSystemFileType.CSV: 15,
FileSystemFileType.XLS: 100,
FileSystemFileType.XLSX: 25,
FileSystemFileType.JPEG: 100,
FileSystemFileType.PNG: 40,
FileSystemFileType.GIF: 30,
FileSystemFileType.BMP: 300,
FileSystemFileType.MP3: 5000,
FileSystemFileType.WAV: 25000,
FileSystemFileType.MP4: 25000,
FileSystemFileType.AVI: 50000,
FileSystemFileType.MKV: 50000,
FileSystemFileType.FLV: 15000,
FileSystemFileType.PPT: 200,
FileSystemFileType.PPTX: 100,
FileSystemFileType.JS: 10,
FileSystemFileType.CSS: 5,
FileSystemFileType.PY: 5,
FileSystemFileType.C: 5,
FileSystemFileType.CPP: 10,
FileSystemFileType.JAVA: 10,
FileSystemFileType.RAR: 1000,
FileSystemFileType.ZIP: 1000,
FileSystemFileType.TAR: 1000,
FileSystemFileType.GZ: 800,
}

View File

@@ -1,87 +0,0 @@
from typing import Dict, Optional
from primaite import getLogger
from primaite.simulator.file_system.file_system_file import FileSystemFile
from primaite.simulator.file_system.file_system_item_abc import FileSystemItem
_LOGGER = getLogger(__name__)
class FileSystemFolder(FileSystemItem):
"""Simulation FileSystemFolder."""
files: Dict[str, FileSystemFile] = {}
"""List of files stored in the folder."""
is_quarantined: bool = False
"""Flag that marks the folder as quarantined if true."""
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update(
{
"files": {uuid: file.describe_state() for uuid, file in self.files.items()},
"is_quarantined": self.is_quarantined,
}
)
return state
def get_file_by_id(self, file_id: str) -> FileSystemFile:
"""Return a FileSystemFile with the matching id."""
return self.files.get(file_id)
def get_file_by_name(self, file_name: str) -> FileSystemFile:
"""Return a FileSystemFile with the matching id."""
return next((f for f in list(self.files) if f.name == file_name), None)
def add_file(self, file: FileSystemFile):
"""Adds a file to the folder list."""
if file is None or not isinstance(file, FileSystemFile):
raise Exception(f"Invalid file: {file}")
# check if file with id already exists in folder
if file.uuid in self.files:
_LOGGER.debug(f"File with id {file.uuid} already exists in folder")
else:
# add to list
self.files[file.uuid] = file
self.size += file.size
def remove_file(self, file: Optional[FileSystemFile]):
"""
Removes a file from the folder list.
The method can take a FileSystemFile object or a file id.
:param: file: The file to remove
:type: Optional[FileSystemFile]
"""
if file is None or not isinstance(file, FileSystemFile):
raise Exception(f"Invalid file: {file}")
if self.files.get(file.uuid):
del self.files[file.uuid]
self.size -= file.size
else:
_LOGGER.debug(f"File with UUID {file.uuid} was not found.")
def quarantine(self):
"""Quarantines the File System Folder."""
self.is_quarantined = True
def end_quarantine(self):
"""Ends the quarantine of the File System Folder."""
self.is_quarantined = False
def quarantine_status(self) -> bool:
"""Returns true if the folder is being quarantined."""
return self.is_quarantined

View File

@@ -1,31 +0,0 @@
from typing import Dict
from primaite.simulator.core import SimComponent
class FileSystemItem(SimComponent):
"""Abstract base class for FileSystemItems used in the file system simulation."""
name: str
"""The name of the FileSystemItem."""
size: float = 0
"""The size the item takes up on disk."""
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update(
{
"name": self.name,
"size": self.size,
}
)
return state

View File

@@ -4,12 +4,14 @@ import re
import secrets
from enum import Enum
from ipaddress import IPv4Address, IPv4Network
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
from prettytable import MARKDOWN, PrettyTable
from primaite import getLogger
from primaite.exceptions import NetworkError
from primaite.simulator import SIM_OUTPUT
from primaite.simulator.core import SimComponent
from primaite.simulator.domain.account import Account
from primaite.simulator.file_system.file_system import FileSystem
@@ -890,6 +892,8 @@ class Node(SimComponent):
"All processes on the node."
file_system: FileSystem
"The nodes file system."
root: Path
"Root directory for simulation output."
sys_log: SysLog
arp: ARPCache
icmp: ICMP
@@ -917,14 +921,19 @@ class Node(SimComponent):
kwargs["icmp"] = ICMP(sys_log=kwargs.get("sys_log"), arp_cache=kwargs.get("arp"))
if not kwargs.get("session_manager"):
kwargs["session_manager"] = SessionManager(sys_log=kwargs.get("sys_log"), arp_cache=kwargs.get("arp"))
if not kwargs.get("root"):
kwargs["root"] = SIM_OUTPUT / kwargs["hostname"]
if not kwargs.get("file_system"):
kwargs["file_system"] = FileSystem(sys_log=kwargs["sys_log"], sim_root=kwargs["root"] / "fs")
if not kwargs.get("software_manager"):
kwargs["software_manager"] = SoftwareManager(
sys_log=kwargs.get("sys_log"), session_manager=kwargs.get("session_manager")
sys_log=kwargs.get("sys_log"),
session_manager=kwargs.get("session_manager"),
file_system=kwargs.get("file_system"),
)
if not kwargs.get("file_system"):
kwargs["file_system"] = FileSystem()
super().__init__(**kwargs)
self.arp.nics = self.nics
self.session_manager.software_manager = self.software_manager
def describe_state(self) -> Dict:
"""
@@ -1091,6 +1100,8 @@ class Node(SimComponent):
if frame.ip.protocol == IPProtocol.TCP:
if frame.tcp.src_port == Port.ARP:
self.arp.process_arp_packet(from_nic=from_nic, arp_packet=frame.arp)
else:
self.session_manager.receive_frame(frame)
elif frame.ip.protocol == IPProtocol.UDP:
pass
elif frame.ip.protocol == IPProtocol.ICMP:

View File

@@ -6,6 +6,7 @@ from primaite.simulator.network.hardware.nodes.server import Server
from primaite.simulator.network.hardware.nodes.switch import Switch
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.services.database import DatabaseService
def client_server_routed() -> Network:
@@ -160,6 +161,39 @@ def arcd_uc2_network() -> Network:
database_server.power_on()
network.connect(endpoint_b=database_server.ethernet_port[1], endpoint_a=switch_1.switch_ports[3])
ddl = """
CREATE TABLE IF NOT EXISTS user (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(50) NOT NULL,
email VARCHAR(50) NOT NULL,
age INT,
city VARCHAR(50),
occupation VARCHAR(50)
);"""
user_insert_statements = [
"INSERT INTO user (name, email, age, city, occupation) VALUES ('John Doe', 'johndoe@example.com', 32, 'New York', 'Engineer');", # noqa
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Jane Smith', 'janesmith@example.com', 27, 'Los Angeles', 'Designer');", # noqa
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Bob Johnson', 'bobjohnson@example.com', 45, 'Chicago', 'Manager');", # noqa
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Alice Lee', 'alicelee@example.com', 22, 'San Francisco', 'Student');", # noqa
"INSERT INTO user (name, email, age, city, occupation) VALUES ('David Kim', 'davidkim@example.com', 38, 'Houston', 'Consultant');", # noqa
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Emily Chen', 'emilychen@example.com', 29, 'Seattle', 'Software Developer');", # noqa
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Frank Wang', 'frankwang@example.com', 55, 'New York', 'Entrepreneur');", # noqa
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Grace Park', 'gracepark@example.com', 31, 'Los Angeles', 'Marketing Specialist');", # noqa
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Henry Wu', 'henrywu@example.com', 40, 'Chicago', 'Accountant');", # noqa
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Isabella Kim', 'isabellakim@example.com', 26, 'San Francisco', 'Graphic Designer');", # noqa
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Jake Lee', 'jakelee@example.com', 33, 'Houston', 'Sales Manager');", # noqa
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Kelly Chen', 'kellychen@example.com', 28, 'Seattle', 'Web Developer');", # noqa
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Lucas Liu', 'lucasliu@example.com', 42, 'New York', 'Lawyer');", # noqa
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Maggie Wang', 'maggiewang@example.com', 30, 'Los Angeles', 'Data Analyst');", # noqa
]
database_server.software_manager.add_service(DatabaseService)
database: DatabaseService = database_server.software_manager.services["Database"] # noqa
database.start()
database._process_sql(ddl) # noqa
for insert_statement in user_insert_statements:
database._process_sql(insert_statement) # noqa
# Backup Server
backup_server = Server(
hostname="backup_server", ip_address="192.168.1.16", subnet_mask="255.255.255.0", default_gateway="192.168.1.1"
@@ -183,4 +217,6 @@ def arcd_uc2_network() -> Network:
router_1.acl.add_rule(action=ACLAction.PERMIT, protocol=IPProtocol.ICMP, position=23)
router_1.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.POSTGRES_SERVER, dst_port=Port.POSTGRES_SERVER)
return network

View File

@@ -1,8 +1,9 @@
import json
import logging
from pathlib import Path
from typing import Optional
from typing import Any, Dict, List, Optional
from primaite.simulator import TEMP_SIM_OUTPUT
from primaite.simulator import SIM_OUTPUT
class _JSONFilter(logging.Filter):
@@ -51,6 +52,18 @@ class PacketCapture:
self.logger.addFilter(_JSONFilter())
def read(self) -> List[Dict[str, Any]]:
"""
Read packet capture logs and return them as a list of dictionaries.
:return: List of frames captured, represented as dictionaries.
"""
frames = []
with open(self._get_log_path(), "r") as file:
while line := file.readline():
frames.append(json.loads(line.rstrip()))
return frames
@property
def _logger_name(self) -> str:
"""Get PCAP the logger name."""
@@ -62,7 +75,7 @@ class PacketCapture:
def _get_log_path(self) -> Path:
"""Get the path for the log file."""
root = TEMP_SIM_OUTPUT / self.hostname
root = SIM_OUTPUT / self.hostname
root.mkdir(exist_ok=True, parents=True)
return root / f"{self._logger_name}.log"

View File

@@ -3,6 +3,7 @@ from typing import Any, Dict, Optional, Tuple, TYPE_CHECKING, Union
from prettytable import MARKDOWN, PrettyTable
from primaite.simulator.file_system.file_system import FileSystem
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.application import Application
@@ -22,7 +23,7 @@ ServiceClass = TypeVar("ServiceClass", bound=Service)
class SoftwareManager:
"""A class that manages all running Services and Applications on a Node and facilitates their communication."""
def __init__(self, session_manager: "SessionManager", sys_log: "SysLog"):
def __init__(self, session_manager: "SessionManager", sys_log: SysLog, file_system: FileSystem):
"""
Initialize a new instance of SoftwareManager.
@@ -33,6 +34,7 @@ class SoftwareManager:
self.applications: Dict[str, Application] = {}
self.port_protocol_mapping: Dict[Tuple[Port, IPProtocol], Union[Service, Application]] = {}
self.sys_log: SysLog = sys_log
self.file_system: FileSystem = file_system
def add_service(self, service_class: Type[ServiceClass]):
"""
@@ -40,7 +42,7 @@ class SoftwareManager:
:param: service_class: The class of the service to add
"""
service = service_class(software_manager=self, sys_log=self.sys_log)
service = service_class(software_manager=self, sys_log=self.sys_log, file_system=self.file_system)
service.software_manager = self
self.services[service.name] = service
@@ -108,9 +110,10 @@ class SoftwareManager:
"""
receiver: Optional[Union[Service, Application]] = self.port_protocol_mapping.get((port, protocol), None)
if receiver:
receiver.receive_payload(payload=payload, session_id=session_id)
receiver.receive(payload=payload, session_id=session_id)
else:
self.sys_log.error(f"No service or application found for port {port} and protocol {protocol}")
pass
def show(self, markdown: bool = False):
"""

View File

@@ -3,7 +3,7 @@ from pathlib import Path
from prettytable import MARKDOWN, PrettyTable
from primaite.simulator import TEMP_SIM_OUTPUT
from primaite.simulator import SIM_OUTPUT
class _NotJSONFilter(logging.Filter):
@@ -81,7 +81,7 @@ class SysLog:
:return: Path object representing the location of the log file.
"""
root = TEMP_SIM_OUTPUT / self.hostname
root = SIM_OUTPUT / self.hostname
root.mkdir(exist_ok=True, parents=True)
return root / f"{self.hostname}_sys.log"

View File

@@ -1,12 +1,82 @@
from typing import Dict
import sqlite3
from ipaddress import IPv4Address
from sqlite3 import OperationalError
from typing import Any, Dict, List, Optional, Union
from primaite.simulator.file_system.file_system_file_type import FileSystemFileType
from primaite.simulator.network.hardware.base import Node
from prettytable import MARKDOWN, PrettyTable
from primaite.simulator.file_system.file_system import File
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.core.software_manager import SoftwareManager
from primaite.simulator.system.services.service import Service
class DatabaseService(Service):
"""Service loosely modelled on Microsoft SQL Server."""
"""
A class for simulating a generic SQL Server service.
This class inherits from the `Service` class and provides methods to manage and query a SQLite database.
"""
backup_server: Optional[IPv4Address] = None
"The IP Address of the server the "
def __init__(self, **kwargs):
kwargs["name"] = "Database"
kwargs["port"] = Port.POSTGRES_SERVER
kwargs["protocol"] = IPProtocol.TCP
super().__init__(**kwargs)
self._db_file: File
self._create_db_file()
self._conn = sqlite3.connect(self._db_file.sim_path)
self._cursor = self._conn.cursor()
def tables(self) -> List[str]:
"""
Get a list of table names present in the database.
:return: List of table names.
"""
sql = "SELECT name FROM sqlite_master WHERE type='table' AND name != 'sqlite_sequence';"
results = self._process_sql(sql)
return [row[0] for row in results["data"]]
def show(self, markdown: bool = False):
"""
Prints a list of table names in the database using PrettyTable.
:param markdown: Whether to output the table in Markdown format.
"""
table = PrettyTable(["Table"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.file_system.sys_log.hostname} Database"
for row in self.tables():
table.add_row([row])
print(table)
def _create_db_file(self):
"""Creates the Simulation File and sqlite file in the file system."""
self._db_file: File = self.file_system.create_file(folder_name="database", file_name="database.db", real=True)
self.folder = self._db_file.folder
def _process_sql(self, query: str) -> Dict[str, Union[int, List[Any]]]:
"""
Executes the given SQL query and returns the result.
:param query: The SQL query to be executed.
:return: Dictionary containing status code and data fetched.
"""
try:
self._cursor.execute(query)
self._conn.commit()
except OperationalError:
# Handle the case where the table does not exist.
return {"status_code": 404, "data": []}
return {"status_code": 200, "data": self._cursor.fetchall()}
def describe_state(self) -> Dict:
"""
@@ -19,58 +89,28 @@ class DatabaseService(Service):
"""
return super().describe_state()
def uninstall(self) -> None:
def receive(self, payload: Any, session_id: str, **kwargs) -> bool:
"""
Undo installation procedure.
Processes the incoming SQL payload and sends the result back.
This method deletes files created when installing the database, and the database folder if it is empty.
:param payload: The SQL query to be executed.
:param session_id: The session identifier.
:return: True if the Status Code is 200, otherwise False.
"""
super().uninstall()
node: Node = self.parent
node.file_system.delete_file(self.primary_store)
node.file_system.delete_file(self.transaction_log)
if self.secondary_store:
node.file_system.delete_file(self.secondary_store)
if len(self.folder.files) == 0:
node.file_system.delete_folder(self.folder)
result = self._process_sql(payload)
self.send(payload=result, session_id=session_id)
def install(self) -> None:
"""Perform first time install on a node, creating necessary files."""
super().install()
assert isinstance(self.parent, Node), "Database install can only happen after the db service is added to a node"
self._setup_files()
return payload["status_code"] == 200
def _setup_files(
self,
db_size: int = 1000,
use_secondary_db_file: bool = False,
secondary_db_size: int = 300,
folder_name: str = "database",
):
"""Set up files that are required by the database on the parent host.
:param db_size: Initial file size of the main database file, defaults to 1000
:type db_size: int, optional
:param use_secondary_db_file: Whether to use a secondary database file, defaults to False
:type use_secondary_db_file: bool, optional
:param secondary_db_size: Size of the secondary db file, defaults to None
:type secondary_db_size: int, optional
:param folder_name: Name of the folder which will be setup to hold the db files, defaults to "database"
:type folder_name: str, optional
def send(self, payload: Any, session_id: str, **kwargs) -> bool:
"""
# note that this parent.file_system.create_folder call in the future will be authenticated by using permissions
# handler. This permission will be granted based on service account given to the database service.
self.parent: Node
self.folder = self.parent.file_system.create_folder(folder_name)
self.primary_store = self.parent.file_system.create_file(
"db_primary_store", db_size, FileSystemFileType.MDF, folder=self.folder
)
self.transaction_log = self.parent.file_system.create_file(
"db_transaction_log", "1", FileSystemFileType.LDF, folder=self.folder
)
if use_secondary_db_file:
self.secondary_store = self.parent.file_system.create_file(
"db_secondary_store", secondary_db_size, FileSystemFileType.NDF, folder=self.folder
)
else:
self.secondary_store = None
Send a SQL response back down to the SessionManager.
:param payload: The SQL query results.
:param session_id: The session identifier.
:return: True if the Status Code is 200, otherwise False.
"""
software_manager: SoftwareManager = self.software_manager
software_manager.send_payload_to_session_manager(payload=payload, session_id=session_id)
return payload["status_code"] == 200

View File

@@ -1,8 +1,9 @@
from abc import abstractmethod
from enum import Enum
from typing import Any, Dict
from typing import Any, Dict, Optional
from primaite.simulator.core import Action, ActionManager, SimComponent
from primaite.simulator.file_system.file_system import FileSystem, Folder
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.core.sys_log import SysLog
@@ -79,6 +80,10 @@ class Software(SimComponent):
"An instance of Software Manager that is used by the parent node."
sys_log: SysLog = None
"An instance of SysLog that is used by the parent node."
file_system: FileSystem
"The FileSystem of the Node the Software is installed on."
folder: Optional[Folder] = None
"The folder on the file system the Software uses."
def _init_action_manager(self) -> ActionManager:
am = super()._init_action_manager()
@@ -216,7 +221,6 @@ class IOSoftware(Software):
:param kwargs: Additional keyword arguments specific to the implementation.
:return: True if the payload was successfully sent, False otherwise.
"""
pass
def receive(self, payload: Any, session_id: str, **kwargs) -> bool:
"""

View File

@@ -19,7 +19,17 @@ ACTION_SPACE_NODE_ACTION_VALUES = 1
_LOGGER = getLogger(__name__)
# PrimAITE v3 stuff
from primaite.simulator.file_system.file_system import FileSystem
from primaite.simulator.network.hardware.base import Node
@pytest.fixture(scope="function")
def file_system() -> FileSystem:
return Node(hostname="fs_node").file_system
# PrimAITE v2 stuff
class TempPrimaiteSession(PrimaiteSession):
"""
A temporary PrimaiteSession class.

View File

@@ -1,52 +1,39 @@
from primaite.simulator.network.hardware.base import Node
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.services.database import DatabaseService
from primaite.simulator.system.services.service import ServiceOperatingState
from primaite.simulator.system.software import SoftwareCriticality, SoftwareHealthState
from ipaddress import IPv4Address
from primaite.simulator.network.hardware.nodes.computer import Computer
from primaite.simulator.network.networks import arcd_uc2_network
from primaite.simulator.network.transmission.data_link_layer import EthernetHeader, Frame
from primaite.simulator.network.transmission.network_layer import IPPacket, Precedence
from primaite.simulator.network.transmission.transport_layer import Port, TCPHeader
def test_installing_database():
db = DatabaseService(
name="SQL-database",
health_state_actual=SoftwareHealthState.GOOD,
health_state_visible=SoftwareHealthState.GOOD,
criticality=SoftwareCriticality.MEDIUM,
port=Port.SQL_SERVER,
operating_state=ServiceOperatingState.RUNNING,
def test_database_query_across_the_network():
"""Tests DB query across the network returns HTTP status 200 and date."""
network = arcd_uc2_network()
client_1: Computer = network.get_node_by_hostname("client_1")
client_1.arp.send_arp_request(IPv4Address("192.168.1.14"))
dst_mac_address = client_1.arp.get_arp_cache_mac_address(IPv4Address("192.168.1.14"))
outbound_nic = client_1.arp.get_arp_cache_nic(IPv4Address("192.168.1.14"))
client_1.ping("192.168.1.14")
frame = Frame(
ethernet=EthernetHeader(src_mac_addr=client_1.ethernet_port[1].mac_address, dst_mac_addr=dst_mac_address),
ip=IPPacket(
src_ip_address=client_1.ethernet_port[1].ip_address,
dst_ip_address=IPv4Address("192.168.1.14"),
precedence=Precedence.FLASH,
),
tcp=TCPHeader(src_port=Port.POSTGRES_SERVER, dst_port=Port.POSTGRES_SERVER),
payload="SELECT * FROM user;",
)
node = Node(hostname="db-server")
outbound_nic.send_frame(frame)
node.install_service(db)
client_1_last_payload = outbound_nic.pcap.read()[-1]["payload"]
assert db in node
file_exists = False
for folder in node.file_system.folders.values():
for file in folder.files.values():
if file.name == "db_primary_store":
file_exists = True
break
if file_exists:
break
assert file_exists
def test_uninstalling_database():
db = DatabaseService(
name="SQL-database",
health_state_actual=SoftwareHealthState.GOOD,
health_state_visible=SoftwareHealthState.GOOD,
criticality=SoftwareCriticality.MEDIUM,
port=Port.SQL_SERVER,
operating_state=ServiceOperatingState.RUNNING,
)
node = Node(hostname="db-server")
node.install_service(db)
node.uninstall_service(db)
assert db not in node
assert node.file_system.get_folder_by_name("database") is None
assert client_1_last_payload["status_code"] == 200
assert client_1_last_payload["data"]

View File

@@ -1,132 +1,144 @@
import pytest
from primaite.simulator.file_system.file_system import FileSystem
from primaite.simulator.file_system.file_system_file import FileSystemFile
from primaite.simulator.file_system.file_system_folder import FileSystemFolder
from primaite.simulator.file_system.file_type import FileType
def test_create_folder_and_file():
def test_create_folder_and_file(file_system):
"""Test creating a folder and a file."""
file_system = FileSystem()
folder = file_system.create_folder(folder_name="test_folder")
assert len(file_system.folders) is 1
assert len(file_system.folders) == 1
file_system.create_folder(folder_name="test_folder")
file = file_system.create_file(file_name="test_file", size=10, folder_uuid=folder.uuid)
assert len(file_system.get_folder_by_id(folder.uuid).files) is 1
assert len(file_system.folders) is 2
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
assert file_system.get_file_by_id(file.uuid).name is "test_file"
assert file_system.get_file_by_id(file.uuid).size == 10
assert len(file_system.get_folder("test_folder").files) == 1
assert file_system.get_folder("test_folder").get_file("test_file.txt")
def test_create_file():
def test_create_file_no_folder(file_system):
"""Tests that creating a file without a folder creates a folder and sets that as the file's parent."""
file_system = FileSystem()
file = file_system.create_file(file_name="test_file", size=10)
file = file_system.create_file(file_name="test_file.txt", size=10)
assert len(file_system.folders) is 1
assert file_system.get_folder_by_name("root").get_file_by_id(file.uuid) is file
assert file_system.get_folder("root").get_file("test_file.txt") == file
assert file_system.get_folder("root").get_file("test_file.txt").file_type == FileType.TXT
assert file_system.get_folder("root").get_file("test_file.txt").size == 10
def test_delete_file():
def test_create_file_no_extension(file_system):
"""Tests that creating a file without an extension sets the file type to FileType.UNKNOWN."""
file = file_system.create_file(file_name="test_file")
assert len(file_system.folders) is 1
assert file_system.get_folder("root").get_file("test_file") == file
assert file_system.get_folder("root").get_file("test_file").file_type == FileType.UNKNOWN
assert file_system.get_folder("root").get_file("test_file").size == 0
def test_delete_file(file_system):
"""Tests that a file can be deleted."""
file_system = FileSystem()
file_system.create_file(file_name="test_file.txt")
assert len(file_system.folders) == 1
assert len(file_system.get_folder("root").files) == 1
file = file_system.create_file(file_name="test_file", size=10)
assert len(file_system.folders) is 1
folder_id = list(file_system.folders.keys())[0]
folder = file_system.get_folder_by_id(folder_id)
assert folder.get_file_by_id(file.uuid) is file
file_system.delete_file(file=file)
assert len(file_system.folders) is 1
assert len(folder.files) is 0
file_system.delete_file(folder_name="root", file_name="test_file.txt")
assert len(file_system.folders) == 1
assert len(file_system.get_folder("root").files) == 0
def test_delete_non_existent_file():
def test_delete_non_existent_file(file_system):
"""Tests deleting a non existent file."""
file_system = FileSystem()
file = file_system.create_file(file_name="test_file", size=10)
not_added_file = FileSystemFile(name="not_added")
file_system.create_file(file_name="test_file.txt")
# folder should be created
assert len(file_system.folders) is 1
assert len(file_system.folders) == 1
# should only have 1 file in the file system
folder_id = list(file_system.folders.keys())[0]
folder = file_system.get_folder_by_id(folder_id)
assert len(list(folder.files)) is 1
assert folder.get_file_by_id(file.uuid) is file
assert len(file_system.get_folder("root").files) == 1
# deleting should not change how many files are in folder
file_system.delete_file(file=not_added_file)
assert len(file_system.folders) is 1
assert len(list(folder.files)) is 1
file_system.delete_file(folder_name="root", file_name="does_not_exist!")
# should still only be one folder
assert len(file_system.folders) == 1
# The folder should still have 1 file
assert len(file_system.get_folder("root").files) == 1
def test_delete_folder():
file_system = FileSystem()
folder = file_system.create_folder(folder_name="test_folder")
assert len(file_system.folders) is 1
def test_delete_folder(file_system):
file_system.create_folder(folder_name="test_folder")
assert len(file_system.folders) == 2
file_system.delete_folder(folder)
assert len(file_system.folders) is 0
file_system.delete_folder(folder_name="test_folder")
assert len(file_system.folders) == 1
def test_deleting_a_non_existent_folder():
file_system = FileSystem()
folder = file_system.create_folder(folder_name="test_folder")
not_added_folder = FileSystemFolder(name="fake_folder")
assert len(file_system.folders) is 1
def test_deleting_a_non_existent_folder(file_system):
file_system.create_folder(folder_name="test_folder")
assert len(file_system.folders) == 2
file_system.delete_folder(not_added_folder)
assert len(file_system.folders) is 1
file_system.delete_folder(folder_name="does not exist!")
assert len(file_system.folders) == 2
def test_move_file():
def test_deleting_root_folder_fails(file_system):
assert len(file_system.folders) == 1
file_system.delete_folder(folder_name="root")
assert len(file_system.folders) == 1
def test_move_file(file_system):
"""Tests the file move function."""
file_system = FileSystem()
src_folder = file_system.create_folder(folder_name="test_folder_1")
assert len(file_system.folders) is 1
file_system.create_folder(folder_name="src_folder")
file_system.create_folder(folder_name="dst_folder")
target_folder = file_system.create_folder(folder_name="test_folder_2")
assert len(file_system.folders) is 2
file = file_system.create_file(file_name="test_file.txt", size=10, folder_name="src_folder")
original_uuid = file.uuid
file = file_system.create_file(file_name="test_file", size=10, folder_uuid=src_folder.uuid)
assert len(file_system.get_folder_by_id(src_folder.uuid).files) is 1
assert len(file_system.get_folder_by_id(target_folder.uuid).files) is 0
assert len(file_system.get_folder("src_folder").files) == 1
assert len(file_system.get_folder("dst_folder").files) == 0
file_system.move_file(file=file, src_folder=src_folder, target_folder=target_folder)
file_system.move_file(src_folder_name="src_folder", src_file_name="test_file.txt", dst_folder_name="dst_folder")
assert len(file_system.get_folder_by_id(src_folder.uuid).files) is 0
assert len(file_system.get_folder_by_id(target_folder.uuid).files) is 1
assert len(file_system.get_folder("src_folder").files) == 0
assert len(file_system.get_folder("dst_folder").files) == 1
assert file_system.get_file("dst_folder", "test_file.txt").uuid == original_uuid
def test_copy_file():
def test_copy_file(file_system):
"""Tests the file copy function."""
file_system = FileSystem()
src_folder = file_system.create_folder(folder_name="test_folder_1")
assert len(file_system.folders) is 1
file_system.create_folder(folder_name="src_folder")
file_system.create_folder(folder_name="dst_folder")
target_folder = file_system.create_folder(folder_name="test_folder_2")
assert len(file_system.folders) is 2
file = file_system.create_file(file_name="test_file.txt", size=10, folder_name="src_folder", real=True)
original_uuid = file.uuid
file = file_system.create_file(file_name="test_file", size=10, folder_uuid=src_folder.uuid)
assert len(file_system.get_folder_by_id(src_folder.uuid).files) is 1
assert len(file_system.get_folder_by_id(target_folder.uuid).files) is 0
assert len(file_system.get_folder("src_folder").files) == 1
assert len(file_system.get_folder("dst_folder").files) == 0
file_system.copy_file(file=file, src_folder=src_folder, target_folder=target_folder)
file_system.copy_file(src_folder_name="src_folder", src_file_name="test_file.txt", dst_folder_name="dst_folder")
assert len(file_system.get_folder_by_id(src_folder.uuid).files) is 1
assert len(file_system.get_folder_by_id(target_folder.uuid).files) is 1
assert len(file_system.get_folder("src_folder").files) == 1
assert len(file_system.get_folder("dst_folder").files) == 1
assert file_system.get_file("dst_folder", "test_file.txt").uuid != original_uuid
def test_serialisation():
def test_folder_quarantine_state(file_system):
"""Tests the changing of folder quarantine status."""
folder = file_system.get_folder("root")
assert folder.quarantine_status() is False
folder.quarantine()
assert folder.quarantine_status() is True
folder.unquarantine()
assert folder.quarantine_status() is False
@pytest.mark.skip(reason="Skipping until we tackle serialisation")
def test_serialisation(file_system):
"""Test to check that the object serialisation works correctly."""
file_system = FileSystem()
folder = file_system.create_folder(folder_name="test_folder")
assert len(file_system.folders) is 1
file_system.create_file(file_name="test_file", size=10, folder_uuid=folder.uuid)
assert file_system.get_folder_by_id(folder.uuid) is folder
file_system.create_file(file_name="test_file.txt")
serialised_file_sys = file_system.model_dump_json()
deserialised_file_sys = FileSystem.model_validate_json(serialised_file_sys)

View File

@@ -1,23 +0,0 @@
from primaite.simulator.file_system.file_system_file import FileSystemFile
from primaite.simulator.file_system.file_system_file_type import FileSystemFileType
def test_file_type():
"""Tests tha the FileSystemFile type is set correctly."""
file = FileSystemFile(name="test", file_type=FileSystemFileType.DOC)
assert file.file_type is FileSystemFileType.DOC
def test_get_size():
"""Tests that the file size is being returned properly."""
file = FileSystemFile(name="test", size=1.5)
assert file.size == 1.5
def test_serialisation():
"""Test to check that the object serialisation works correctly."""
file = FileSystemFile(name="test", size=1.5, file_type=FileSystemFileType.DOC)
serialised_file = file.model_dump_json()
deserialised_file = FileSystemFile.model_validate_json(serialised_file)
assert file.model_dump_json() == deserialised_file.model_dump_json()

View File

@@ -1,75 +0,0 @@
from primaite.simulator.file_system.file_system_file import FileSystemFile
from primaite.simulator.file_system.file_system_file_type import FileSystemFileType
from primaite.simulator.file_system.file_system_folder import FileSystemFolder
def test_adding_removing_file():
"""Test the adding and removing of a file from a folder."""
folder = FileSystemFolder(name="test")
file = FileSystemFile(name="test_file", size=10, file_type=FileSystemFileType.DOC)
folder.add_file(file)
assert folder.size == 10
assert len(folder.files) is 1
folder.remove_file(file)
assert folder.size == 0
assert len(folder.files) is 0
def test_remove_non_existent_file():
"""Test the removing of a file that does not exist."""
folder = FileSystemFolder(name="test")
file = FileSystemFile(name="test_file", size=10, file_type=FileSystemFileType.DOC)
not_added_file = FileSystemFile(name="fake_file", size=10, file_type=FileSystemFileType.DOC)
folder.add_file(file)
assert folder.size == 10
assert len(folder.files) is 1
folder.remove_file(not_added_file)
assert folder.size == 10
assert len(folder.files) is 1
def test_get_file_by_id():
"""Test to make sure that the correct file is returned."""
folder = FileSystemFolder(name="test")
file = FileSystemFile(name="test_file", size=10, file_type=FileSystemFileType.DOC)
file2 = FileSystemFile(name="test_file_2", size=10, file_type=FileSystemFileType.DOC)
folder.add_file(file)
folder.add_file(file2)
assert folder.size == 20
assert len(folder.files) is 2
assert folder.get_file_by_id(file_id=file.uuid) is file
def test_folder_quarantine_state():
"""Tests the changing of folder quarantine status."""
folder = FileSystemFolder(name="test")
assert folder.quarantine_status() is False
folder.quarantine()
assert folder.quarantine_status() is True
folder.end_quarantine()
assert folder.quarantine_status() is False
def test_serialisation():
"""Test to check that the object serialisation works correctly."""
folder = FileSystemFolder(name="test")
file = FileSystemFile(name="test_file", size=10, file_type=FileSystemFileType.DOC)
folder.add_file(file)
serialised_folder = folder.model_dump_json()
deserialised_folder = FileSystemFolder.model_validate_json(serialised_folder)
assert folder.model_dump_json() == deserialised_folder.model_dump_json()

View File

@@ -1,15 +1,59 @@
from primaite.simulator.network.transmission.transport_layer import Port
import json
import pytest
from primaite.simulator.network.hardware.base import Node
from primaite.simulator.system.services.database import DatabaseService
from primaite.simulator.system.services.service import ServiceOperatingState
from primaite.simulator.system.software import SoftwareCriticality, SoftwareHealthState
DDL = """
CREATE TABLE IF NOT EXISTS user (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(50) NOT NULL,
email VARCHAR(50) NOT NULL,
age INT,
city VARCHAR(50),
occupation VARCHAR(50)
);"""
USER_INSERT_STATEMENTS = [
"INSERT INTO user (name, email, age, city, occupation) VALUES ('John Doe', 'johndoe@example.com', 32, 'New York', 'Engineer');",
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Jane Smith', 'janesmith@example.com', 27, 'Los Angeles', 'Designer');",
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Bob Johnson', 'bobjohnson@example.com', 45, 'Chicago', 'Manager');",
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Alice Lee', 'alicelee@example.com', 22, 'San Francisco', 'Student');",
"INSERT INTO user (name, email, age, city, occupation) VALUES ('David Kim', 'davidkim@example.com', 38, 'Houston', 'Consultant');",
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Emily Chen', 'emilychen@example.com', 29, 'Seattle', 'Software Developer');",
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Frank Wang', 'frankwang@example.com', 55, 'New York', 'Entrepreneur');",
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Grace Park', 'gracepark@example.com', 31, 'Los Angeles', 'Marketing Specialist');",
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Henry Wu', 'henrywu@example.com', 40, 'Chicago', 'Accountant');",
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Isabella Kim', 'isabellakim@example.com', 26, 'San Francisco', 'Graphic Designer');",
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Jake Lee', 'jakelee@example.com', 33, 'Houston', 'Sales Manager');",
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Kelly Chen', 'kellychen@example.com', 28, 'Seattle', 'Web Developer');",
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Lucas Liu', 'lucasliu@example.com', 42, 'New York', 'Lawyer');",
"INSERT INTO user (name, email, age, city, occupation) VALUES ('Maggie Wang', 'maggiewang@example.com', 30, 'Los Angeles', 'Data Analyst');",
]
def test_creation():
db = DatabaseService(
name="SQL-database",
health_state_actual=SoftwareHealthState.GOOD,
health_state_visible=SoftwareHealthState.GOOD,
criticality=SoftwareCriticality.MEDIUM,
port=Port.SQL_SERVER,
operating_state=ServiceOperatingState.RUNNING,
)
@pytest.fixture(scope="function")
def database_server() -> Node:
node = Node(hostname="db_node")
node.software_manager.add_service(DatabaseService)
node.software_manager.services["Database"].start()
return node
@pytest.fixture(scope="function")
def database(database_server) -> DatabaseService:
database: DatabaseService = database_server.software_manager.services["Database"] # noqa
database.receive(DDL, None)
for script in USER_INSERT_STATEMENTS:
database.receive(script, None)
return database
def test_creation(database_server):
database_server.software_manager.show()
def test_db_population(database):
database.show()
assert database.tables() == ["user"]