From e809d89c30d3ba438d4edabfe88ea9c1ba9f226d Mon Sep 17 00:00:00 2001 From: Czar Echavez Date: Thu, 5 Sep 2024 13:47:59 +0100 Subject: [PATCH] #2842 and #2843: implement add user and disable user actions + tests --- src/primaite/game/agent/actions.py | 34 +++++++ .../simulator/network/hardware/base.py | 16 +++- tests/conftest.py | 2 + .../actions/test_user_account_actions.py | 93 +++++++++++++++++++ 4 files changed, 144 insertions(+), 1 deletion(-) create mode 100644 tests/integration_tests/game_layer/actions/test_user_account_actions.py diff --git a/src/primaite/game/agent/actions.py b/src/primaite/game/agent/actions.py index 2e6189c0..a299788e 100644 --- a/src/primaite/game/agent/actions.py +++ b/src/primaite/game/agent/actions.py @@ -1116,6 +1116,38 @@ class ConfigureC2BeaconAction(AbstractAction): return ["network", "node", node_name, "application", "C2Beacon", "configure", config.__dict__] +class NodeAccountsAddUserAction(AbstractAction): + """Action which changes adds a User.""" + + def __init__(self, manager: "ActionManager", **kwargs) -> None: + super().__init__(manager=manager) + + def form_request(self, node_id: str, username: str, password: str, is_admin: bool) -> RequestFormat: + """Return the action formatted as a request which can be ingested by the PrimAITE simulation.""" + node_name = self.manager.get_node_name_by_idx(node_id) + return ["network", "node", node_name, "service", "UserManager", "add_user", username, password, is_admin] + + +class NodeAccountsDisableUserAction(AbstractAction): + """Action which disables a user.""" + + def __init__(self, manager: "ActionManager", **kwargs) -> None: + super().__init__(manager=manager) + + def form_request(self, node_id: str, username: str) -> RequestFormat: + """Return the action formatted as a request which can be ingested by the PrimAITE simulation.""" + node_name = self.manager.get_node_name_by_idx(node_id) + return [ + "network", + "node", + node_name, + "service", + "UserManager", + "disable_user", + username, + ] + + class NodeAccountsChangePasswordAction(AbstractAction): """Action which changes the password for a user.""" @@ -1368,6 +1400,8 @@ class ActionManager: "C2_SERVER_RANSOMWARE_CONFIGURE": RansomwareConfigureC2ServerAction, "C2_SERVER_TERMINAL_COMMAND": TerminalC2ServerAction, "C2_SERVER_DATA_EXFILTRATE": ExfiltrationC2ServerAction, + "NODE_ACCOUNTS_ADD_USER": NodeAccountsAddUserAction, + "NODE_ACCOUNTS_DISABLE_USER": NodeAccountsDisableUserAction, "NODE_ACCOUNTS_CHANGE_PASSWORD": NodeAccountsChangePasswordAction, "SSH_TO_REMOTE": NodeSessionsRemoteLoginAction, "SESSIONS_REMOTE_LOGOFF": NodeSessionsRemoteLogoutAction, diff --git a/src/primaite/simulator/network/hardware/base.py b/src/primaite/simulator/network/hardware/base.py index ef2d47c3..f49d0a17 100644 --- a/src/primaite/simulator/network/hardware/base.py +++ b/src/primaite/simulator/network/hardware/base.py @@ -857,7 +857,21 @@ class UserManager(Service): """ rm = super()._init_request_manager() - # todo add doc about requeest schemas + # todo add doc about request schemas + rm.add_request( + "add_user", + RequestType( + func=lambda request, context: RequestResponse.from_bool( + self.add_user(username=request[0], password=request[1], is_admin=request[2]) + ) + ), + ) + rm.add_request( + "disable_user", + RequestType( + func=lambda request, context: RequestResponse.from_bool(self.disable_user(username=request[0])) + ), + ) rm.add_request( "change_password", RequestType( diff --git a/tests/conftest.py b/tests/conftest.py index 1bbff8f2..50877378 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -463,6 +463,8 @@ def game_and_agent(): {"type": "C2_SERVER_RANSOMWARE_CONFIGURE"}, {"type": "C2_SERVER_TERMINAL_COMMAND"}, {"type": "C2_SERVER_DATA_EXFILTRATE"}, + {"type": "NODE_ACCOUNTS_ADD_USER"}, + {"type": "NODE_ACCOUNTS_DISABLE_USER"}, {"type": "NODE_ACCOUNTS_CHANGE_PASSWORD"}, {"type": "SSH_TO_REMOTE"}, {"type": "SESSIONS_REMOTE_LOGOFF"}, diff --git a/tests/integration_tests/game_layer/actions/test_user_account_actions.py b/tests/integration_tests/game_layer/actions/test_user_account_actions.py new file mode 100644 index 00000000..fd720315 --- /dev/null +++ b/tests/integration_tests/game_layer/actions/test_user_account_actions.py @@ -0,0 +1,93 @@ +# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK +import pytest + +from primaite.simulator.network.hardware.nodes.host.computer import Computer + + +@pytest.fixture +def game_and_agent_fixture(game_and_agent): + """Create a game with a simple agent that can be controlled by the tests.""" + game, agent = game_and_agent + + client_1: Computer = game.simulation.network.get_node_by_hostname("client_1") + client_1.start_up_duration = 3 + + return (game, agent) + + +def test_user_account_add_user_action(game_and_agent_fixture): + """Tests the add user account action.""" + game, agent = game_and_agent_fixture + client_1 = game.simulation.network.get_node_by_hostname("client_1") + + assert len(client_1.user_manager.users) == 1 # admin is created by default + assert len(client_1.user_manager.admins) == 1 + + # add admin account + action = ( + "NODE_ACCOUNTS_ADD_USER", + {"node_id": 0, "username": "soccon_diiz", "password": "nuts", "is_admin": True}, + ) + agent.store_action(action) + game.step() + + assert len(client_1.user_manager.users) == 2 # new user added + assert len(client_1.user_manager.admins) == 2 + + # add non admin account + action = ( + "NODE_ACCOUNTS_ADD_USER", + {"node_id": 0, "username": "mike_rotch", "password": "password", "is_admin": False}, + ) + agent.store_action(action) + game.step() + + assert len(client_1.user_manager.users) == 3 # new user added + assert len(client_1.user_manager.admins) == 2 + + +def test_user_account_disable_user_action(game_and_agent_fixture): + """Tests the disable user account action.""" + game, agent = game_and_agent_fixture + client_1 = game.simulation.network.get_node_by_hostname("client_1") + + client_1.user_manager.add_user(username="test", password="icles", is_admin=True) + assert len(client_1.user_manager.users) == 2 # new user added + assert len(client_1.user_manager.admins) == 2 + + test_user = client_1.user_manager.users.get("test") + assert test_user + assert test_user.disabled is not True + + # disable test account + action = ( + "NODE_ACCOUNTS_DISABLE_USER", + { + "node_id": 0, + "username": "test", + }, + ) + agent.store_action(action) + game.step() + assert test_user.disabled + + +def test_user_account_change_password_action(game_and_agent_fixture): + """Tests the change password user account action.""" + game, agent = game_and_agent_fixture + client_1 = game.simulation.network.get_node_by_hostname("client_1") + + client_1.user_manager.add_user(username="test", password="icles", is_admin=True) + + test_user = client_1.user_manager.users.get("test") + assert test_user.password == "icles" + + # change account password + action = ( + "NODE_ACCOUNTS_CHANGE_PASSWORD", + {"node_id": 0, "username": "test", "current_password": "icles", "new_password": "2Hard_2_Hack"}, + ) + agent.store_action(action) + game.step() + + assert test_user.password == "2Hard_2_Hack"