Merged PR 160: Permission system, account skeleton, and group skeleton
## Summary This PR implements an outline of accounts and domain controller. However, the main contribution is the permissions system and the changes to `SimComponent`. The domain skeleton will probably change after the node/folder/file/app/service classes exist. The big idea of the permissions is that the simulator itself is built in a way that permits everything, i.e. the methods of each component that make something happen don't have any permissions checking. Therefore, if you can use primaite without any agents and you will essentially have superadmin, any action you perform will go through. The permissions come into play when you try to interact with the components via the actions. Every action has a configurable permission validator attached to it that will either allow or block the action. For this reason, I've had to modify the way actions work. To keep everything neatly contained, there is a new Action class that holds a reference to both the action function itself but also to a permission validation function. ## Test process Unit tests and integration tests. I will write the design documentation for permissions and update the design for SimComponent once others are happy that the design makes sense. ## Checklist - [x] This PR is linked to a **work item** - [x] I have performed **self-review** of the code - [x] I have written **tests** for any new functionality added with this PR - [x] I have updated the **documentation** if this PR changes or adds functionality - [x] I have updated the **changelog** - [ ] I have written/updated **design docs** if this PR implements new functionality - [x] I have run **pre-commit** checks for code style Related work items: #1716
This commit is contained in:
@@ -9,5 +9,6 @@
|
||||
- [ ] I have performed **self-review** of the code
|
||||
- [ ] I have written **tests** for any new functionality added with this PR
|
||||
- [ ] I have updated the **documentation** if this PR changes or adds functionality
|
||||
- [ ] I have written/updated **design docs** if this PR implements new functionality.
|
||||
- [ ] I have written/updated **design docs** if this PR implements new functionality
|
||||
- [ ] I have update the **change log**
|
||||
- [ ] I have run **pre-commit** checks for code style
|
||||
|
||||
@@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
- Permission System - each action can define criteria that will be used to permit or deny agent actions.
|
||||
- File System - ability to emulate a node's file system during a simulation
|
||||
|
||||
## [2.0.0] - 2023-07-26
|
||||
|
||||
@@ -11,3 +11,55 @@ top level, there is an object called the ``SimulationController`` _(doesn't exis
|
||||
and a software controller for managing software and users.
|
||||
|
||||
Each node of the simulation 'tree' has responsibility for creating, deleting, and updating its direct descendants.
|
||||
|
||||
|
||||
|
||||
Actions
|
||||
=======
|
||||
Agents can interact with the simulation by using actions. Actions are standardised with the
|
||||
:py:class:`primaite.simulation.core.Action` class, which just holds a reference to two special functions.
|
||||
|
||||
1. The action function itself, it must accept a `request` parameters which is a list of strings that describe what the
|
||||
action should do. It must also accept a `context` dict which can house additional information surrounding the action.
|
||||
For example, the context will typically include information about which entity intiated the action.
|
||||
2. A validator function. This function should return a boolean value that decides if the request is permitted or not.
|
||||
It uses the same paramters as the action function.
|
||||
|
||||
Action Permissions
|
||||
------------------
|
||||
When an agent tries to perform an action on a simulation component, that action will only be executed if the request is
|
||||
validated. For example, some actions can require that an agent is logged into an admin account. Each action defines its
|
||||
own permissions using an instance of :py:class:`primaite.simulation.core.ActionPermissionValidator`. The below code
|
||||
snippet demonstrates usage of the ``ActionPermissionValidator``.
|
||||
|
||||
.. code:: python
|
||||
|
||||
from primaite.simulator.core import Action, ActionManager, SimComponent
|
||||
from primaite.simulator.domain.controller import AccountGroup, GroupMembershipValidator
|
||||
|
||||
class Smartphone(SimComponent):
|
||||
name: str
|
||||
apps = []
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.action_manager = ActionManager()
|
||||
|
||||
self.action_manager.add_action(
|
||||
"reset_factory_settings",
|
||||
Action(
|
||||
func = lambda request, context: self.reset_factory_settings(),
|
||||
validator = GroupMembershipValidator([AccountGroup.DOMAIN_ADMIN]),
|
||||
),
|
||||
)
|
||||
|
||||
def reset_factory_settings(self):
|
||||
self.apps = []
|
||||
|
||||
phone = Smartphone(name="phone1")
|
||||
|
||||
# try to wipe the phone as a domain user, this will have no effect
|
||||
phone.apply_action(["reset_factory_settings"], context={"request_source":{"groups":["DOMAIN_USER"]})
|
||||
|
||||
# try to wipe the phone as an admin user, this will wipe the phone
|
||||
phone.apply_action(["reset_factory_settings"], context={"request_source":{"groups":["DOMAIN_ADMIN"]})
|
||||
|
||||
@@ -1,14 +1,134 @@
|
||||
"""Core of the PrimAITE Simulator."""
|
||||
from abc import abstractmethod
|
||||
from typing import Callable, Dict, List
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Callable, Dict, List, Optional
|
||||
from uuid import uuid4
|
||||
|
||||
from pydantic import BaseModel
|
||||
from pydantic import BaseModel, ConfigDict, Extra
|
||||
|
||||
from primaite import getLogger
|
||||
|
||||
_LOGGER = getLogger(__name__)
|
||||
|
||||
|
||||
class ActionPermissionValidator(ABC):
|
||||
"""
|
||||
Base class for action validators.
|
||||
|
||||
The permissions manager is designed to be generic. So, although in the first instance the permissions
|
||||
are evaluated purely on membership to AccountGroup, this class can support validating permissions based on any
|
||||
arbitrary criteria.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def __call__(self, request: List[str], context: Dict) -> bool:
|
||||
"""Use the request and context paramters to decide whether the action should be permitted."""
|
||||
pass
|
||||
|
||||
|
||||
class AllowAllValidator(ActionPermissionValidator):
|
||||
"""Always allows the action."""
|
||||
|
||||
def __call__(self, request: List[str], context: Dict) -> bool:
|
||||
"""Always allow the action."""
|
||||
return True
|
||||
|
||||
|
||||
class Action:
|
||||
"""
|
||||
This object stores data related to a single action.
|
||||
|
||||
This includes the callable that can execute the action request, and the validator that will decide whether
|
||||
the action can be performed or not.
|
||||
"""
|
||||
|
||||
def __init__(self, func: Callable[[List[str], Dict], None], validator: ActionPermissionValidator) -> None:
|
||||
"""
|
||||
Save the functions that are for this action.
|
||||
|
||||
Here's a description for the intended use of both of these.
|
||||
|
||||
``func`` is a function that accepts a request and a context dict. Typically this would be a lambda function
|
||||
that invokes a class method of your SimComponent. For example if the component is a node and the action is for
|
||||
turning it off, then the SimComponent should have a turn_off(self) method that does not need to accept any args.
|
||||
Then, this Action will be given something like ``func = lambda request, context: self.turn_off()``.
|
||||
|
||||
``validator`` is an instance of a subclass of `ActionPermissionValidator`. This is essentially a callable that
|
||||
accepts `request` and `context` and returns a boolean to represent whether the permission is granted to perform
|
||||
the action.
|
||||
|
||||
:param func: Function that performs the request.
|
||||
:type func: Callable[[List[str], Dict], None]
|
||||
:param validator: Function that checks if the request is authenticated given the context.
|
||||
:type validator: ActionPermissionValidator
|
||||
"""
|
||||
self.func: Callable[[List[str], Dict], None] = func
|
||||
self.validator: ActionPermissionValidator = validator
|
||||
|
||||
|
||||
class ActionManager:
|
||||
"""
|
||||
ActionManager is used by `SimComponent` instances to keep track of actions.
|
||||
|
||||
Its main purpose is to be a lookup from action name to action function and corresponding validation function. This
|
||||
class is responsible for providing a consistent API for processing actions as well as helpful error messages.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialise ActionManager with an empty action lookup."""
|
||||
self.actions: Dict[str, Action] = {}
|
||||
|
||||
def process_request(self, request: List[str], context: Dict) -> None:
|
||||
"""Process an action request.
|
||||
|
||||
:param request: A list of strings which specify what action to take. The first string must be one of the allowed
|
||||
actions, i.e. it must be a key of self.actions. The subsequent strings in the list are passed as parameters
|
||||
to the action function.
|
||||
:type request: List[str]
|
||||
:param context: Dictionary of additional information necessary to process or validate the request.
|
||||
:type context: Dict
|
||||
:raises RuntimeError: If the request parameter does not have a valid action identifier as the first item.
|
||||
"""
|
||||
action_key = request[0]
|
||||
|
||||
if action_key not in self.actions:
|
||||
msg = (
|
||||
f"Action request {request} could not be processed because {action_key} is not a valid action",
|
||||
"within this ActionManager",
|
||||
)
|
||||
_LOGGER.error(msg)
|
||||
raise RuntimeError(msg)
|
||||
|
||||
action = self.actions[action_key]
|
||||
action_options = request[1:]
|
||||
|
||||
if not action.validator(action_options, context):
|
||||
_LOGGER.debug(f"Action request {request} was denied due to insufficient permissions")
|
||||
return
|
||||
|
||||
action.func(action_options, context)
|
||||
|
||||
def add_action(self, name: str, action: Action) -> None:
|
||||
"""Add an action to this action manager.
|
||||
|
||||
:param name: The string associated to this action.
|
||||
:type name: str
|
||||
:param action: Action object.
|
||||
:type action: Action
|
||||
"""
|
||||
if name in self.actions:
|
||||
msg = f"Attempted to register an action but the action name {name} is already taken."
|
||||
_LOGGER.error(msg)
|
||||
raise RuntimeError(msg)
|
||||
|
||||
self.actions[name] = action
|
||||
|
||||
|
||||
class SimComponent(BaseModel):
|
||||
"""Extension of pydantic BaseModel with additional methods that must be defined by all classes in the simulator."""
|
||||
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True, extra=Extra.allow)
|
||||
"""Configure pydantic to allow arbitrary types and to let the instance have attributes not present in model."""
|
||||
|
||||
uuid: str
|
||||
"""The component UUID."""
|
||||
|
||||
@@ -16,6 +136,7 @@ class SimComponent(BaseModel):
|
||||
if not kwargs.get("uuid"):
|
||||
kwargs["uuid"] = str(uuid4())
|
||||
super().__init__(**kwargs)
|
||||
self.action_manager: Optional[ActionManager] = None
|
||||
|
||||
@abstractmethod
|
||||
def describe_state(self) -> Dict:
|
||||
@@ -28,7 +149,7 @@ class SimComponent(BaseModel):
|
||||
"""
|
||||
return {}
|
||||
|
||||
def apply_action(self, action: List[str]) -> None:
|
||||
def apply_action(self, action: List[str], context: Dict = {}) -> None:
|
||||
"""
|
||||
Apply an action to a simulation component. Action data is passed in as a 'namespaced' list of strings.
|
||||
|
||||
@@ -43,16 +164,9 @@ class SimComponent(BaseModel):
|
||||
:param action: List describing the action to apply to this object.
|
||||
:type action: List[str]
|
||||
"""
|
||||
possible_actions = self._possible_actions()
|
||||
if action[0] in possible_actions:
|
||||
# take the first element off the action list and pass the remaining arguments to the corresponding action
|
||||
# function
|
||||
possible_actions[action.pop(0)](action)
|
||||
else:
|
||||
raise ValueError(f"{self.__class__.__name__} received invalid action {action}")
|
||||
|
||||
def _possible_actions(self) -> Dict[str, Callable[[List[str]], None]]:
|
||||
return {}
|
||||
if self.action_manager is None:
|
||||
return
|
||||
self.action_manager.process_request(action, context)
|
||||
|
||||
def apply_timestep(self, timestep: int) -> None:
|
||||
"""
|
||||
|
||||
0
src/primaite/simulator/domain/__init__.py
Normal file
0
src/primaite/simulator/domain/__init__.py
Normal file
63
src/primaite/simulator/domain/account.py
Normal file
63
src/primaite/simulator/domain/account.py
Normal file
@@ -0,0 +1,63 @@
|
||||
"""User account simulation."""
|
||||
from enum import Enum
|
||||
from typing import Dict
|
||||
|
||||
from primaite import getLogger
|
||||
from primaite.simulator.core import SimComponent
|
||||
|
||||
_LOGGER = getLogger(__name__)
|
||||
|
||||
|
||||
class AccountType(Enum):
|
||||
"""Whether the account is intended for a user to log in or for a service to use."""
|
||||
|
||||
SERVICE = 1
|
||||
"Service accounts are used to grant permissions to software on nodes to perform actions"
|
||||
USER = 2
|
||||
"User accounts are used to allow agents to log in and perform actions"
|
||||
|
||||
|
||||
class PasswordPolicyLevel(Enum):
|
||||
"""Complexity requirements for account passwords."""
|
||||
|
||||
LOW = 1
|
||||
MEDIUM = 2
|
||||
HIGH = 3
|
||||
|
||||
|
||||
class Account(SimComponent):
|
||||
"""User accounts."""
|
||||
|
||||
num_logons: int = 0
|
||||
"The number of times this account was logged into since last reset."
|
||||
num_logoffs: int = 0
|
||||
"The number of times this account was logged out of since last reset."
|
||||
num_group_changes: int = 0
|
||||
"The number of times this account was moved in or out of an AccountGroup."
|
||||
username: str
|
||||
"Account username."
|
||||
password: str
|
||||
"Account password."
|
||||
account_type: AccountType
|
||||
"Account Type, currently this can be service account (used by apps) or user account."
|
||||
enabled: bool = True
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""Describe state for agent observations."""
|
||||
return super().describe_state()
|
||||
|
||||
def enable(self):
|
||||
"""Set the status to enabled."""
|
||||
self.enabled = True
|
||||
|
||||
def disable(self):
|
||||
"""Set the status to disabled."""
|
||||
self.enabled = False
|
||||
|
||||
def log_on(self) -> None:
|
||||
"""TODO. Once the accounts are integrated with nodes, populate this accordingly."""
|
||||
self.num_logons += 1
|
||||
|
||||
def log_off(self) -> None:
|
||||
"""TODO. Once the accounts are integrated with nodes, populate this accordingly."""
|
||||
self.num_logoffs += 1
|
||||
127
src/primaite/simulator/domain/controller.py
Normal file
127
src/primaite/simulator/domain/controller.py
Normal file
@@ -0,0 +1,127 @@
|
||||
from enum import Enum
|
||||
from typing import Dict, Final, List, Literal, Tuple
|
||||
|
||||
from primaite.simulator.core import ActionPermissionValidator, SimComponent
|
||||
from primaite.simulator.domain.account import Account, AccountType
|
||||
|
||||
|
||||
# placeholder while these objects don't yet exist
|
||||
class temp_node:
|
||||
"""Placeholder for node class for type hinting purposes."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class temp_application:
|
||||
"""Placeholder for application class for type hinting purposes."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class temp_folder:
|
||||
"""Placeholder for folder class for type hinting purposes."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class temp_file:
|
||||
"""Placeholder for file class for type hinting purposes."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class AccountGroup(Enum):
|
||||
"""Permissions are set at group-level and accounts can belong to these groups."""
|
||||
|
||||
LOCAL_USER = 1
|
||||
"For performing basic actions on a node"
|
||||
DOMAIN_USER = 2
|
||||
"For performing basic actions to the domain"
|
||||
LOCAL_ADMIN = 3
|
||||
"For full access to actions on a node"
|
||||
DOMAIN_ADMIN = 4
|
||||
"For full access"
|
||||
|
||||
|
||||
class GroupMembershipValidator(ActionPermissionValidator):
|
||||
"""Permit actions based on group membership."""
|
||||
|
||||
def __init__(self, allowed_groups: List[AccountGroup]) -> None:
|
||||
"""Store a list of groups that should be granted permission.
|
||||
|
||||
:param allowed_groups: List of AccountGroups that are permitted to perform some action.
|
||||
:type allowed_groups: List[AccountGroup]
|
||||
"""
|
||||
self.allowed_groups = allowed_groups
|
||||
|
||||
def __call__(self, request: List[str], context: Dict) -> bool:
|
||||
"""Permit the action if the request comes from an account which belongs to the right group."""
|
||||
# if context request source is part of any groups mentioned in self.allow_groups, return true, otherwise false
|
||||
requestor_groups: List[str] = context["request_source"]["groups"]
|
||||
for allowed_group in self.allowed_groups:
|
||||
if allowed_group.name in requestor_groups:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class DomainController(SimComponent):
|
||||
"""Main object for controlling the domain."""
|
||||
|
||||
# owned objects
|
||||
accounts: Dict[str, Account] = {}
|
||||
groups: Final[List[AccountGroup]] = list(AccountGroup)
|
||||
|
||||
domain_group_membership: Dict[Literal[AccountGroup.DOMAIN_ADMIN, AccountGroup.DOMAIN_USER], List[Account]] = {}
|
||||
local_group_membership: Dict[
|
||||
Tuple[temp_node, Literal[AccountGroup.LOCAL_ADMIN, AccountGroup.LOCAL_USER]], List[Account]
|
||||
] = {}
|
||||
|
||||
# references to non-owned objects. Not sure if all are needed here.
|
||||
nodes: Dict[str, temp_node] = {}
|
||||
applications: Dict[str, temp_application] = {}
|
||||
folders: List[temp_folder] = {}
|
||||
files: List[temp_file] = {}
|
||||
|
||||
def _register_account(self, account: Account) -> None:
|
||||
"""TODO."""
|
||||
...
|
||||
|
||||
def _deregister_account(self, account: Account) -> None:
|
||||
"""TODO."""
|
||||
...
|
||||
|
||||
def create_account(self, username: str, password: str, account_type: AccountType) -> Account:
|
||||
"""TODO."""
|
||||
...
|
||||
|
||||
def delete_account(self, account: Account) -> None:
|
||||
"""TODO."""
|
||||
...
|
||||
|
||||
def rotate_all_credentials(self) -> None:
|
||||
"""TODO."""
|
||||
...
|
||||
|
||||
def rotate_account_credentials(self, account: Account) -> None:
|
||||
"""TODO."""
|
||||
...
|
||||
|
||||
def add_account_to_group(self, account: Account, group: AccountGroup) -> None:
|
||||
"""TODO."""
|
||||
...
|
||||
|
||||
def remove_account_from_group(self, account: Account, group: AccountGroup) -> None:
|
||||
"""TODO."""
|
||||
...
|
||||
|
||||
def check_account_permissions(self, account: Account, node: temp_node) -> List[AccountGroup]:
|
||||
"""Return a list of permission groups that this account has on this node."""
|
||||
...
|
||||
|
||||
def register_node(self, node: temp_node) -> None:
|
||||
"""TODO."""
|
||||
...
|
||||
|
||||
def deregister_node(self, node: temp_node) -> None:
|
||||
"""TODO."""
|
||||
...
|
||||
@@ -0,0 +1,190 @@
|
||||
from enum import Enum
|
||||
from typing import Dict, List, Literal
|
||||
|
||||
import pytest
|
||||
|
||||
from primaite.simulator.core import Action, ActionManager, AllowAllValidator, SimComponent
|
||||
from primaite.simulator.domain.controller import AccountGroup, GroupMembershipValidator
|
||||
|
||||
|
||||
def test_group_action_validation() -> None:
|
||||
"""
|
||||
Check that actions are denied when an unauthorised request is made.
|
||||
|
||||
This test checks the integration between SimComponent and the permissions validation system. First, we create a
|
||||
basic node and folder class. We configure the node so that only admins can create a folder. Then, we try to create
|
||||
a folder as both an admin user and a non-admin user.
|
||||
"""
|
||||
|
||||
class Folder(SimComponent):
|
||||
name: str
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
return super().describe_state()
|
||||
|
||||
class Node(SimComponent):
|
||||
name: str
|
||||
folders: List[Folder] = []
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.action_manager = ActionManager()
|
||||
|
||||
self.action_manager.add_action(
|
||||
"create_folder",
|
||||
Action(
|
||||
func=lambda request, context: self.create_folder(request[0]),
|
||||
validator=GroupMembershipValidator([AccountGroup.LOCAL_ADMIN, AccountGroup.DOMAIN_ADMIN]),
|
||||
),
|
||||
)
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
return super().describe_state()
|
||||
|
||||
def create_folder(self, folder_name: str) -> None:
|
||||
new_folder = Folder(uuid="0000-0000-0001", name=folder_name)
|
||||
self.folders.append(new_folder)
|
||||
|
||||
def remove_folder(self, folder: Folder) -> None:
|
||||
self.folders = [x for x in self.folders if x is not folder]
|
||||
|
||||
# check that the folder is created when a local admin tried to do it
|
||||
permitted_context = {"request_source": {"agent": "BLUE", "account": "User1", "groups": ["LOCAL_ADMIN"]}}
|
||||
my_node = Node(uuid="0000-0000-1234", name="pc")
|
||||
my_node.apply_action(["create_folder", "memes"], context=permitted_context)
|
||||
assert len(my_node.folders) == 1
|
||||
assert my_node.folders[0].name == "memes"
|
||||
|
||||
# check that the number of folders is still 1 even after attempting to create a second one without permissions
|
||||
invalid_context = {"request_source": {"agent": "BLUE", "account": "User1", "groups": ["LOCAL_USER", "DOMAIN_USER"]}}
|
||||
my_node.apply_action(["create_folder", "memes2"], context=invalid_context)
|
||||
assert len(my_node.folders) == 1
|
||||
assert my_node.folders[0].name == "memes"
|
||||
|
||||
|
||||
def test_hierarchical_action_with_validation() -> None:
|
||||
"""
|
||||
Check that validation works with sub-objects.
|
||||
|
||||
This test creates a parent object (Node) and a child object (Application) which both accept actions. The node allows
|
||||
action passthrough to applications. The purpose of this test is to check that after an action is passed through to
|
||||
a child object, that the permission system still works as intended.
|
||||
"""
|
||||
|
||||
class Application(SimComponent):
|
||||
name: str
|
||||
state: Literal["on", "off", "disabled"] = "off"
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.action_manager = ActionManager()
|
||||
|
||||
self.action_manager.add_action(
|
||||
"turn_on",
|
||||
Action(
|
||||
func=lambda request, context: self.turn_on(),
|
||||
validator=AllowAllValidator(),
|
||||
),
|
||||
)
|
||||
self.action_manager.add_action(
|
||||
"turn_off",
|
||||
Action(
|
||||
func=lambda request, context: self.turn_off(),
|
||||
validator=AllowAllValidator(),
|
||||
),
|
||||
)
|
||||
self.action_manager.add_action(
|
||||
"disable",
|
||||
Action(
|
||||
func=lambda request, context: self.disable(),
|
||||
validator=GroupMembershipValidator([AccountGroup.LOCAL_ADMIN, AccountGroup.DOMAIN_ADMIN]),
|
||||
),
|
||||
)
|
||||
self.action_manager.add_action(
|
||||
"enable",
|
||||
Action(
|
||||
func=lambda request, context: self.enable(),
|
||||
validator=GroupMembershipValidator([AccountGroup.LOCAL_ADMIN, AccountGroup.DOMAIN_ADMIN]),
|
||||
),
|
||||
)
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
return super().describe_state()
|
||||
|
||||
def disable(self) -> None:
|
||||
self.state = "disabled"
|
||||
|
||||
def enable(self) -> None:
|
||||
if self.state == "disabled":
|
||||
self.state = "off"
|
||||
|
||||
def turn_on(self) -> None:
|
||||
if self.state == "off":
|
||||
self.state = "on"
|
||||
|
||||
def turn_off(self) -> None:
|
||||
if self.state == "on":
|
||||
self.state = "off"
|
||||
|
||||
class Node(SimComponent):
|
||||
name: str
|
||||
state: Literal["on", "off"] = "on"
|
||||
apps: List[Application] = []
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.action_manager = ActionManager()
|
||||
|
||||
self.action_manager.add_action(
|
||||
"apps",
|
||||
Action(
|
||||
func=lambda request, context: self.send_action_to_app(request.pop(0), request, context),
|
||||
validator=AllowAllValidator(),
|
||||
),
|
||||
)
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
return super().describe_state()
|
||||
|
||||
def install_app(self, app_name: str) -> None:
|
||||
new_app = Application(name=app_name)
|
||||
self.apps.append(new_app)
|
||||
|
||||
def send_action_to_app(self, app_name: str, options: List[str], context: Dict):
|
||||
for app in self.apps:
|
||||
if app_name == app.name:
|
||||
app.apply_action(options, context)
|
||||
break
|
||||
else:
|
||||
msg = f"Node has no app with name {app_name}"
|
||||
raise LookupError(msg)
|
||||
|
||||
my_node = Node(name="pc")
|
||||
my_node.install_app("Chrome")
|
||||
my_node.install_app("Firefox")
|
||||
|
||||
non_admin_context = {
|
||||
"request_source": {"agent": "BLUE", "account": "User1", "groups": ["LOCAL_USER", "DOMAIN_USER"]}
|
||||
}
|
||||
|
||||
admin_context = {
|
||||
"request_source": {
|
||||
"agent": "BLUE",
|
||||
"account": "User1",
|
||||
"groups": ["LOCAL_ADMIN", "DOMAIN_ADMIN", "LOCAL_USER", "DOMAIN_USER"],
|
||||
}
|
||||
}
|
||||
|
||||
# check that a non-admin can't disable this app
|
||||
my_node.apply_action(["apps", "Chrome", "disable"], non_admin_context)
|
||||
assert my_node.apps[0].name == "Chrome" # if failure occurs on this line, the test itself is broken
|
||||
assert my_node.apps[0].state == "off"
|
||||
|
||||
# check that a non-admin can turn this app on
|
||||
my_node.apply_action(["apps", "Firefox", "turn_on"], non_admin_context)
|
||||
assert my_node.apps[1].name == "Firefox" # if failure occurs on this line, the test itself is broken
|
||||
assert my_node.apps[1].state == "on"
|
||||
|
||||
# check that an admin can disable this app
|
||||
my_node.apply_action(["apps", "Chrome", "disable"], admin_context)
|
||||
assert my_node.apps[0].state == "disabled"
|
||||
@@ -0,0 +1,18 @@
|
||||
"""Test the account module of the simulator."""
|
||||
from primaite.simulator.domain.account import Account, AccountType
|
||||
|
||||
|
||||
def test_account_serialise():
|
||||
"""Test that an account can be serialised. If pydantic throws error then this test fails."""
|
||||
acct = Account(username="Jake", password="JakePass1!", account_type=AccountType.USER)
|
||||
serialised = acct.model_dump_json()
|
||||
print(serialised)
|
||||
|
||||
|
||||
def test_account_deserialise():
|
||||
"""Test that an account can be deserialised. The test fails if pydantic throws an error."""
|
||||
acct_json = (
|
||||
'{"uuid":"dfb2bcaa-d3a1-48fd-af3f-c943354622b4","num_logons":0,"num_logoffs":0,"num_group_changes":0,'
|
||||
'"username":"Jake","password":"JakePass1!","account_type":2,"status":2,"action_manager":null}'
|
||||
)
|
||||
acct = Account.model_validate_json(acct_json)
|
||||
@@ -44,36 +44,3 @@ class TestIsolatedSimComponent:
|
||||
comp = TestComponent(name="computer", size=(5, 10))
|
||||
dump = comp.model_dump_json()
|
||||
assert comp == TestComponent.model_validate_json(dump)
|
||||
|
||||
def test_apply_action(self):
|
||||
"""Validate that we can override apply_action behaviour and it updates the state of the component."""
|
||||
|
||||
class TestComponent(SimComponent):
|
||||
name: str
|
||||
status: Literal["on", "off"] = "off"
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
return {}
|
||||
|
||||
def _possible_actions(self) -> Dict[str, Callable[[List[str]], None]]:
|
||||
return {
|
||||
"turn_off": self._turn_off,
|
||||
"turn_on": self._turn_on,
|
||||
}
|
||||
|
||||
def _turn_off(self, options: List[str]) -> None:
|
||||
assert len(options) == 0, "This action does not support options."
|
||||
self.status = "off"
|
||||
|
||||
def _turn_on(self, options: List[str]) -> None:
|
||||
assert len(options) == 0, "This action does not support options."
|
||||
self.status = "on"
|
||||
|
||||
comp = TestComponent(name="computer", status="off")
|
||||
|
||||
assert comp.status == "off"
|
||||
comp.apply_action(["turn_on"])
|
||||
assert comp.status == "on"
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
comp.apply_action(["do_nothing"])
|
||||
|
||||
Reference in New Issue
Block a user