## Summary Added actions which allow a user to be added and disabled ## Test process https://dev.azure.com/ma-dev-uk/PrimAITE/_git/PrimAITE/pullrequest/529?_a=files&path=/tests/integration_tests/game_layer/actions/test_user_account_actions.py ## Checklist - [X] PR is linked to a **work item** - [X] **acceptance criteria** of linked ticket are met - [X] performed **self-review** of the code - [X] written **tests** for any new functionality added with this PR - [ ] updated the **documentation** if this PR changes or adds functionality - [ ] written/updated **design docs** if this PR implements new functionality - [ ] updated the **change log** - [X] ran **pre-commit** checks for code style - [ ] attended to any **TO-DOs** left in the code #2842 and #2843: implement add user and disable user actions + tests Related work items: #2842, #2843
This commit is contained in:
@@ -1116,6 +1116,38 @@ class ConfigureC2BeaconAction(AbstractAction):
|
||||
return ["network", "node", node_name, "application", "C2Beacon", "configure", config.__dict__]
|
||||
|
||||
|
||||
class NodeAccountsAddUserAction(AbstractAction):
|
||||
"""Action which changes adds a User."""
|
||||
|
||||
def __init__(self, manager: "ActionManager", **kwargs) -> None:
|
||||
super().__init__(manager=manager)
|
||||
|
||||
def form_request(self, node_id: str, username: str, password: str, is_admin: bool) -> RequestFormat:
|
||||
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
|
||||
node_name = self.manager.get_node_name_by_idx(node_id)
|
||||
return ["network", "node", node_name, "service", "UserManager", "add_user", username, password, is_admin]
|
||||
|
||||
|
||||
class NodeAccountsDisableUserAction(AbstractAction):
|
||||
"""Action which disables a user."""
|
||||
|
||||
def __init__(self, manager: "ActionManager", **kwargs) -> None:
|
||||
super().__init__(manager=manager)
|
||||
|
||||
def form_request(self, node_id: str, username: str) -> RequestFormat:
|
||||
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
|
||||
node_name = self.manager.get_node_name_by_idx(node_id)
|
||||
return [
|
||||
"network",
|
||||
"node",
|
||||
node_name,
|
||||
"service",
|
||||
"UserManager",
|
||||
"disable_user",
|
||||
username,
|
||||
]
|
||||
|
||||
|
||||
class NodeAccountsChangePasswordAction(AbstractAction):
|
||||
"""Action which changes the password for a user."""
|
||||
|
||||
@@ -1368,6 +1400,8 @@ class ActionManager:
|
||||
"C2_SERVER_RANSOMWARE_CONFIGURE": RansomwareConfigureC2ServerAction,
|
||||
"C2_SERVER_TERMINAL_COMMAND": TerminalC2ServerAction,
|
||||
"C2_SERVER_DATA_EXFILTRATE": ExfiltrationC2ServerAction,
|
||||
"NODE_ACCOUNTS_ADD_USER": NodeAccountsAddUserAction,
|
||||
"NODE_ACCOUNTS_DISABLE_USER": NodeAccountsDisableUserAction,
|
||||
"NODE_ACCOUNTS_CHANGE_PASSWORD": NodeAccountsChangePasswordAction,
|
||||
"SSH_TO_REMOTE": NodeSessionsRemoteLoginAction,
|
||||
"SESSIONS_REMOTE_LOGOFF": NodeSessionsRemoteLogoutAction,
|
||||
|
||||
@@ -857,7 +857,21 @@ class UserManager(Service):
|
||||
"""
|
||||
rm = super()._init_request_manager()
|
||||
|
||||
# todo add doc about requeest schemas
|
||||
# todo add doc about request schemas
|
||||
rm.add_request(
|
||||
"add_user",
|
||||
RequestType(
|
||||
func=lambda request, context: RequestResponse.from_bool(
|
||||
self.add_user(username=request[0], password=request[1], is_admin=request[2])
|
||||
)
|
||||
),
|
||||
)
|
||||
rm.add_request(
|
||||
"disable_user",
|
||||
RequestType(
|
||||
func=lambda request, context: RequestResponse.from_bool(self.disable_user(username=request[0]))
|
||||
),
|
||||
)
|
||||
rm.add_request(
|
||||
"change_password",
|
||||
RequestType(
|
||||
|
||||
@@ -460,6 +460,8 @@ def game_and_agent():
|
||||
{"type": "C2_SERVER_RANSOMWARE_CONFIGURE"},
|
||||
{"type": "C2_SERVER_TERMINAL_COMMAND"},
|
||||
{"type": "C2_SERVER_DATA_EXFILTRATE"},
|
||||
{"type": "NODE_ACCOUNTS_ADD_USER"},
|
||||
{"type": "NODE_ACCOUNTS_DISABLE_USER"},
|
||||
{"type": "NODE_ACCOUNTS_CHANGE_PASSWORD"},
|
||||
{"type": "SSH_TO_REMOTE"},
|
||||
{"type": "SESSIONS_REMOTE_LOGOFF"},
|
||||
|
||||
@@ -0,0 +1,176 @@
|
||||
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
|
||||
import pytest
|
||||
|
||||
from primaite.simulator.network.hardware.nodes.host.computer import Computer
|
||||
from primaite.simulator.network.hardware.nodes.network.router import ACLAction
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def game_and_agent_fixture(game_and_agent):
|
||||
"""Create a game with a simple agent that can be controlled by the tests."""
|
||||
game, agent = game_and_agent
|
||||
|
||||
client_1: Computer = game.simulation.network.get_node_by_hostname("client_1")
|
||||
client_1.start_up_duration = 3
|
||||
|
||||
return (game, agent)
|
||||
|
||||
|
||||
def test_user_account_add_user_action(game_and_agent_fixture):
|
||||
"""Tests the add user account action."""
|
||||
game, agent = game_and_agent_fixture
|
||||
client_1 = game.simulation.network.get_node_by_hostname("client_1")
|
||||
|
||||
assert len(client_1.user_manager.users) == 1 # admin is created by default
|
||||
assert len(client_1.user_manager.admins) == 1
|
||||
|
||||
# add admin account
|
||||
action = (
|
||||
"NODE_ACCOUNTS_ADD_USER",
|
||||
{"node_id": 0, "username": "admin_2", "password": "e-tronic-boogaloo", "is_admin": True},
|
||||
)
|
||||
agent.store_action(action)
|
||||
game.step()
|
||||
|
||||
assert len(client_1.user_manager.users) == 2 # new user added
|
||||
assert len(client_1.user_manager.admins) == 2
|
||||
|
||||
# add non admin account
|
||||
action = (
|
||||
"NODE_ACCOUNTS_ADD_USER",
|
||||
{"node_id": 0, "username": "leeroy.jenkins", "password": "no_plan_needed", "is_admin": False},
|
||||
)
|
||||
agent.store_action(action)
|
||||
game.step()
|
||||
|
||||
assert len(client_1.user_manager.users) == 3 # new user added
|
||||
assert len(client_1.user_manager.admins) == 2
|
||||
|
||||
|
||||
def test_user_account_disable_user_action(game_and_agent_fixture):
|
||||
"""Tests the disable user account action."""
|
||||
game, agent = game_and_agent_fixture
|
||||
client_1 = game.simulation.network.get_node_by_hostname("client_1")
|
||||
|
||||
client_1.user_manager.add_user(username="test", password="password", is_admin=True)
|
||||
assert len(client_1.user_manager.users) == 2 # new user added
|
||||
assert len(client_1.user_manager.admins) == 2
|
||||
|
||||
test_user = client_1.user_manager.users.get("test")
|
||||
assert test_user
|
||||
assert test_user.disabled is not True
|
||||
|
||||
# disable test account
|
||||
action = (
|
||||
"NODE_ACCOUNTS_DISABLE_USER",
|
||||
{
|
||||
"node_id": 0,
|
||||
"username": "test",
|
||||
},
|
||||
)
|
||||
agent.store_action(action)
|
||||
game.step()
|
||||
assert test_user.disabled
|
||||
|
||||
|
||||
def test_user_account_change_password_action(game_and_agent_fixture):
|
||||
"""Tests the change password user account action."""
|
||||
game, agent = game_and_agent_fixture
|
||||
client_1 = game.simulation.network.get_node_by_hostname("client_1")
|
||||
|
||||
client_1.user_manager.add_user(username="test", password="password", is_admin=True)
|
||||
|
||||
test_user = client_1.user_manager.users.get("test")
|
||||
assert test_user.password == "password"
|
||||
|
||||
# change account password
|
||||
action = (
|
||||
"NODE_ACCOUNTS_CHANGE_PASSWORD",
|
||||
{"node_id": 0, "username": "test", "current_password": "password", "new_password": "2Hard_2_Hack"},
|
||||
)
|
||||
agent.store_action(action)
|
||||
game.step()
|
||||
|
||||
assert test_user.password == "2Hard_2_Hack"
|
||||
|
||||
|
||||
def test_user_account_create_terminal_action(game_and_agent_fixture):
|
||||
"""Tests that agents can use the terminal to create new users."""
|
||||
game, agent = game_and_agent_fixture
|
||||
|
||||
router = game.simulation.network.get_node_by_hostname("router")
|
||||
router.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.SSH, dst_port=Port.SSH, position=4)
|
||||
|
||||
server_1 = game.simulation.network.get_node_by_hostname("server_1")
|
||||
server_1_usm = server_1.software_manager.software["UserManager"]
|
||||
server_1_usm.add_user("user123", "password", is_admin=True)
|
||||
|
||||
action = (
|
||||
"SSH_TO_REMOTE",
|
||||
{
|
||||
"node_id": 0,
|
||||
"username": "user123",
|
||||
"password": "password",
|
||||
"remote_ip": str(server_1.network_interface[1].ip_address),
|
||||
},
|
||||
)
|
||||
agent.store_action(action)
|
||||
game.step()
|
||||
assert agent.history[-1].response.status == "success"
|
||||
|
||||
# Create a new user account via terminal.
|
||||
action = (
|
||||
"NODE_SEND_REMOTE_COMMAND",
|
||||
{
|
||||
"node_id": 0,
|
||||
"remote_ip": str(server_1.network_interface[1].ip_address),
|
||||
"command": ["service", "UserManager", "add_user", "new_user", "new_pass", True],
|
||||
},
|
||||
)
|
||||
agent.store_action(action)
|
||||
game.step()
|
||||
new_user = server_1.user_manager.users.get("new_user")
|
||||
assert new_user
|
||||
assert new_user.password == "new_pass"
|
||||
assert new_user.disabled is not True
|
||||
|
||||
|
||||
def test_user_account_disable_terminal_action(game_and_agent_fixture):
|
||||
"""Tests that agents can use the terminal to disable users."""
|
||||
game, agent = game_and_agent_fixture
|
||||
router = game.simulation.network.get_node_by_hostname("router")
|
||||
router.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.SSH, dst_port=Port.SSH, position=4)
|
||||
|
||||
server_1 = game.simulation.network.get_node_by_hostname("server_1")
|
||||
server_1_usm = server_1.software_manager.software["UserManager"]
|
||||
server_1_usm.add_user("user123", "password", is_admin=True)
|
||||
|
||||
action = (
|
||||
"SSH_TO_REMOTE",
|
||||
{
|
||||
"node_id": 0,
|
||||
"username": "user123",
|
||||
"password": "password",
|
||||
"remote_ip": str(server_1.network_interface[1].ip_address),
|
||||
},
|
||||
)
|
||||
agent.store_action(action)
|
||||
game.step()
|
||||
assert agent.history[-1].response.status == "success"
|
||||
|
||||
# Disable a user via terminal
|
||||
action = (
|
||||
"NODE_SEND_REMOTE_COMMAND",
|
||||
{
|
||||
"node_id": 0,
|
||||
"remote_ip": str(server_1.network_interface[1].ip_address),
|
||||
"command": ["service", "UserManager", "disable_user", "user123"],
|
||||
},
|
||||
)
|
||||
agent.store_action(action)
|
||||
game.step()
|
||||
|
||||
new_user = server_1.user_manager.users.get("user123")
|
||||
assert new_user
|
||||
assert new_user.disabled is True
|
||||
Reference in New Issue
Block a user