diff --git a/src/primaite/simulator/core.py b/src/primaite/simulator/core.py index c3130116..eaedc85a 100644 --- a/src/primaite/simulator/core.py +++ b/src/primaite/simulator/core.py @@ -1,13 +1,123 @@ """Core of the PrimAITE Simulator.""" -from abc import abstractmethod -from typing import Callable, Dict, List +from abc import ABC, abstractmethod +from typing import Callable, Dict, List, Optional -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict + +from primaite import getLogger +from primaite.simulator.domain import AccountGroup + +_LOGGER = getLogger(__name__) + + +class ActionPermissionValidator(ABC): + """ + Base class for action validators. + + The permissions manager is designed to be generic. So, although in the first instance the permissions + are evaluated purely on membership to AccountGroup, this class can support validating permissions based on any + arbitrary criteria. + """ + + @abstractmethod + def __call__(self, request: List[str], context: Dict) -> bool: + """TODO.""" + pass + + +class AllowAllValidator(ActionPermissionValidator): + """Always allows the action.""" + + def __call__(self, request: List[str], context: Dict) -> bool: + """Always allow the action.""" + return True + + +class GroupMembershipValidator(ActionPermissionValidator): + """Permit actions based on group membership.""" + + def __init__(self, allowed_groups: List[AccountGroup]) -> None: + """TODO.""" + self.allowed_groups = allowed_groups + + def __call__(self, request: List[str], context: Dict) -> bool: + """Permit the action if the request comes from an account which belongs to the right group.""" + # if context request source is part of any groups mentioned in self.allow_groups, return true, otherwise false + requestor_groups: List[str] = context["request_source"]["groups"] + for allowed_group in self.allowed_groups: + if allowed_group.name in requestor_groups: + return True + return False + + +class Action: + """ + This object stores data related to a single action. + + This includes the callable that can execute the action request, and the validator that will decide whether + the action can be performed or not. + """ + + def __init__(self, func: Callable[[List[str], Dict], None], validator: ActionPermissionValidator) -> None: + """ + Save the functions that are for this action. + + Here's a description for the intended use of both of these. + + ``func`` is a function that accepts a request and a context dict. Typically this would be a lambda function + that invokes a class method of your SimComponent. For example if the component is a node and the action is for + turning it off, then the SimComponent should have a turn_off(self) method that does not need to accept any args. + Then, this Action will be given something like ``func = lambda request, context: self.turn_off()``. + + :param func: Function that performs the request. + :type func: Callable[[List[str], Dict], None] + :param validator: Function that checks if the request is authenticated given the context. + :type validator: ActionPermissionValidator + """ + self.func: Callable[[List[str], Dict], None] = func + self.validator: ActionPermissionValidator = validator + + +class ActionManager: + """TODO.""" + + def __init__(self) -> None: + """TODO.""" + self.actions: Dict[str, Action] + + def process_request(self, request: List[str], context: Dict) -> None: + """Process action request.""" + action_key = request[0] + + if action_key not in self.actions: + msg = ( + f"Action request {request} could not be processed because {action_key} is not a valid action", + "within this ActionManager", + ) + _LOGGER.error(msg) + raise RuntimeError(msg) + + action = self.actions[action_key] + action_options = request[1:] + + if not action.validator(action_options, context): + _LOGGER.debug(f"Action request {request} was denied due to insufficient permissions") + return + + action.func(action_options, context) class SimComponent(BaseModel): """Extension of pydantic BaseModel with additional methods that must be defined by all classes in the simulator.""" + model_config = ConfigDict(arbitrary_types_allowed=True) + uuid: str + "The component UUID." + + def __init__(self, **kwargs) -> None: + self.action_manager: Optional[ActionManager] = None + super().__init__(**kwargs) + @abstractmethod def describe_state(self) -> Dict: """ @@ -19,7 +129,7 @@ class SimComponent(BaseModel): """ return {} - def apply_action(self, action: List[str]) -> None: + def apply_action(self, action: List[str], context: Dict = {}) -> None: """ Apply an action to a simulation component. Action data is passed in as a 'namespaced' list of strings. @@ -34,16 +144,9 @@ class SimComponent(BaseModel): :param action: List describing the action to apply to this object. :type action: List[str] """ - possible_actions = self._possible_actions() - if action[0] in possible_actions: - # take the first element off the action list and pass the remaining arguments to the corresponding action - # function - possible_actions[action.pop(0)](action) - else: - raise ValueError(f"{self.__class__.__name__} received invalid action {action}") - - def _possible_actions(self) -> Dict[str, Callable[[List[str]], None]]: - return {} + if self.action_manager is None: + return + self.action_manager.process_request(action, context) def apply_timestep(self, timestep: int) -> None: """ diff --git a/src/primaite/simulator/domain/__init__.py b/src/primaite/simulator/domain/__init__.py index 6f59cf49..0e23133f 100644 --- a/src/primaite/simulator/domain/__init__.py +++ b/src/primaite/simulator/domain/__init__.py @@ -1,3 +1,3 @@ -from primaite.simulator.domain.account import Account +from primaite.simulator.domain.account import Account, AccountGroup, AccountType -__all__ = ["Account"] +__all__ = ["Account", "AccountGroup", "AccountType"] diff --git a/src/primaite/simulator/domain/account.py b/src/primaite/simulator/domain/account.py index 374675a0..c134e916 100644 --- a/src/primaite/simulator/domain/account.py +++ b/src/primaite/simulator/domain/account.py @@ -1,6 +1,6 @@ """User account simulation.""" from enum import Enum -from typing import Dict, List, Set, TypeAlias +from typing import Callable, Dict, List, TypeAlias from primaite import getLogger from primaite.simulator.core import SimComponent @@ -40,6 +40,14 @@ class AccountStatus(Enum): disabled = 2 +class PasswordPolicyLevel(Enum): + """Complexity requirements for account passwords.""" + + low = 1 + medium = 2 + high = 3 + + class Account(SimComponent): """User accounts.""" @@ -55,38 +63,18 @@ class Account(SimComponent): "Account password." account_type: AccountType "Account Type, currently this can be service account (used by apps) or user account." - domain_groups: Set[AccountGroup] = [] - "Domain-wide groups that this account belongs to." - local_groups: Dict[__temp_node, List[AccountGroup]] - "For each node, whether this account has local/admin privileges on that node." status: AccountStatus = AccountStatus.disabled - def add_to_domain_group(self, group: AccountGroup) -> None: - """ - Add this account to a domain group. - - If the account is already a member of this group, nothing happens. - - :param group: The group to which to add this account. - :type group: AccountGroup - """ - self.domain_groups.add(group) - - def remove_from_domain_group(self, group: AccountGroup) -> None: - """ - Remove this account from a domain group. - - If the account is already not a member of that group, nothing happens. - - :param group: The group from which this account should be removed. - :type group: AccountGroup - """ - self.domain_groups.discard(group) - - def enable_account(self): + def enable(self): """Set the status to enabled.""" self.status = AccountStatus.enabled - def disable_account(self): + def disable(self): """Set the status to disabled.""" self.status = AccountStatus.disabled + + def _possible_actions(self) -> Dict[str, Callable[[List[str]], None]]: + return { + "enable": self.enable, + "disable": self.disable, + } diff --git a/src/primaite/simulator/domain/controller.py b/src/primaite/simulator/domain/controller.py index 5a14e80e..c9165bbf 100644 --- a/src/primaite/simulator/domain/controller.py +++ b/src/primaite/simulator/domain/controller.py @@ -1,13 +1,54 @@ -from typing import Set, TypeAlias +from typing import Dict, Final, List, TypeAlias from primaite.simulator.core import SimComponent -from primaite.simulator.domain import Account +from primaite.simulator.domain import Account, AccountGroup, AccountType -__temp_node = TypeAlias() # placeholder while nodes don't exist +# placeholder while these objects don't yet exist +__temp_node = TypeAlias() +__temp_application = TypeAlias() +__temp_folder = TypeAlias() +__temp_file = TypeAlias() class DomainController(SimComponent): """Main object for controlling the domain.""" - nodes: Set(__temp_node) = set() - accounts: Set(Account) = set() + # owned objects + accounts: List(Account) = [] + groups: Final[List[AccountGroup]] = list(AccountGroup) + + group_membership: Dict[AccountGroup, List[Account]] + + # references to non-owned objects + nodes: List(__temp_node) = [] + applications: List(__temp_application) = [] + folders: List(__temp_folder) = [] + files: List(__temp_file) = [] + + def register_account(self, account: Account) -> None: + """TODO.""" + ... + + def deregister_account(self, account: Account) -> None: + """TODO.""" + ... + + def create_account(self, username: str, password: str, account_type: AccountType) -> Account: + """TODO.""" + ... + + def rotate_all_credentials(self) -> None: + """TODO.""" + ... + + def rotate_account_credentials(self, account: Account) -> None: + """TODO.""" + ... + + def add_account_to_group(self, account: Account, group: AccountGroup) -> None: + """TODO.""" + ... + + def remove_account_from_group(self, account: Account, group: AccountGroup) -> None: + """TODO.""" + ...