Merged PR 378: Add create file/folder and file access requests
## Summary Added requests in file system that allows: - file to be created - folder to be created - file to be "accessed" ## Test process https://dev.azure.com/ma-dev-uk/PrimAITE/_git/PrimAITE/pullrequest/378?_a=files&path=/tests/integration_tests/test_simulation/test_request_response.py ## Checklist - [X] PR is linked to a **work item** - [X] **acceptance criteria** of linked ticket are met - [X] performed **self-review** of the code - [X] written **tests** for any new functionality added with this PR - [ ] updated the **documentation** if this PR changes or adds functionality - [ ] written/updated **design docs** if this PR implements new functionality - [ ] updated the **change log** - [X] ran **pre-commit** checks for code style - [ ] attended to any **TO-DOs** left in the code Related work items: #2606
This commit is contained in:
@@ -313,6 +313,36 @@ class NodeFolderRestoreAction(NodeFolderAbstractAction):
|
||||
self.verb: str = "restore"
|
||||
|
||||
|
||||
class NodeFileCreateAction(AbstractAction):
|
||||
"""Action which creates a new file in a given folder."""
|
||||
|
||||
def __init__(self, manager: "ActionManager", num_nodes: int, num_folders: int, **kwargs) -> None:
|
||||
super().__init__(manager, num_nodes=num_nodes, num_folders=num_folders, **kwargs)
|
||||
self.verb: str = "create"
|
||||
|
||||
def form_request(self, node_id: int, folder_name: str, file_name: str) -> List[str]:
|
||||
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
|
||||
node_name = self.manager.get_node_name_by_idx(node_id)
|
||||
if node_name is None or folder_name is None or file_name is None:
|
||||
return ["do_nothing"]
|
||||
return ["network", "node", node_name, "file_system", "create", "file", folder_name, file_name]
|
||||
|
||||
|
||||
class NodeFolderCreateAction(AbstractAction):
|
||||
"""Action which creates a new folder."""
|
||||
|
||||
def __init__(self, manager: "ActionManager", num_nodes: int, num_folders: int, **kwargs) -> None:
|
||||
super().__init__(manager, num_nodes=num_nodes, num_folders=num_folders, **kwargs)
|
||||
self.verb: str = "create"
|
||||
|
||||
def form_request(self, node_id: int, folder_name: str) -> List[str]:
|
||||
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
|
||||
node_name = self.manager.get_node_name_by_idx(node_id)
|
||||
if node_name is None or folder_name is None:
|
||||
return ["do_nothing"]
|
||||
return ["network", "node", node_name, "file_system", "create", "folder", folder_name]
|
||||
|
||||
|
||||
class NodeFileAbstractAction(AbstractAction):
|
||||
"""Abstract base class for file actions.
|
||||
|
||||
@@ -393,6 +423,21 @@ class NodeFileCorruptAction(NodeFileAbstractAction):
|
||||
self.verb: str = "corrupt"
|
||||
|
||||
|
||||
class NodeFileAccessAction(AbstractAction):
|
||||
"""Action which increases a file's access count."""
|
||||
|
||||
def __init__(self, manager: "ActionManager", num_nodes: int, num_folders: int, **kwargs) -> None:
|
||||
super().__init__(manager, num_nodes=num_nodes, num_folders=num_folders, **kwargs)
|
||||
self.verb: str = "access"
|
||||
|
||||
def form_request(self, node_id: int, folder_name: str, file_name: str) -> List[str]:
|
||||
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
|
||||
node_name = self.manager.get_node_name_by_idx(node_id)
|
||||
if node_name is None or folder_name is None or file_name is None:
|
||||
return ["do_nothing"]
|
||||
return ["network", "node", node_name, "file_system", "access", folder_name, file_name]
|
||||
|
||||
|
||||
class NodeAbstractAction(AbstractAction):
|
||||
"""
|
||||
Abstract base class for node actions.
|
||||
@@ -846,11 +891,14 @@ class ActionManager:
|
||||
"NODE_APPLICATION_INSTALL": NodeApplicationInstallAction,
|
||||
"NODE_APPLICATION_REMOVE": NodeApplicationRemoveAction,
|
||||
"NODE_FILE_SCAN": NodeFileScanAction,
|
||||
"NODE_FILE_CREATE": NodeFileCreateAction,
|
||||
"NODE_FILE_CHECKHASH": NodeFileCheckhashAction,
|
||||
"NODE_FILE_DELETE": NodeFileDeleteAction,
|
||||
"NODE_FILE_REPAIR": NodeFileRepairAction,
|
||||
"NODE_FILE_RESTORE": NodeFileRestoreAction,
|
||||
"NODE_FILE_CORRUPT": NodeFileCorruptAction,
|
||||
"NODE_FILE_ACCESS": NodeFileAccessAction,
|
||||
"NODE_FOLDER_CREATE": NodeFolderCreateAction,
|
||||
"NODE_FOLDER_SCAN": NodeFolderScanAction,
|
||||
"NODE_FOLDER_CHECKHASH": NodeFolderCheckhashAction,
|
||||
"NODE_FOLDER_REPAIR": NodeFolderRepairAction,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Dict, Optional
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from prettytable import MARKDOWN, PrettyTable
|
||||
|
||||
@@ -63,6 +63,65 @@ class FileSystem(SimComponent):
|
||||
request_type=RequestType(func=self._delete_manager),
|
||||
)
|
||||
|
||||
self._create_manager = RequestManager()
|
||||
|
||||
def _create_file_action(request: List[Any], context: Any) -> RequestResponse:
|
||||
file = self.create_file(folder_name=request[0], file_name=request[1])
|
||||
if not file:
|
||||
return RequestResponse.from_bool(False)
|
||||
return RequestResponse(
|
||||
status="success",
|
||||
data={
|
||||
"file_name": file.name,
|
||||
"folder_name": file.folder_name,
|
||||
"file_type": file.file_type,
|
||||
"file_size": file.size,
|
||||
},
|
||||
)
|
||||
|
||||
self._create_manager.add_request(
|
||||
name="file",
|
||||
request_type=RequestType(func=_create_file_action),
|
||||
)
|
||||
|
||||
def _create_folder_action(request: List[Any], context: Any) -> RequestResponse:
|
||||
folder = self.create_folder(folder_name=request[0])
|
||||
if not folder:
|
||||
return RequestResponse.from_bool(False)
|
||||
return RequestResponse(status="success", data={"folder_name": folder.name})
|
||||
|
||||
self._create_manager.add_request(
|
||||
name="folder",
|
||||
request_type=RequestType(func=_create_folder_action),
|
||||
)
|
||||
rm.add_request(
|
||||
name="create",
|
||||
request_type=RequestType(func=self._create_manager),
|
||||
)
|
||||
|
||||
def _access_file_action(request: List[Any], context: Any) -> RequestResponse:
|
||||
file = self.get_file(folder_name=request[0], file_name=request[1])
|
||||
if not file:
|
||||
return RequestResponse.from_bool(False)
|
||||
|
||||
if self.access_file(folder_name=request[0], file_name=request[1]):
|
||||
return RequestResponse(
|
||||
status="success",
|
||||
data={
|
||||
"file_name": file.name,
|
||||
"folder_name": file.folder_name,
|
||||
"file_type": file.file_type,
|
||||
"file_size": file.size,
|
||||
"file_status": file.health_status,
|
||||
},
|
||||
)
|
||||
return RequestResponse.from_bool(False)
|
||||
|
||||
rm.add_request(
|
||||
name="access",
|
||||
request_type=RequestType(func=_access_file_action),
|
||||
)
|
||||
|
||||
self._restore_manager = RequestManager()
|
||||
self._restore_manager.add_request(
|
||||
name="file",
|
||||
@@ -494,3 +553,28 @@ class FileSystem(SimComponent):
|
||||
return False
|
||||
|
||||
return folder.restore_file(file_name=file_name)
|
||||
|
||||
def access_file(self, folder_name: str, file_name: str) -> bool:
|
||||
"""
|
||||
Access a file.
|
||||
|
||||
Used by agents to simulate a file being accessed.
|
||||
|
||||
:param: folder_name: name of the folder where the file is stored
|
||||
:type: folder_name: str
|
||||
|
||||
:param: file_name: name of the file to access
|
||||
:type: file_name: str
|
||||
"""
|
||||
folder = self.get_folder(folder_name=folder_name)
|
||||
|
||||
if folder:
|
||||
file = folder.get_file(file_name=file_name)
|
||||
|
||||
if file:
|
||||
file.num_access += 1
|
||||
return True
|
||||
else:
|
||||
self.sys_log.error(f"Unable to access file that does not exist. (file name: {file_name})")
|
||||
|
||||
return False
|
||||
|
||||
Reference in New Issue
Block a user