diff --git a/src/primaite/simulator/core.py b/src/primaite/simulator/core.py index 8d8425ec..70485af5 100644 --- a/src/primaite/simulator/core.py +++ b/src/primaite/simulator/core.py @@ -34,6 +34,20 @@ class RequestPermissionValidator(BaseModel): """Message that is reported when a request is rejected by this validator.""" return "request rejected" + def __add__(self, other: "RequestPermissionValidator") -> "_CombinedValidator": + return _CombinedValidator(validators=[self, other]) + + +class _CombinedValidator(RequestPermissionValidator): + validators: List[RequestPermissionValidator] = [] + + def __call__(self, request, context) -> bool: + return all(x(request, context) for x in self.validators) + + @property + def fail_message(self): + return f"One of the following conditions are not met: {[v.fail_message for v in self.validators]}" + class AllowAllValidator(RequestPermissionValidator): """Always allows the request.""" diff --git a/src/primaite/simulator/file_system/file_system.py b/src/primaite/simulator/file_system/file_system.py index 456b800c..42aa0573 100644 --- a/src/primaite/simulator/file_system/file_system.py +++ b/src/primaite/simulator/file_system/file_system.py @@ -6,8 +6,8 @@ from typing import Any, Dict, List, Optional from prettytable import MARKDOWN, PrettyTable -from primaite.interface.request import RequestResponse -from primaite.simulator.core import RequestManager, RequestType, SimComponent +from primaite.interface.request import RequestFormat, RequestResponse +from primaite.simulator.core import RequestManager, RequestPermissionValidator, RequestType, SimComponent from primaite.simulator.file_system.file import File from primaite.simulator.file_system.file_type import FileType from primaite.simulator.file_system.folder import Folder @@ -42,6 +42,10 @@ class FileSystem(SimComponent): More information in user guide and docstring for SimComponent._init_request_manager. """ + self._folder_exists = FileSystem._FolderExistsValidator(file_system=self) + self._folder_deleted = FileSystem._FolderNotDeletedValidator(file_system=self) + self._file_exists = FileSystem._FileExistsValidator(file_system=self) + rm = super()._init_request_manager() self._delete_manager = RequestManager() @@ -50,13 +54,15 @@ class FileSystem(SimComponent): request_type=RequestType( func=lambda request, context: RequestResponse.from_bool( self.delete_file(folder_name=request[0], file_name=request[1]) - ) + ), + validator=self._file_exists, ), ) self._delete_manager.add_request( name="folder", request_type=RequestType( - func=lambda request, context: RequestResponse.from_bool(self.delete_folder(folder_name=request[0])) + func=lambda request, context: RequestResponse.from_bool(self.delete_folder(folder_name=request[0])), + validator=self._folder_exists, ), ) rm.add_request( @@ -144,10 +150,13 @@ class FileSystem(SimComponent): ) self._folder_request_manager = RequestManager() - rm.add_request("folder", RequestType(func=self._folder_request_manager)) + rm.add_request( + "folder", + RequestType(func=self._folder_request_manager, validator=self._folder_exists + self._folder_deleted), + ) self._file_request_manager = RequestManager() - rm.add_request("file", RequestType(func=self._file_request_manager)) + rm.add_request("file", RequestType(func=self._file_request_manager, validator=self._file_exists)) return rm @@ -626,3 +635,65 @@ class FileSystem(SimComponent): self.sys_log.error(f"Unable to access file that does not exist. (file name: {file_name})") return False + + class _FolderExistsValidator(RequestPermissionValidator): + """ + When requests come in, this validator will only let them through if the Folder exists. + + Actions cannot be performed on a non-existent folder. + """ + + file_system: FileSystem + """Save a reference to the FileSystem instance.""" + + def __call__(self, request: RequestFormat, context: Dict) -> bool: + """Returns True if folder exists.""" + return self.file_system.get_folder(folder_name=request[0]) is not None + + @property + def fail_message(self) -> str: + """Message that is reported when a request is rejected by this validator.""" + return "Cannot perform request on folder because it does not exist" + + class _FolderNotDeletedValidator(RequestPermissionValidator): + """ + When requests come in, this validator will only let them through if the Folder has not been deleted. + + Actions cannot be performed on a deleted folder. + """ + + file_system: FileSystem + """Save a reference to the FileSystem instance.""" + + def __call__(self, request: RequestFormat, context: Dict) -> bool: + """Returns True if folder exists and is not deleted.""" + # get folder + folder = self.file_system.get_folder(folder_name=request[0], include_deleted=True) + return folder is not None and not folder.deleted + + @property + def fail_message(self) -> str: + """Message that is reported when a request is rejected by this validator.""" + return "Cannot perform request on folder because it is deleted." + + class _FileExistsValidator(RequestPermissionValidator): + """ + When requests come in, this validator will only let them through if the File exists. + + Actions cannot be performed on a non-existent file. + """ + + file_system: FileSystem + """Save a reference to the FileSystem instance.""" + + def __call__(self, request: RequestFormat, context: Dict) -> bool: + """Returns True if file exists.""" + return self.file_system.get_file(folder_name=request[0], file_name=request[1]) is not None + + @property + def fail_message(self) -> str: + """Message that is reported when a request is rejected by this validator.""" + return ( + f"Cannot perform request on application '{self.application.name}' because it is not in the " + f"{self.state.name} state." + ) diff --git a/src/primaite/simulator/file_system/folder.py b/src/primaite/simulator/file_system/folder.py index dd2a4c70..af7cc660 100644 --- a/src/primaite/simulator/file_system/folder.py +++ b/src/primaite/simulator/file_system/folder.py @@ -6,8 +6,8 @@ from typing import Dict, Optional from prettytable import MARKDOWN, PrettyTable -from primaite.interface.request import RequestResponse -from primaite.simulator.core import RequestManager, RequestType +from primaite.interface.request import RequestFormat, RequestResponse +from primaite.simulator.core import RequestManager, RequestPermissionValidator, RequestType from primaite.simulator.file_system.file import File from primaite.simulator.file_system.file_system_item_abc import FileSystemItemABC, FileSystemItemHealthStatus @@ -55,6 +55,8 @@ class Folder(FileSystemItemABC): More information in user guide and docstring for SimComponent._init_request_manager. """ + self._file_exists = Folder._FileExistsValidator(folder=self) + rm = super()._init_request_manager() rm.add_request( name="delete", @@ -65,7 +67,7 @@ class Folder(FileSystemItemABC): self._file_request_manager = RequestManager() rm.add_request( name="file", - request_type=RequestType(func=self._file_request_manager), + request_type=RequestType(func=self._file_request_manager, validator=self._file_exists), ) return rm @@ -469,3 +471,25 @@ class Folder(FileSystemItemABC): self.deleted = True return True + + class _FileExistsValidator(RequestPermissionValidator): + """ + When requests come in, this validator will only let them through if the File exists. + + Actions cannot be performed on a non-existent file. + """ + + folder: Folder + """Save a reference to the Folder instance.""" + + def __call__(self, request: RequestFormat, context: Dict) -> bool: + """Returns True if file exists.""" + return self.folder.get_file(file_name=request[0]) is not None + + @property + def fail_message(self) -> str: + """Message that is reported when a request is rejected by this validator.""" + return ( + f"Cannot perform request on application '{self.application.name}' because it is not in the " + f"{self.state.name} state." + ) diff --git a/tests/integration_tests/game_layer/actions/test_file_request_permission.py b/tests/integration_tests/game_layer/actions/test_file_request_permission.py new file mode 100644 index 00000000..c422ad43 --- /dev/null +++ b/tests/integration_tests/game_layer/actions/test_file_request_permission.py @@ -0,0 +1,82 @@ +# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK +import uuid +from typing import Tuple + +import pytest + +from primaite.game.agent.interface import ProxyAgent +from primaite.game.game import PrimaiteGame +from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus +from primaite.simulator.network.hardware.nodes.host.computer import Computer + + +@pytest.fixture +def game_and_agent_fixture(game_and_agent): + """Create a game with a simple agent that can be controlled by the tests.""" + game, agent = game_and_agent + + client_1: Computer = game.simulation.network.get_node_by_hostname("client_1") + client_1.start_up_duration = 3 + + return (game, agent) + + +def test_create_file(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]): + """Test that the validator allows a folder to be created.""" + game, agent = game_and_agent_fixture + + client_1 = game.simulation.network.get_node_by_hostname("client_1") + + random_folder = str(uuid.uuid4()) + random_file = str(uuid.uuid4()) + + assert client_1.file_system.get_file(folder_name=random_folder, file_name=random_file) is None + + action = ( + "NODE_FILE_CREATE", + {"node_id": 0, "folder_name": random_folder, "file_name": random_file}, + ) + agent.store_action(action) + game.step() + + assert client_1.file_system.get_file(folder_name=random_folder, file_name=random_file) is not None + + +def test_file_delete_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]): + """Test that the validator allows a folder to be created.""" + game, agent = game_and_agent_fixture + + client_1 = game.simulation.network.get_node_by_hostname("client_1") + file = client_1.file_system.get_file(folder_name="downloads", file_name="cat.png") + assert file.deleted is False + + action = ( + "NODE_FILE_DELETE", + {"node_id": 0, "folder_id": 0, "file_id": 0}, + ) + agent.store_action(action) + game.step() + + assert file.deleted + + +def test_file_scan_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]): + """Test that the validator allows a folder to be created.""" + game, agent = game_and_agent_fixture + + client_1 = game.simulation.network.get_node_by_hostname("client_1") + file = client_1.file_system.get_file(folder_name="downloads", file_name="cat.png") + + file.corrupt() + assert file.health_status == FileSystemItemHealthStatus.CORRUPT + assert file.visible_health_status == FileSystemItemHealthStatus.GOOD + + action = ( + "NODE_FILE_SCAN", + {"node_id": 0, "folder_id": 0, "file_id": 0}, + ) + agent.store_action(action) + game.step() + + assert file.health_status == FileSystemItemHealthStatus.CORRUPT + assert file.visible_health_status == FileSystemItemHealthStatus.CORRUPT diff --git a/tests/integration_tests/game_layer/actions/test_folder_request_permission.py b/tests/integration_tests/game_layer/actions/test_folder_request_permission.py new file mode 100644 index 00000000..e5e0806a --- /dev/null +++ b/tests/integration_tests/game_layer/actions/test_folder_request_permission.py @@ -0,0 +1,123 @@ +# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK +import uuid +from typing import Tuple + +import pytest + +from primaite.game.agent.interface import ProxyAgent +from primaite.game.game import PrimaiteGame +from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus +from primaite.simulator.network.hardware.nodes.host.computer import Computer + + +@pytest.fixture +def game_and_agent_fixture(game_and_agent): + """Create a game with a simple agent that can be controlled by the tests.""" + game, agent = game_and_agent + + client_1: Computer = game.simulation.network.get_node_by_hostname("client_1") + client_1.start_up_duration = 3 + + return (game, agent) + + +def test_create_folder(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]): + """Test that the validator allows a folder to be created.""" + game, agent = game_and_agent_fixture + + client_1 = game.simulation.network.get_node_by_hostname("client_1") + + random_folder = str(uuid.uuid4()) + + assert client_1.file_system.get_folder(folder_name=random_folder) is None + + action = ( + "NODE_FOLDER_CREATE", + { + "node_id": 0, + "folder_name": random_folder, + }, + ) + agent.store_action(action) + game.step() + + assert client_1.file_system.get_folder(folder_name=random_folder) is not None + + +def test_folder_scan_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]): + """Test to make sure that the validator checks if the folder exists before scanning.""" + game, agent = game_and_agent_fixture + + client_1 = game.simulation.network.get_node_by_hostname("client_1") + + folder = client_1.file_system.get_folder(folder_name="downloads") + assert folder.health_status == FileSystemItemHealthStatus.GOOD + assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD + + folder.corrupt() + + assert folder.health_status == FileSystemItemHealthStatus.CORRUPT + assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD + + action = ( + "NODE_FOLDER_SCAN", + { + "node_id": 0, # client_1, + "folder_id": 0, # downloads + }, + ) + agent.store_action(action) + game.step() + + for i in range(folder.scan_duration + 1): + game.step() + + assert folder.health_status == FileSystemItemHealthStatus.CORRUPT + assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT + + +def test_folder_repair_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]): + """Test to make sure that the validator checks if the folder exists before repairing.""" + game, agent = game_and_agent_fixture + + client_1 = game.simulation.network.get_node_by_hostname("client_1") + + folder = client_1.file_system.get_folder(folder_name="downloads") + folder.corrupt() + assert folder.health_status == FileSystemItemHealthStatus.CORRUPT + + action = ( + "NODE_FOLDER_REPAIR", + { + "node_id": 0, # client_1, + "folder_id": 0, # downloads + }, + ) + agent.store_action(action) + game.step() + + assert folder.health_status == FileSystemItemHealthStatus.GOOD + + +def test_folder_restore_action(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]): + """Test to make sure that the validator checks if the folder exists before restoring.""" + game, agent = game_and_agent_fixture + + client_1 = game.simulation.network.get_node_by_hostname("client_1") + + folder = client_1.file_system.get_folder(folder_name="downloads") + folder.corrupt() + + assert folder.health_status == FileSystemItemHealthStatus.CORRUPT + + action = ( + "NODE_FOLDER_RESTORE", + { + "node_id": 0, # client_1, + "folder_id": 0, # downloads + }, + ) + agent.store_action(action) + game.step() + + assert folder.health_status == FileSystemItemHealthStatus.RESTORING diff --git a/tests/integration_tests/game_layer/actions/test_nic_request_permission.py b/tests/integration_tests/game_layer/actions/test_nic_request_permission.py index 4c1619e7..d796b75e 100644 --- a/tests/integration_tests/game_layer/actions/test_nic_request_permission.py +++ b/tests/integration_tests/game_layer/actions/test_nic_request_permission.py @@ -6,8 +6,6 @@ import pytest from primaite.game.agent.interface import ProxyAgent from primaite.game.game import PrimaiteGame from primaite.simulator.network.hardware.nodes.host.computer import Computer -from primaite.simulator.network.hardware.nodes.host.server import Server -from primaite.simulator.system.services.service import ServiceOperatingState @pytest.fixture