diff --git a/.azuredevops/pull_request_template.md b/.azuredevops/pull_request_template.md index fd28ed57..f7533b37 100644 --- a/.azuredevops/pull_request_template.md +++ b/.azuredevops/pull_request_template.md @@ -9,5 +9,6 @@ - [ ] I have performed **self-review** of the code - [ ] I have written **tests** for any new functionality added with this PR - [ ] I have updated the **documentation** if this PR changes or adds functionality -- [ ] I have written/updated **design docs** if this PR implements new functionality. +- [ ] I have written/updated **design docs** if this PR implements new functionality +- [ ] I have update the **change log** - [ ] I have run **pre-commit** checks for code style diff --git a/CHANGELOG.md b/CHANGELOG.md index dd8afbce..8f3f57fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ transmitting them from a Service/Application, down through the layers, over the a Service/Application another machine. - system - Added the core structure of Application, Services, and Components. Also added a SoftwareManager and SessionManager. +- Permission System - each action can define criteria that will be used to permit or deny agent actions. - File System - ability to emulate a node's file system during a simulation ## [2.0.0] - 2023-07-26 diff --git a/docs/source/simulation_structure.rst b/docs/source/simulation_structure.rst index 65373a72..2b213f16 100644 --- a/docs/source/simulation_structure.rst +++ b/docs/source/simulation_structure.rst @@ -11,3 +11,55 @@ top level, there is an object called the ``SimulationController`` _(doesn't exis and a software controller for managing software and users. Each node of the simulation 'tree' has responsibility for creating, deleting, and updating its direct descendants. + + + +Actions +======= +Agents can interact with the simulation by using actions. Actions are standardised with the +:py:class:`primaite.simulation.core.Action` class, which just holds a reference to two special functions. + +1. The action function itself, it must accept a `request` parameters which is a list of strings that describe what the + action should do. It must also accept a `context` dict which can house additional information surrounding the action. + For example, the context will typically include information about which entity intiated the action. +2. A validator function. This function should return a boolean value that decides if the request is permitted or not. + It uses the same paramters as the action function. + +Action Permissions +------------------ +When an agent tries to perform an action on a simulation component, that action will only be executed if the request is +validated. For example, some actions can require that an agent is logged into an admin account. Each action defines its +own permissions using an instance of :py:class:`primaite.simulation.core.ActionPermissionValidator`. The below code +snippet demonstrates usage of the ``ActionPermissionValidator``. + +.. code:: python + + from primaite.simulator.core import Action, ActionManager, SimComponent + from primaite.simulator.domain.controller import AccountGroup, GroupMembershipValidator + + class Smartphone(SimComponent): + name: str + apps = [] + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.action_manager = ActionManager() + + self.action_manager.add_action( + "reset_factory_settings", + Action( + func = lambda request, context: self.reset_factory_settings(), + validator = GroupMembershipValidator([AccountGroup.DOMAIN_ADMIN]), + ), + ) + + def reset_factory_settings(self): + self.apps = [] + + phone = Smartphone(name="phone1") + + # try to wipe the phone as a domain user, this will have no effect + phone.apply_action(["reset_factory_settings"], context={"request_source":{"groups":["DOMAIN_USER"]}) + + # try to wipe the phone as an admin user, this will wipe the phone + phone.apply_action(["reset_factory_settings"], context={"request_source":{"groups":["DOMAIN_ADMIN"]}) diff --git a/src/primaite/simulator/core.py b/src/primaite/simulator/core.py index 03684474..8b771cd7 100644 --- a/src/primaite/simulator/core.py +++ b/src/primaite/simulator/core.py @@ -1,22 +1,142 @@ """Core of the PrimAITE Simulator.""" -from abc import abstractmethod -from typing import Callable, Dict, List +from abc import ABC, abstractmethod +from typing import Callable, Dict, List, Optional from uuid import uuid4 -from pydantic import BaseModel, ConfigDict +from pydantic import BaseModel, ConfigDict, Extra + +from primaite import getLogger + +_LOGGER = getLogger(__name__) + + +class ActionPermissionValidator(ABC): + """ + Base class for action validators. + + The permissions manager is designed to be generic. So, although in the first instance the permissions + are evaluated purely on membership to AccountGroup, this class can support validating permissions based on any + arbitrary criteria. + """ + + @abstractmethod + def __call__(self, request: List[str], context: Dict) -> bool: + """Use the request and context paramters to decide whether the action should be permitted.""" + pass + + +class AllowAllValidator(ActionPermissionValidator): + """Always allows the action.""" + + def __call__(self, request: List[str], context: Dict) -> bool: + """Always allow the action.""" + return True + + +class Action: + """ + This object stores data related to a single action. + + This includes the callable that can execute the action request, and the validator that will decide whether + the action can be performed or not. + """ + + def __init__(self, func: Callable[[List[str], Dict], None], validator: ActionPermissionValidator) -> None: + """ + Save the functions that are for this action. + + Here's a description for the intended use of both of these. + + ``func`` is a function that accepts a request and a context dict. Typically this would be a lambda function + that invokes a class method of your SimComponent. For example if the component is a node and the action is for + turning it off, then the SimComponent should have a turn_off(self) method that does not need to accept any args. + Then, this Action will be given something like ``func = lambda request, context: self.turn_off()``. + + ``validator`` is an instance of a subclass of `ActionPermissionValidator`. This is essentially a callable that + accepts `request` and `context` and returns a boolean to represent whether the permission is granted to perform + the action. + + :param func: Function that performs the request. + :type func: Callable[[List[str], Dict], None] + :param validator: Function that checks if the request is authenticated given the context. + :type validator: ActionPermissionValidator + """ + self.func: Callable[[List[str], Dict], None] = func + self.validator: ActionPermissionValidator = validator + + +class ActionManager: + """ + ActionManager is used by `SimComponent` instances to keep track of actions. + + Its main purpose is to be a lookup from action name to action function and corresponding validation function. This + class is responsible for providing a consistent API for processing actions as well as helpful error messages. + """ + + def __init__(self) -> None: + """Initialise ActionManager with an empty action lookup.""" + self.actions: Dict[str, Action] = {} + + def process_request(self, request: List[str], context: Dict) -> None: + """Process an action request. + + :param request: A list of strings which specify what action to take. The first string must be one of the allowed + actions, i.e. it must be a key of self.actions. The subsequent strings in the list are passed as parameters + to the action function. + :type request: List[str] + :param context: Dictionary of additional information necessary to process or validate the request. + :type context: Dict + :raises RuntimeError: If the request parameter does not have a valid action identifier as the first item. + """ + action_key = request[0] + + if action_key not in self.actions: + msg = ( + f"Action request {request} could not be processed because {action_key} is not a valid action", + "within this ActionManager", + ) + _LOGGER.error(msg) + raise RuntimeError(msg) + + action = self.actions[action_key] + action_options = request[1:] + + if not action.validator(action_options, context): + _LOGGER.debug(f"Action request {request} was denied due to insufficient permissions") + return + + action.func(action_options, context) + + def add_action(self, name: str, action: Action) -> None: + """Add an action to this action manager. + + :param name: The string associated to this action. + :type name: str + :param action: Action object. + :type action: Action + """ + if name in self.actions: + msg = f"Attempted to register an action but the action name {name} is already taken." + _LOGGER.error(msg) + raise RuntimeError(msg) + + self.actions[name] = action class SimComponent(BaseModel): """Extension of pydantic BaseModel with additional methods that must be defined by all classes in the simulator.""" - model_config = ConfigDict(arbitrary_types_allowed=True) + model_config = ConfigDict(arbitrary_types_allowed=True, extra=Extra.allow) + """Configure pydantic to allow arbitrary types and to let the instance have attributes not present in model.""" + uuid: str - "The component UUID." + """The component UUID.""" def __init__(self, **kwargs): if not kwargs.get("uuid"): kwargs["uuid"] = str(uuid4()) super().__init__(**kwargs) + self.action_manager: Optional[ActionManager] = None @abstractmethod def describe_state(self) -> Dict: @@ -29,7 +149,7 @@ class SimComponent(BaseModel): """ return {} - def apply_action(self, action: List[str]) -> None: + def apply_action(self, action: List[str], context: Dict = {}) -> None: """ Apply an action to a simulation component. Action data is passed in as a 'namespaced' list of strings. @@ -44,16 +164,9 @@ class SimComponent(BaseModel): :param action: List describing the action to apply to this object. :type action: List[str] """ - possible_actions = self._possible_actions() - if action[0] in possible_actions: - # take the first element off the action list and pass the remaining arguments to the corresponding action - # function - possible_actions[action.pop(0)](action) - else: - raise ValueError(f"{self.__class__.__name__} received invalid action {action}") - - def _possible_actions(self) -> Dict[str, Callable[[List[str]], None]]: - return {} + if self.action_manager is None: + return + self.action_manager.process_request(action, context) def apply_timestep(self, timestep: int) -> None: """ @@ -64,7 +177,7 @@ class SimComponent(BaseModel): """ pass - def reset_component_for_episode(self, episode: int): + def reset_component_for_episode(self): """ Reset this component to its original state for a new episode. diff --git a/src/primaite/simulator/domain/__init__.py b/src/primaite/simulator/domain/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/primaite/simulator/domain/account.py b/src/primaite/simulator/domain/account.py new file mode 100644 index 00000000..e8595afa --- /dev/null +++ b/src/primaite/simulator/domain/account.py @@ -0,0 +1,63 @@ +"""User account simulation.""" +from enum import Enum +from typing import Dict + +from primaite import getLogger +from primaite.simulator.core import SimComponent + +_LOGGER = getLogger(__name__) + + +class AccountType(Enum): + """Whether the account is intended for a user to log in or for a service to use.""" + + SERVICE = 1 + "Service accounts are used to grant permissions to software on nodes to perform actions" + USER = 2 + "User accounts are used to allow agents to log in and perform actions" + + +class PasswordPolicyLevel(Enum): + """Complexity requirements for account passwords.""" + + LOW = 1 + MEDIUM = 2 + HIGH = 3 + + +class Account(SimComponent): + """User accounts.""" + + num_logons: int = 0 + "The number of times this account was logged into since last reset." + num_logoffs: int = 0 + "The number of times this account was logged out of since last reset." + num_group_changes: int = 0 + "The number of times this account was moved in or out of an AccountGroup." + username: str + "Account username." + password: str + "Account password." + account_type: AccountType + "Account Type, currently this can be service account (used by apps) or user account." + enabled: bool = True + + def describe_state(self) -> Dict: + """Describe state for agent observations.""" + return super().describe_state() + + def enable(self): + """Set the status to enabled.""" + self.enabled = True + + def disable(self): + """Set the status to disabled.""" + self.enabled = False + + def log_on(self) -> None: + """TODO. Once the accounts are integrated with nodes, populate this accordingly.""" + self.num_logons += 1 + + def log_off(self) -> None: + """TODO. Once the accounts are integrated with nodes, populate this accordingly.""" + self.num_logoffs += 1 diff --git a/src/primaite/simulator/domain/controller.py b/src/primaite/simulator/domain/controller.py new file mode 100644 index 00000000..887a065d --- /dev/null +++ b/src/primaite/simulator/domain/controller.py @@ -0,0 +1,127 @@ +from enum import Enum +from typing import Dict, Final, List, Literal, Tuple + +from primaite.simulator.core import ActionPermissionValidator, SimComponent +from primaite.simulator.domain.account import Account, AccountType + + +# placeholder while these objects don't yet exist +class temp_node: + """Placeholder for node class for type hinting purposes.""" + + pass + + +class temp_application: + """Placeholder for application class for type hinting purposes.""" + + pass + + +class temp_folder: + """Placeholder for folder class for type hinting purposes.""" + + pass + + +class temp_file: + """Placeholder for file class for type hinting purposes.""" + + pass + + +class AccountGroup(Enum): + """Permissions are set at group-level and accounts can belong to these groups.""" + + LOCAL_USER = 1 + "For performing basic actions on a node" + DOMAIN_USER = 2 + "For performing basic actions to the domain" + LOCAL_ADMIN = 3 + "For full access to actions on a node" + DOMAIN_ADMIN = 4 + "For full access" + + +class GroupMembershipValidator(ActionPermissionValidator): + """Permit actions based on group membership.""" + + def __init__(self, allowed_groups: List[AccountGroup]) -> None: + """Store a list of groups that should be granted permission. + + :param allowed_groups: List of AccountGroups that are permitted to perform some action. + :type allowed_groups: List[AccountGroup] + """ + self.allowed_groups = allowed_groups + + def __call__(self, request: List[str], context: Dict) -> bool: + """Permit the action if the request comes from an account which belongs to the right group.""" + # if context request source is part of any groups mentioned in self.allow_groups, return true, otherwise false + requestor_groups: List[str] = context["request_source"]["groups"] + for allowed_group in self.allowed_groups: + if allowed_group.name in requestor_groups: + return True + return False + + +class DomainController(SimComponent): + """Main object for controlling the domain.""" + + # owned objects + accounts: Dict[str, Account] = {} + groups: Final[List[AccountGroup]] = list(AccountGroup) + + domain_group_membership: Dict[Literal[AccountGroup.DOMAIN_ADMIN, AccountGroup.DOMAIN_USER], List[Account]] = {} + local_group_membership: Dict[ + Tuple[temp_node, Literal[AccountGroup.LOCAL_ADMIN, AccountGroup.LOCAL_USER]], List[Account] + ] = {} + + # references to non-owned objects. Not sure if all are needed here. + nodes: Dict[str, temp_node] = {} + applications: Dict[str, temp_application] = {} + folders: List[temp_folder] = {} + files: List[temp_file] = {} + + def _register_account(self, account: Account) -> None: + """TODO.""" + ... + + def _deregister_account(self, account: Account) -> None: + """TODO.""" + ... + + def create_account(self, username: str, password: str, account_type: AccountType) -> Account: + """TODO.""" + ... + + def delete_account(self, account: Account) -> None: + """TODO.""" + ... + + def rotate_all_credentials(self) -> None: + """TODO.""" + ... + + def rotate_account_credentials(self, account: Account) -> None: + """TODO.""" + ... + + def add_account_to_group(self, account: Account, group: AccountGroup) -> None: + """TODO.""" + ... + + def remove_account_from_group(self, account: Account, group: AccountGroup) -> None: + """TODO.""" + ... + + def check_account_permissions(self, account: Account, node: temp_node) -> List[AccountGroup]: + """Return a list of permission groups that this account has on this node.""" + ... + + def register_node(self, node: temp_node) -> None: + """TODO.""" + ... + + def deregister_node(self, node: temp_node) -> None: + """TODO.""" + ... diff --git a/tests/integration_tests/component_creation/__init__.py b/tests/integration_tests/component_creation/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/integration_tests/component_creation/test_permission_system.py b/tests/integration_tests/component_creation/test_permission_system.py new file mode 100644 index 00000000..6816ba84 --- /dev/null +++ b/tests/integration_tests/component_creation/test_permission_system.py @@ -0,0 +1,190 @@ +from enum import Enum +from typing import Dict, List, Literal + +import pytest + +from primaite.simulator.core import Action, ActionManager, AllowAllValidator, SimComponent +from primaite.simulator.domain.controller import AccountGroup, GroupMembershipValidator + + +def test_group_action_validation() -> None: + """ + Check that actions are denied when an unauthorised request is made. + + This test checks the integration between SimComponent and the permissions validation system. First, we create a + basic node and folder class. We configure the node so that only admins can create a folder. Then, we try to create + a folder as both an admin user and a non-admin user. + """ + + class Folder(SimComponent): + name: str + + def describe_state(self) -> Dict: + return super().describe_state() + + class Node(SimComponent): + name: str + folders: List[Folder] = [] + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.action_manager = ActionManager() + + self.action_manager.add_action( + "create_folder", + Action( + func=lambda request, context: self.create_folder(request[0]), + validator=GroupMembershipValidator([AccountGroup.LOCAL_ADMIN, AccountGroup.DOMAIN_ADMIN]), + ), + ) + + def describe_state(self) -> Dict: + return super().describe_state() + + def create_folder(self, folder_name: str) -> None: + new_folder = Folder(uuid="0000-0000-0001", name=folder_name) + self.folders.append(new_folder) + + def remove_folder(self, folder: Folder) -> None: + self.folders = [x for x in self.folders if x is not folder] + + # check that the folder is created when a local admin tried to do it + permitted_context = {"request_source": {"agent": "BLUE", "account": "User1", "groups": ["LOCAL_ADMIN"]}} + my_node = Node(uuid="0000-0000-1234", name="pc") + my_node.apply_action(["create_folder", "memes"], context=permitted_context) + assert len(my_node.folders) == 1 + assert my_node.folders[0].name == "memes" + + # check that the number of folders is still 1 even after attempting to create a second one without permissions + invalid_context = {"request_source": {"agent": "BLUE", "account": "User1", "groups": ["LOCAL_USER", "DOMAIN_USER"]}} + my_node.apply_action(["create_folder", "memes2"], context=invalid_context) + assert len(my_node.folders) == 1 + assert my_node.folders[0].name == "memes" + + +def test_hierarchical_action_with_validation() -> None: + """ + Check that validation works with sub-objects. + + This test creates a parent object (Node) and a child object (Application) which both accept actions. The node allows + action passthrough to applications. The purpose of this test is to check that after an action is passed through to + a child object, that the permission system still works as intended. + """ + + class Application(SimComponent): + name: str + state: Literal["on", "off", "disabled"] = "off" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.action_manager = ActionManager() + + self.action_manager.add_action( + "turn_on", + Action( + func=lambda request, context: self.turn_on(), + validator=AllowAllValidator(), + ), + ) + self.action_manager.add_action( + "turn_off", + Action( + func=lambda request, context: self.turn_off(), + validator=AllowAllValidator(), + ), + ) + self.action_manager.add_action( + "disable", + Action( + func=lambda request, context: self.disable(), + validator=GroupMembershipValidator([AccountGroup.LOCAL_ADMIN, AccountGroup.DOMAIN_ADMIN]), + ), + ) + self.action_manager.add_action( + "enable", + Action( + func=lambda request, context: self.enable(), + validator=GroupMembershipValidator([AccountGroup.LOCAL_ADMIN, AccountGroup.DOMAIN_ADMIN]), + ), + ) + + def describe_state(self) -> Dict: + return super().describe_state() + + def disable(self) -> None: + self.state = "disabled" + + def enable(self) -> None: + if self.state == "disabled": + self.state = "off" + + def turn_on(self) -> None: + if self.state == "off": + self.state = "on" + + def turn_off(self) -> None: + if self.state == "on": + self.state = "off" + + class Node(SimComponent): + name: str + state: Literal["on", "off"] = "on" + apps: List[Application] = [] + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.action_manager = ActionManager() + + self.action_manager.add_action( + "apps", + Action( + func=lambda request, context: self.send_action_to_app(request.pop(0), request, context), + validator=AllowAllValidator(), + ), + ) + + def describe_state(self) -> Dict: + return super().describe_state() + + def install_app(self, app_name: str) -> None: + new_app = Application(name=app_name) + self.apps.append(new_app) + + def send_action_to_app(self, app_name: str, options: List[str], context: Dict): + for app in self.apps: + if app_name == app.name: + app.apply_action(options, context) + break + else: + msg = f"Node has no app with name {app_name}" + raise LookupError(msg) + + my_node = Node(name="pc") + my_node.install_app("Chrome") + my_node.install_app("Firefox") + + non_admin_context = { + "request_source": {"agent": "BLUE", "account": "User1", "groups": ["LOCAL_USER", "DOMAIN_USER"]} + } + + admin_context = { + "request_source": { + "agent": "BLUE", + "account": "User1", + "groups": ["LOCAL_ADMIN", "DOMAIN_ADMIN", "LOCAL_USER", "DOMAIN_USER"], + } + } + + # check that a non-admin can't disable this app + my_node.apply_action(["apps", "Chrome", "disable"], non_admin_context) + assert my_node.apps[0].name == "Chrome" # if failure occurs on this line, the test itself is broken + assert my_node.apps[0].state == "off" + + # check that a non-admin can turn this app on + my_node.apply_action(["apps", "Firefox", "turn_on"], non_admin_context) + assert my_node.apps[1].name == "Firefox" # if failure occurs on this line, the test itself is broken + assert my_node.apps[1].state == "on" + + # check that an admin can disable this app + my_node.apply_action(["apps", "Chrome", "disable"], admin_context) + assert my_node.apps[0].state == "disabled" diff --git a/tests/unit_tests/_primaite/_simulator/_domain/__init__.py b/tests/unit_tests/_primaite/_simulator/_domain/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit_tests/_primaite/_simulator/_domain/test_account.py b/tests/unit_tests/_primaite/_simulator/_domain/test_account.py new file mode 100644 index 00000000..b5632ea7 --- /dev/null +++ b/tests/unit_tests/_primaite/_simulator/_domain/test_account.py @@ -0,0 +1,18 @@ +"""Test the account module of the simulator.""" +from primaite.simulator.domain.account import Account, AccountType + + +def test_account_serialise(): + """Test that an account can be serialised. If pydantic throws error then this test fails.""" + acct = Account(username="Jake", password="JakePass1!", account_type=AccountType.USER) + serialised = acct.model_dump_json() + print(serialised) + + +def test_account_deserialise(): + """Test that an account can be deserialised. The test fails if pydantic throws an error.""" + acct_json = ( + '{"uuid":"dfb2bcaa-d3a1-48fd-af3f-c943354622b4","num_logons":0,"num_logoffs":0,"num_group_changes":0,' + '"username":"Jake","password":"JakePass1!","account_type":2,"status":2,"action_manager":null}' + ) + acct = Account.model_validate_json(acct_json) diff --git a/tests/unit_tests/_primaite/_simulator/_domain/test_controller.py b/tests/unit_tests/_primaite/_simulator/_domain/test_controller.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit_tests/_primaite/_simulator/test_core.py b/tests/unit_tests/_primaite/_simulator/test_core.py index 4e2df757..0d227633 100644 --- a/tests/unit_tests/_primaite/_simulator/test_core.py +++ b/tests/unit_tests/_primaite/_simulator/test_core.py @@ -44,36 +44,3 @@ class TestIsolatedSimComponent: comp = TestComponent(name="computer", size=(5, 10)) dump = comp.model_dump_json() assert comp == TestComponent.model_validate_json(dump) - - def test_apply_action(self): - """Validate that we can override apply_action behaviour and it updates the state of the component.""" - - class TestComponent(SimComponent): - name: str - status: Literal["on", "off"] = "off" - - def describe_state(self) -> Dict: - return {} - - def _possible_actions(self) -> Dict[str, Callable[[List[str]], None]]: - return { - "turn_off": self._turn_off, - "turn_on": self._turn_on, - } - - def _turn_off(self, options: List[str]) -> None: - assert len(options) == 0, "This action does not support options." - self.status = "off" - - def _turn_on(self, options: List[str]) -> None: - assert len(options) == 0, "This action does not support options." - self.status = "on" - - comp = TestComponent(name="computer", status="off") - - assert comp.status == "off" - comp.apply_action(["turn_on"]) - assert comp.status == "on" - - with pytest.raises(ValueError): - comp.apply_action(["do_nothing"])