#2735 - Initial work done around User, UserManager, and UserSessionManager

This commit is contained in:
Chris McCarthy
2024-07-08 15:10:06 +01:00
parent 20e5e40d0d
commit 47a1daa580
7 changed files with 308 additions and 12 deletions

View File

@@ -6,7 +6,7 @@ import secrets
from abc import ABC, abstractmethod
from ipaddress import IPv4Address, IPv4Network
from pathlib import Path
from typing import Any, Dict, Optional, TypeVar, Union
from typing import Any, ClassVar, Dict, Optional, TypeVar, Union
from prettytable import MARKDOWN, PrettyTable
from pydantic import BaseModel, Field
@@ -37,6 +37,8 @@ from primaite.simulator.system.core.session_manager import SessionManager
from primaite.simulator.system.core.software_manager import SoftwareManager
from primaite.simulator.system.core.sys_log import SysLog
from primaite.simulator.system.processes.process import Process
from primaite.simulator.system.services.access.user_manager import UserManager
from primaite.simulator.system.services.access.user_session_manager import UserSessionManager
from primaite.simulator.system.services.service import Service
from primaite.simulator.system.software import IOSoftware
from primaite.utils.converters import convert_dict_enum_keys_to_enum_values
@@ -821,7 +823,16 @@ class Node(SimComponent):
super().__init__(**kwargs)
self.session_manager.node = self
self.session_manager.software_manager = self.software_manager
self._install_system_software()
self.software_manager.install(UserSessionManager)
self.software_manager.install(UserManager)
# @property
# def user_manager(self) -> UserManager:
# return self.software_manager.software["UserManager"] # noqa
#
# @property
# def _user_session_manager(self) -> UserSessionManager:
# return self.software_manager.software["UserSessionManager"] # noqa
def ip_is_network_interface(self, ip_address: IPv4Address, enabled_only: bool = False) -> bool:
"""
@@ -876,7 +887,7 @@ class Node(SimComponent):
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return f"Cannot perform request on node '{self.node.hostname}' because it is not turned on."
return f"Cannot perform request on node '{self.node.hostname}' because it is not powered on."
def _init_request_manager(self) -> RequestManager:
"""
@@ -1000,10 +1011,6 @@ class Node(SimComponent):
return rm
def _install_system_software(self):
"""Install System Software - software that is usually provided with the OS."""
pass
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
@@ -1184,6 +1191,7 @@ class Node(SimComponent):
def pre_timestep(self, timestep: int) -> None:
"""Apply pre-timestep logic."""
super().pre_timestep(timestep)
self._
for network_interface in self.network_interfaces.values():
network_interface.pre_timestep(timestep=timestep)

View File

