#2740: added ability to merge validators + validators for folders

This commit is contained in:
Czar Echavez
2024-07-09 16:44:02 +01:00
parent b45f6bbd40
commit 20b9b61be5
6 changed files with 323 additions and 11 deletions

View File

@@ -34,6 +34,20 @@ class RequestPermissionValidator(BaseModel):
"""Message that is reported when a request is rejected by this validator."""
return "request rejected"
def __add__(self, other: "RequestPermissionValidator") -> "_CombinedValidator":
return _CombinedValidator(validators=[self, other])
class _CombinedValidator(RequestPermissionValidator):
validators: List[RequestPermissionValidator] = []
def __call__(self, request, context) -> bool:
return all(x(request, context) for x in self.validators)
@property
def fail_message(self):
return f"One of the following conditions are not met: {[v.fail_message for v in self.validators]}"
class AllowAllValidator(RequestPermissionValidator):
"""Always allows the request."""

View File

@@ -6,8 +6,8 @@ from typing import Any, Dict, List, Optional
from prettytable import MARKDOWN, PrettyTable
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager, RequestType, SimComponent
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestPermissionValidator, RequestType, SimComponent
from primaite.simulator.file_system.file import File
from primaite.simulator.file_system.file_type import FileType
from primaite.simulator.file_system.folder import Folder
@@ -42,6 +42,10 @@ class FileSystem(SimComponent):
More information in user guide and docstring for SimComponent._init_request_manager.
"""
self._folder_exists = FileSystem._FolderExistsValidator(file_system=self)
self._folder_deleted = FileSystem._FolderNotDeletedValidator(file_system=self)
self._file_exists = FileSystem._FileExistsValidator(file_system=self)
rm = super()._init_request_manager()
self._delete_manager = RequestManager()
@@ -50,13 +54,15 @@ class FileSystem(SimComponent):
request_type=RequestType(
func=lambda request, context: RequestResponse.from_bool(
self.delete_file(folder_name=request[0], file_name=request[1])
)
),
validator=self._file_exists,
),
)
self._delete_manager.add_request(
name="folder",
request_type=RequestType(
func=lambda request, context: RequestResponse.from_bool(self.delete_folder(folder_name=request[0]))
func=lambda request, context: RequestResponse.from_bool(self.delete_folder(folder_name=request[0])),
validator=self._folder_exists,
),
)
rm.add_request(
@@ -144,10 +150,13 @@ class FileSystem(SimComponent):
)
self._folder_request_manager = RequestManager()
rm.add_request("folder", RequestType(func=self._folder_request_manager))
rm.add_request(
"folder",
RequestType(func=self._folder_request_manager, validator=self._folder_exists + self._folder_deleted),
)
self._file_request_manager = RequestManager()
rm.add_request("file", RequestType(func=self._file_request_manager))
rm.add_request("file", RequestType(func=self._file_request_manager, validator=self._file_exists))
return rm
@@ -626,3 +635,65 @@ class FileSystem(SimComponent):
self.sys_log.error(f"Unable to access file that does not exist. (file name: {file_name})")
return False
class _FolderExistsValidator(RequestPermissionValidator):
"""
When requests come in, this validator will only let them through if the Folder exists.
Actions cannot be performed on a non-existent folder.
"""
file_system: FileSystem
"""Save a reference to the FileSystem instance."""
def __call__(self, request: RequestFormat, context: Dict) -> bool:
"""Returns True if folder exists."""
return self.file_system.get_folder(folder_name=request[0]) is not None
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return "Cannot perform request on folder because it does not exist"
class _FolderNotDeletedValidator(RequestPermissionValidator):
"""
When requests come in, this validator will only let them through if the Folder has not been deleted.
Actions cannot be performed on a deleted folder.
"""
file_system: FileSystem
"""Save a reference to the FileSystem instance."""
def __call__(self, request: RequestFormat, context: Dict) -> bool:
"""Returns True if folder exists and is not deleted."""
# get folder
folder = self.file_system.get_folder(folder_name=request[0], include_deleted=True)
return folder is not None and not folder.deleted
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return "Cannot perform request on folder because it is deleted."
class _FileExistsValidator(RequestPermissionValidator):
"""
When requests come in, this validator will only let them through if the File exists.
Actions cannot be performed on a non-existent file.
"""
file_system: FileSystem
"""Save a reference to the FileSystem instance."""
def __call__(self, request: RequestFormat, context: Dict) -> bool:
"""Returns True if file exists."""
return self.file_system.get_file(folder_name=request[0], file_name=request[1]) is not None
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return (
f"Cannot perform request on application '{self.application.name}' because it is not in the "
f"{self.state.name} state."
)

View File

@@ -6,8 +6,8 @@ from typing import Dict, Optional
from prettytable import MARKDOWN, PrettyTable
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestPermissionValidator, RequestType
from primaite.simulator.file_system.file import File
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemABC, FileSystemItemHealthStatus
@@ -55,6 +55,8 @@ class Folder(FileSystemItemABC):
More information in user guide and docstring for SimComponent._init_request_manager.
"""
self._file_exists = Folder._FileExistsValidator(folder=self)
rm = super()._init_request_manager()
rm.add_request(
name="delete",
@@ -65,7 +67,7 @@ class Folder(FileSystemItemABC):
self._file_request_manager = RequestManager()
rm.add_request(
name="file",
request_type=RequestType(func=self._file_request_manager),
request_type=RequestType(func=self._file_request_manager, validator=self._file_exists),
)
return rm
@@ -469,3 +471,25 @@ class Folder(FileSystemItemABC):
self.deleted = True
return True
class _FileExistsValidator(RequestPermissionValidator):
"""
When requests come in, this validator will only let them through if the File exists.
Actions cannot be performed on a non-existent file.
"""
folder: Folder
"""Save a reference to the Folder instance."""
def __call__(self, request: RequestFormat, context: Dict) -> bool:
"""Returns True if file exists."""
return self.folder.get_file(file_name=request[0]) is not None
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return (
f"Cannot perform request on application '{self.application.name}' because it is not in the "
f"{self.state.name} state."
)

View File

@@ -0,0 +1,82 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
import uuid
from typing import Tuple
import pytest
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.network.hardware.nodes.host.computer import Computer
@pytest.fixture
def game_and_agent_fixture(game_and_agent):
"""Create a game with a simple agent that can be controlled by the tests."""
game, agent = game_and_agent
client_1: Computer = game.simulation.network.get_node_by_hostname("client_1")
client_1.start_up_duration = 3
return (game, agent)
def test_create_file(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator allows a folder to be created."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
random_folder = str(uuid.uuid4())
random_file = str(uuid.uuid4())
assert client_1.file_system.get_file(folder_name=random_folder, file_name=random_file) is None
action = (
"NODE_FILE_CREATE",
{"node_id": 0, "folder_name": random_folder, "file_name": random_file},
)
agent.store_action(action)
game.step()
assert client_1.file_system.get_file(folder_name=random_folder, file_name=random_file) is not None
def test_file_delete_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator allows a folder to be created."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
file = client_1.file_system.get_file(folder_name="downloads", file_name="cat.png")
assert file.deleted is False
action = (
"NODE_FILE_DELETE",
{"node_id": 0, "folder_id": 0, "file_id": 0},
)
agent.store_action(action)
game.step()
assert file.deleted
def test_file_scan_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator allows a folder to be created."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
file = client_1.file_system.get_file(folder_name="downloads", file_name="cat.png")
file.corrupt()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.visible_health_status == FileSystemItemHealthStatus.GOOD
action = (
"NODE_FILE_SCAN",
{"node_id": 0, "folder_id": 0, "file_id": 0},
)
agent.store_action(action)
game.step()
assert file.health_status == FileSystemItemHealthStatus.CORRUPT
assert file.visible_health_status == FileSystemItemHealthStatus.CORRUPT

View File

@@ -0,0 +1,123 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
import uuid
from typing import Tuple
import pytest
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.network.hardware.nodes.host.computer import Computer
@pytest.fixture
def game_and_agent_fixture(game_and_agent):
"""Create a game with a simple agent that can be controlled by the tests."""
game, agent = game_and_agent
client_1: Computer = game.simulation.network.get_node_by_hostname("client_1")
client_1.start_up_duration = 3
return (game, agent)
def test_create_folder(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the validator allows a folder to be created."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
random_folder = str(uuid.uuid4())
assert client_1.file_system.get_folder(folder_name=random_folder) is None
action = (
"NODE_FOLDER_CREATE",
{
"node_id": 0,
"folder_name": random_folder,
},
)
agent.store_action(action)
game.step()
assert client_1.file_system.get_folder(folder_name=random_folder) is not None
def test_folder_scan_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test to make sure that the validator checks if the folder exists before scanning."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
folder = client_1.file_system.get_folder(folder_name="downloads")
assert folder.health_status == FileSystemItemHealthStatus.GOOD
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
folder.corrupt()
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
action = (
"NODE_FOLDER_SCAN",
{
"node_id": 0, # client_1,
"folder_id": 0, # downloads
},
)
agent.store_action(action)
game.step()
for i in range(folder.scan_duration + 1):
game.step()
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT
def test_folder_repair_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test to make sure that the validator checks if the folder exists before repairing."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
folder = client_1.file_system.get_folder(folder_name="downloads")
folder.corrupt()
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
action = (
"NODE_FOLDER_REPAIR",
{
"node_id": 0, # client_1,
"folder_id": 0, # downloads
},
)
agent.store_action(action)
game.step()
assert folder.health_status == FileSystemItemHealthStatus.GOOD
def test_folder_restore_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
"""Test to make sure that the validator checks if the folder exists before restoring."""
game, agent = game_and_agent_fixture
client_1 = game.simulation.network.get_node_by_hostname("client_1")
folder = client_1.file_system.get_folder(folder_name="downloads")
folder.corrupt()
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
action = (
"NODE_FOLDER_RESTORE",
{
"node_id": 0, # client_1,
"folder_id": 0, # downloads
},
)
agent.store_action(action)
game.step()
assert folder.health_status == FileSystemItemHealthStatus.RESTORING

View File

@@ -6,8 +6,6 @@ import pytest
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.system.services.service import ServiceOperatingState
@pytest.fixture