Overhaul sim component for permission management.

This commit is contained in:
Marek Wolan
2023-08-03 13:09:04 +01:00
parent 091b4a801d
commit 3a2840bed8
4 changed files with 182 additions and 50 deletions

View File

@@ -1,13 +1,123 @@
"""Core of the PrimAITE Simulator."""
from abc import abstractmethod
from typing import Callable, Dict, List
from abc import ABC, abstractmethod
from typing import Callable, Dict, List, Optional
from pydantic import BaseModel
from pydantic import BaseModel, ConfigDict
from primaite import getLogger
from primaite.simulator.domain import AccountGroup
_LOGGER = getLogger(__name__)
class ActionPermissionValidator(ABC):
"""
Base class for action validators.
The permissions manager is designed to be generic. So, although in the first instance the permissions
are evaluated purely on membership to AccountGroup, this class can support validating permissions based on any
arbitrary criteria.
"""
@abstractmethod
def __call__(self, request: List[str], context: Dict) -> bool:
"""TODO."""
pass
class AllowAllValidator(ActionPermissionValidator):
"""Always allows the action."""
def __call__(self, request: List[str], context: Dict) -> bool:
"""Always allow the action."""
return True
class GroupMembershipValidator(ActionPermissionValidator):
"""Permit actions based on group membership."""
def __init__(self, allowed_groups: List[AccountGroup]) -> None:
"""TODO."""
self.allowed_groups = allowed_groups
def __call__(self, request: List[str], context: Dict) -> bool:
"""Permit the action if the request comes from an account which belongs to the right group."""
# if context request source is part of any groups mentioned in self.allow_groups, return true, otherwise false
requestor_groups: List[str] = context["request_source"]["groups"]
for allowed_group in self.allowed_groups:
if allowed_group.name in requestor_groups:
return True
return False
class Action:
"""
This object stores data related to a single action.
This includes the callable that can execute the action request, and the validator that will decide whether
the action can be performed or not.
"""
def __init__(self, func: Callable[[List[str], Dict], None], validator: ActionPermissionValidator) -> None:
"""
Save the functions that are for this action.
Here's a description for the intended use of both of these.
``func`` is a function that accepts a request and a context dict. Typically this would be a lambda function
that invokes a class method of your SimComponent. For example if the component is a node and the action is for
turning it off, then the SimComponent should have a turn_off(self) method that does not need to accept any args.
Then, this Action will be given something like ``func = lambda request, context: self.turn_off()``.
:param func: Function that performs the request.
:type func: Callable[[List[str], Dict], None]
:param validator: Function that checks if the request is authenticated given the context.
:type validator: ActionPermissionValidator
"""
self.func: Callable[[List[str], Dict], None] = func
self.validator: ActionPermissionValidator = validator
class ActionManager:
"""TODO."""
def __init__(self) -> None:
"""TODO."""
self.actions: Dict[str, Action]
def process_request(self, request: List[str], context: Dict) -> None:
"""Process action request."""
action_key = request[0]
if action_key not in self.actions:
msg = (
f"Action request {request} could not be processed because {action_key} is not a valid action",
"within this ActionManager",
)
_LOGGER.error(msg)
raise RuntimeError(msg)
action = self.actions[action_key]
action_options = request[1:]
if not action.validator(action_options, context):
_LOGGER.debug(f"Action request {request} was denied due to insufficient permissions")
return
action.func(action_options, context)
class SimComponent(BaseModel):
"""Extension of pydantic BaseModel with additional methods that must be defined by all classes in the simulator."""
model_config = ConfigDict(arbitrary_types_allowed=True)
uuid: str
"The component UUID."
def __init__(self, **kwargs) -> None:
self.action_manager: Optional[ActionManager] = None
super().__init__(**kwargs)
@abstractmethod
def describe_state(self) -> Dict:
"""
@@ -19,7 +129,7 @@ class SimComponent(BaseModel):
"""
return {}
def apply_action(self, action: List[str]) -> None:
def apply_action(self, action: List[str], context: Dict = {}) -> None:
"""
Apply an action to a simulation component. Action data is passed in as a 'namespaced' list of strings.
@@ -34,16 +144,9 @@ class SimComponent(BaseModel):
:param action: List describing the action to apply to this object.
:type action: List[str]
"""
possible_actions = self._possible_actions()
if action[0] in possible_actions:
# take the first element off the action list and pass the remaining arguments to the corresponding action
# function
possible_actions[action.pop(0)](action)
else:
raise ValueError(f"{self.__class__.__name__} received invalid action {action}")
def _possible_actions(self) -> Dict[str, Callable[[List[str]], None]]:
return {}
if self.action_manager is None:
return
self.action_manager.process_request(action, context)
def apply_timestep(self, timestep: int) -> None:
"""

View File

@@ -1,3 +1,3 @@
from primaite.simulator.domain.account import Account
from primaite.simulator.domain.account import Account, AccountGroup, AccountType
__all__ = ["Account"]
__all__ = ["Account", "AccountGroup", "AccountType"]

View File

@@ -1,6 +1,6 @@
"""User account simulation."""
from enum import Enum
from typing import Dict, List, Set, TypeAlias
from typing import Callable, Dict, List, TypeAlias
from primaite import getLogger
from primaite.simulator.core import SimComponent
@@ -40,6 +40,14 @@ class AccountStatus(Enum):
disabled = 2
class PasswordPolicyLevel(Enum):
"""Complexity requirements for account passwords."""
low = 1
medium = 2
high = 3
class Account(SimComponent):
"""User accounts."""
@@ -55,38 +63,18 @@ class Account(SimComponent):
"Account password."
account_type: AccountType
"Account Type, currently this can be service account (used by apps) or user account."
domain_groups: Set[AccountGroup] = []
"Domain-wide groups that this account belongs to."
local_groups: Dict[__temp_node, List[AccountGroup]]
"For each node, whether this account has local/admin privileges on that node."
status: AccountStatus = AccountStatus.disabled
def add_to_domain_group(self, group: AccountGroup) -> None:
"""
Add this account to a domain group.
If the account is already a member of this group, nothing happens.
:param group: The group to which to add this account.
:type group: AccountGroup
"""
self.domain_groups.add(group)
def remove_from_domain_group(self, group: AccountGroup) -> None:
"""
Remove this account from a domain group.
If the account is already not a member of that group, nothing happens.
:param group: The group from which this account should be removed.
:type group: AccountGroup
"""
self.domain_groups.discard(group)
def enable_account(self):
def enable(self):
"""Set the status to enabled."""
self.status = AccountStatus.enabled
def disable_account(self):
def disable(self):
"""Set the status to disabled."""
self.status = AccountStatus.disabled
def _possible_actions(self) -> Dict[str, Callable[[List[str]], None]]:
return {
"enable": self.enable,
"disable": self.disable,
}

View File

@@ -1,13 +1,54 @@
from typing import Set, TypeAlias
from typing import Dict, Final, List, TypeAlias
from primaite.simulator.core import SimComponent
from primaite.simulator.domain import Account
from primaite.simulator.domain import Account, AccountGroup, AccountType
__temp_node = TypeAlias() # placeholder while nodes don't exist
# placeholder while these objects don't yet exist
__temp_node = TypeAlias()
__temp_application = TypeAlias()
__temp_folder = TypeAlias()
__temp_file = TypeAlias()
class DomainController(SimComponent):
"""Main object for controlling the domain."""
nodes: Set(__temp_node) = set()
accounts: Set(Account) = set()
# owned objects
accounts: List(Account) = []
groups: Final[List[AccountGroup]] = list(AccountGroup)
group_membership: Dict[AccountGroup, List[Account]]
# references to non-owned objects
nodes: List(__temp_node) = []
applications: List(__temp_application) = []
folders: List(__temp_folder) = []
files: List(__temp_file) = []
def register_account(self, account: Account) -> None:
"""TODO."""
...
def deregister_account(self, account: Account) -> None:
"""TODO."""
...
def create_account(self, username: str, password: str, account_type: AccountType) -> Account:
"""TODO."""
...
def rotate_all_credentials(self) -> None:
"""TODO."""
...
def rotate_account_credentials(self, account: Account) -> None:
"""TODO."""
...
def add_account_to_group(self, account: Account, group: AccountGroup) -> None:
"""TODO."""
...
def remove_account_from_group(self, account: Account, group: AccountGroup) -> None:
"""TODO."""
...