@@ -11,6 +11,8 @@ from primaite.simulator.network.transmission.data_link_layer import Frame
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.nmap import NMAP
from primaite.simulator.system.applications.web_browser import WebBrowser
from primaite.simulator.system.services.access.user_manager import UserManager
from primaite.simulator.system.services.access.user_session_manager import UserSessionManager
from primaite.simulator.system.services.arp.arp import ARP, ARPPacket
from primaite.simulator.system.services.dns.dns_client import DNSClient
from primaite.simulator.system.services.icmp.icmp import ICMP
@@ -306,6 +308,8 @@ class HostNode(Node):
"NTPClient": NTPClient,
"WebBrowser": WebBrowser,
"NMAP": NMAP,
# "UserSessionManager": UserSessionManager,
# "UserManager": UserManager,
}
"""List of system software that is automatically installed on nodes."""
@@ -314,9 +318,10 @@ class HostNode(Node):
network_interface: Dict[int, NIC] = {}
"The NICs on the node by port id."
def __init__(self, ip_address: IPV4Address, subnet_mask: IPV4Address, **kwargs):
def __init__(self, ip_address: IPV4Address, subnet_mask: IPV4Address, username: str, password: str, **kwargs):
super().__init__(**kwargs)
self.connect_nic(NIC(ip_address=ip_address, subnet_mask=subnet_mask))
self.user_manager.add_user(username=username, password=password, is_admin=True, bypass_can_perform_action=True)
@property
def nmap(self) -> Optional[NMAP]:
@@ -348,8 +353,6 @@ class HostNode(Node):
for _, software_class in self.SYSTEM_SOFTWARE.items():
self.software_manager.install(software_class)
super()._install_system_software()
def default_gateway_hello(self):
"""
Sends a hello message to the default gateway to establish connectivity and resolve the gateway's MAC address.

View File

@@ -0,0 +1 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK

View File

@@ -0,0 +1,186 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from typing import Dict, Optional
from prettytable import MARKDOWN, PrettyTable
from pydantic import Field
from primaite.simulator.core import SimComponent
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.services.service import Service
class User(SimComponent):
"""
Represents a user in the PrimAITE system.
:param username: The username of the user
:param password: The password of the user
:param disabled: Boolean flag indicating whether the user is disabled
:param is_admin: Boolean flag indicating whether the user has admin privileges
"""
username: str
password: str
disabled: bool = False
is_admin: bool = False
def describe_state(self) -> Dict:
"""
Returns a dictionary representing the current state of the user.
:return: A dict containing the state of the user
"""
return self.model_dump()
class UserManager(Service):
"""
Manages users within the PrimAITE system, handling creation, authentication, and administration.
:param users: A dictionary of all users by their usernames
:param admins: A dictionary of admin users by their usernames
:param disabled_admins: A dictionary of currently disabled admin users by their usernames
"""
users: Dict[str, User] = Field(default_factory=dict)
admins: Dict[str, User] = Field(default_factory=dict)
disabled_admins: Dict[str, User] = Field(default_factory=dict)
def __init__(self, **kwargs):
"""
Initializes a UserManager instanc.
:param username: The username for the default admin user
:param password: The password for the default admin user
"""
kwargs["name"] = "UserManager"
kwargs["port"] = Port.NONE
kwargs["protocol"] = IPProtocol.NONE
super().__init__(**kwargs)
self.start()
def describe_state(self) -> Dict:
"""
Returns the state of the UserManager along with the number of users and admins.
:return: A dict containing detailed state information
"""
state = super().describe_state()
state.update({"total_users": len(self.users), "total_admins": len(self.admins) + len(self.disabled_admins)})
return state
def show(self, markdown: bool = False):
"""
Display the Users.
:param markdown: Whether to display the table in Markdown format or not. Default is `False`.
"""
table = PrettyTable(["Username", "Admin", "Enabled"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.sys_log.hostname} User Manager)"
for user in self.users.values():
table.add_row([user.username, user.is_admin, user.disabled])
print(table.get_string(sortby="Username"))
def _is_last_admin(self, username: str) -> bool:
return username in self.admins and len(self.admins) == 1
def add_user(
self, username: str, password: str, is_admin: bool = False, bypass_can_perform_action: bool = False
) -> bool:
"""
Adds a new user to the system.
:param username: The username for the new user
:param password: The password for the new user
:param is_admin: Flag indicating if the new user is an admin
:return: True if user was successfully added, False otherwise
"""
if not bypass_can_perform_action and not self._can_perform_action():
return False
if username in self.users:
return False
user = User(username=username, password=password, is_admin=is_admin)
self.users[username] = user
if is_admin:
self.admins[username] = user
self.sys_log.info(f"{self.name}: Added new {'admin' if is_admin else 'user'}: {username}")
return True
def authenticate_user(self, username: str, password: str) -> Optional[User]:
"""
Authenticates a user's login attempt.
:param username: The username of the user trying to log in
:param password: The password provided by the user
:return: The User object if authentication is successful, None otherwise
"""
if not self._can_perform_action():
return None
user = self.users.get(username)
if user and not user.disabled and user.password == password:
self.sys_log.info(f"{self.name}: User authenticated: {username}")
return user
self.sys_log.info(f"{self.name}: Authentication failed for: {username}")
return None
def change_user_password(self, username: str, current_password: str, new_password: str) -> bool:
"""
Changes a user's password.
:param username: The username of the user changing their password
:param current_password: The current password of the user
:param new_password: The new password for the user
:return: True if the password was changed successfully, False otherwise
"""
if not self._can_perform_action():
return False
user = self.users.get(username)
if user and user.password == current_password:
user.password = new_password
self.sys_log.info(f"{self.name}: Password changed for {username}")
return True
self.sys_log.info(f"{self.name}: Password change failed for {username}")
return False
def disable_user(self, username: str) -> bool:
"""
Disables a user account, preventing them from logging in.
:param username: The username of the user to disable
:return: True if the user was disabled successfully, False otherwise
"""
if not self._can_perform_action():
return False
if username in self.users and not self.users[username].disabled:
if self._is_last_admin(username):
self.sys_log.info(f"{self.name}: Cannot disable User {username} as they are the only enabled admin")
return False
self.users[username].disabled = True
self.sys_log.info(f"{self.name}: User disabled: {username}")
if username in self.admins:
self.disabled_admins[username] = self.admins.pop(username)
return True
self.sys_log.info(f"{self.name}: Failed to disable user: {username}")
return False
def enable_user(self, username: str) -> bool:
"""
Enables a previously disabled user account.
:param username: The username of the user to enable
:return: True if the user was enabled successfully, False otherwise
"""
if not self._can_perform_action():
return False
if username in self.users and self.users[username].disabled:
self.users[username].disabled = False
self.sys_log.info(f"{self.name}: User enabled: {username}")
if username in self.disabled_admins:
self.admins[username] = self.disabled_admins.pop(username)
return True
self.sys_log.info(f"{self.name}: Failed to enable user: {username}")
return False

View File

@@ -0,0 +1,98 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from __future__ import annotations
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from uuid import uuid4
from pydantic import BaseModel, Field
from primaite.simulator.core import SimComponent
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.services.access.user_manager import User, UserManager
from primaite.simulator.system.services.service import Service
from primaite.utils.validators import IPV4Address
class UserSession(SimComponent):
user: User
start_step: int
last_active_step: int
end_step: Optional[int] = None
local: bool = True
@classmethod
def create(cls, user: User, timestep: int) -> UserSession:
return UserSession(user=user, start_step=timestep, last_active_step=timestep)
def describe_state(self) -> Dict:
return self.model_dump()
class RemoteUserSession(UserSession):
remote_ip_address: IPV4Address
local: bool = False
def describe_state(self) -> Dict:
state = super().describe_state()
state["remote_ip_address"] = str(self.remote_ip_address)
return state
class UserSessionManager(BaseModel):
node:
local_session: Optional[UserSession] = None
remote_sessions: Dict[str, RemoteUserSession] = Field(default_factory=dict)
historic_sessions: List[UserSession] = Field(default_factory=list)
local_session_timeout_steps: int = 30
remote_session_timeout_steps: int = 5
max_remote_sessions: int = 3
current_timestep: int = 0
@property
def _user_manager(self) -> UserManager:
return self.software_manager.software["UserManager"] # noqa
def pre_timestep(self, timestep: int) -> None:
"""Apply any pre-timestep logic that helps make sure we have the correct observations."""
self.current_timestep = timestep
if self.local_session:
if self.local_session.last_active_step + self.local_session_timeout_steps <= timestep:
self._timeout_session(self.local_session)
def _timeout_session(self, session: UserSession) -> None:
session.end_step = self.current_timestep
session_identity = session.user.username
if session.local:
self.local_session = None
session_type = "Local"
else:
self.remote_sessions.pop(session.uuid)
session_type = "Remote"
session_identity = f"{session_identity} {session.remote_ip_address}"
self.sys_log.info(f"{self.name}: {session_type} {session_identity} session timeout due to inactivity")
def login(self, username: str, password: str) -> Optional[str]:
if not self._can_perform_action():
return None
user = self._user_manager.authenticate_user(username=username, password=password)
if user:
self.logout()
self.local_session = UserSession.create(user=user, timestep=self.current_timestep)
self.sys_log.info(f"{self.name}: User {user.username} logged in")
return self.local_session.uuid
else:
self.sys_log.info(f"{self.name}: Incorrect username or password")
def logout(self):
if not self._can_perform_action():
return False
if self.local_session:
session = self.local_session
session.end_step = self.current_timestep
self.historic_sessions.append(session)
self.local_session = None
self.sys_log.info(f"{self.name}: User {session.user.username} logged out")

View File

@@ -43,7 +43,7 @@ class Service(IOSoftware):
restart_countdown: Optional[int] = None
"If currently restarting, how many timesteps remain until the restart is finished."
def __init__(self, **kwargs):
def __init__(self, **kwargs):c
super().__init__(**kwargs)
def _can_perform_action(self) -> bool:

View File

@@ -291,7 +291,7 @@ class IOSoftware(Software):
"""
if self.software_manager and self.software_manager.node.operating_state != NodeOperatingState.ON:
self.software_manager.node.sys_log.error(
f"{self.name} Error: {self.software_manager.node.hostname} is not online."
f"{self.name} Error: {self.software_manager.node.hostname} is not powered on."
)
return False
return True