#2735 - added docstrings to the User, UserManager, and UserSessionManager classes

This commit is contained in:
Chris McCarthy
2024-07-29 09:29:20 +01:00
parent d0c8aeae30
commit 2e35549c95

View File

@@ -6,7 +6,7 @@ import secrets
from abc import ABC, abstractmethod
from ipaddress import IPv4Address, IPv4Network
from pathlib import Path
from typing import Any, ClassVar, Dict, List, Optional, TypeVar, Union
from typing import Any, Dict, List, Optional, TypeVar, Union
from prettytable import MARKDOWN, PrettyTable
from pydantic import BaseModel, Field, validate_call
@@ -799,16 +799,23 @@ class User(SimComponent):
"""
Represents a user in the PrimAITE system.
:param username: The username of the user
:param password: The password of the user
:param disabled: Boolean flag indicating whether the user is disabled
:param is_admin: Boolean flag indicating whether the user has admin privileges
:ivar username: The username of the user
:ivar password: The password of the user
:ivar disabled: Boolean flag indicating whether the user is disabled
:ivar is_admin: Boolean flag indicating whether the user has admin privileges
"""
username: str
"""The username of the user"""
password: str
"""The password of the user"""
disabled: bool = False
"""Boolean flag indicating whether the user is disabled"""
is_admin: bool = False
"""Boolean flag indicating whether the user has admin privileges"""
def describe_state(self) -> Dict:
"""
@@ -971,47 +978,131 @@ class UserManager(Service):
class UserSession(SimComponent):
"""
Represents a user session on the Node.
This class manages the state of a user session, including the user, session start, last active step,
and end step. It also indicates whether the session is local.
:ivar user: The user associated with this session.
:ivar start_step: The timestep when the session was started.
:ivar last_active_step: The last timestep when the session was active.
:ivar end_step: The timestep when the session ended, if applicable.
:ivar local: Indicates if the session is local. Defaults to True.
"""
user: User
"""The user associated with this session."""
start_step: int
"""The timestep when the session was started."""
last_active_step: int
"""The last timestep when the session was active."""
end_step: Optional[int] = None
"""The timestep when the session ended, if applicable."""
local: bool = True
"""Indicates if the session is local. Defaults to True."""
@classmethod
def create(cls, user: User, timestep: int) -> UserSession:
"""
Creates a new instance of UserSession.
This class method initialises a user session with the given user and timestep.
:param user: The user associated with this session.
:param timestep: The timestep when the session is created.
:return: An instance of UserSession.
"""
return UserSession(user=user, start_step=timestep, last_active_step=timestep)
def describe_state(self) -> Dict:
"""
Describes the current state of the user session.
:return: A dictionary representing the state of the user session.
"""
return self.model_dump()
class RemoteUserSession(UserSession):
"""
Represents a remote user session on the Node.
This class extends the UserSession class to include additional attributes and methods specific to remote sessions.
:ivar remote_ip_address: The IP address of the remote user.
:ivar local: Indicates that this is not a local session. Always set to False.
"""
remote_ip_address: IPV4Address
"""The IP address of the remote user."""
local: bool = False
"""Indicates that this is not a local session. Always set to False."""
@classmethod
def create(cls, user: User, timestep: int, remote_ip_address: IPV4Address) -> RemoteUserSession: # noqa
"""
Creates a new instance of RemoteUserSession.
This class method initialises a remote user session with the given user, timestep, and remote IP address.
:param user: The user associated with this session.
:param timestep: The timestep when the session is created.
:param remote_ip_address: The IP address of the remote user.
:return: An instance of RemoteUserSession.
"""
return RemoteUserSession(
user=user, start_step=timestep, last_active_step=timestep, remote_ip_address=remote_ip_address
)
def describe_state(self) -> Dict:
"""
Describes the current state of the remote user session.
This method extends the base describe_state method to include the remote IP address.
:return: A dictionary representing the state of the remote user session.
"""
state = super().describe_state()
state["remote_ip_address"] = str(self.remote_ip_address)
return state
class UserSessionManager(Service):
"""
Manages user sessions on a Node, including local and remote sessions.
This class handles authentication, session management, and session timeouts for users interacting with the Node.
"""
node: Node
"""The node associated with this UserSessionManager."""
local_session: Optional[UserSession] = None
"""The current local user session, if any."""
remote_sessions: Dict[str, RemoteUserSession] = Field(default_factory=dict)
"""A dictionary of active remote user sessions."""
historic_sessions: List[UserSession] = Field(default_factory=list)
"""A list of historic user sessions."""
local_session_timeout_steps: int = 30
"""The number of steps before a local session times out due to inactivity."""
remote_session_timeout_steps: int = 5
"""The number of steps before a remote session times out due to inactivity."""
max_remote_sessions: int = 3
"""The maximum number of concurrent remote sessions allowed."""
current_timestep: int = 0
"""The current timestep in the simulation."""
def __init__(self, **kwargs):
"""
@@ -1027,7 +1118,13 @@ class UserSessionManager(Service):
self.start()
def show(self, markdown: bool = False, include_session_id: bool = False, include_historic: bool = False):
"""Prints a table of the user sessions on the Node."""
"""
Displays a table of the user sessions on the Node.
:param markdown: Whether to display the table in markdown format.
:param include_session_id: Whether to include session IDs in the table.
:param include_historic: Whether to include historic sessions in the table.
"""
headers = ["Session ID", "Username", "Type", "Remote IP", "Start Step", "Step Last Active", "End Step"]
if not include_session_id:
@@ -1041,6 +1138,14 @@ class UserSessionManager(Service):
table.title = f"{self.node.hostname} User Sessions"
def _add_session_to_table(user_session: UserSession):
"""
Adds a user session to the table for display.
This helper function determines whether the session is local or remote and formats the session data
accordingly. It then adds the session data to the table.
:param user_session: The user session to add to the table.
"""
session_type = "local"
remote_ip = ""
if isinstance(user_session, RemoteUserSession):
@@ -1072,12 +1177,22 @@ class UserSessionManager(Service):
print(table.get_string(sortby="Step Last Active", reversesort=True))
def describe_state(self) -> Dict:
"""
Describes the current state of the UserSessionManager.
:return: A dictionary representing the current state.
"""
state = super().describe_state()
state["active_remote_logins"] = len(self.remote_sessions)
return state
@property
def _user_manager(self) -> UserManager:
"""
Returns the UserManager instance.
:return: The UserManager instance.
"""
return self.software_manager.software["UserManager"] # noqa
def pre_timestep(self, timestep: int) -> None:
@@ -1088,6 +1203,11 @@ class UserSessionManager(Service):
self._timeout_session(self.local_session)
def _timeout_session(self, session: UserSession) -> None:
"""
Handles session timeout logic.
:param session: The session to be timed out.
"""
session.end_step = self.current_timestep
session_identity = session.user.username
if session.local:
@@ -1102,14 +1222,34 @@ class UserSessionManager(Service):
@property
def remote_session_limit_reached(self) -> bool:
"""
Checks if the maximum number of remote sessions has been reached.
:return: True if the limit is reached, otherwise False.
"""
return len(self.remote_sessions) >= self.max_remote_sessions
def validate_remote_session_uuid(self, remote_session_id: str) -> bool:
"""
Validates if a given remote session ID exists.
:param remote_session_id: The remote session ID to validate.
:return: True if the session ID exists, otherwise False.
"""
return remote_session_id in self.remote_sessions
def _login(
self, username: str, password: str, local: bool = True, remote_ip_address: Optional[IPv4Address] = None
self, username: str, password: str, local: bool = True, remote_ip_address: Optional[IPv4Address] = None
) -> Optional[str]:
"""
Logs a user in either locally or remotely.
:param username: The username of the account.
:param password: The password of the account.
:param local: Whether the login is local or remote.
:param remote_ip_address: The remote IP address for remote login.
:return: The session ID if login is successful, otherwise None.
"""
if not self._can_perform_action():
return None
@@ -1145,13 +1285,35 @@ class UserSessionManager(Service):
return session_id
def local_login(self, username: str, password: str) -> Optional[str]:
"""
Logs a user in locally.
:param username: The username of the account.
:param password: The password of the account.
:return: The session ID if login is successful, otherwise None.
"""
return self._login(username=username, password=password, local=True)
@validate_call()
def remote_login(self, username: str, password: str, remote_ip_address: IPV4Address) -> Optional[str]:
"""
Logs a user in remotely.
:param username: The username of the account.
:param password: The password of the account.
:param remote_ip_address: The remote IP address for the remote login.
:return: The session ID if login is successful, otherwise None.
"""
return self._login(username=username, password=password, local=False, remote_ip_address=remote_ip_address)
def _logout(self, local: bool = True, remote_session_id: Optional[str] = None):
def _logout(self, local: bool = True, remote_session_id: Optional[str] = None) -> bool:
"""
Logs a user out either locally or remotely.
:param local: Whether the logout is local or remote.
:param remote_session_id: The remote session ID for remote logout.
:return: True if logout successful, otherwise False.
"""
if not self._can_perform_action():
return False
session = None
@@ -1165,16 +1327,33 @@ class UserSessionManager(Service):
if session:
self.historic_sessions.append(session)
self.sys_log.info(f"{self.name}: User {session.user.username} logged out")
return
return True
return False
def local_logout(self):
self._logout(local=True)
def local_logout(self) -> bool:
"""
Logs out the current local user.
def remote_logout(self, remote_session_id: str):
self._logout(local=False, remote_session_id=remote_session_id)
:return: True if logout successful, otherwise False.
"""
return self._logout(local=True)
def remote_logout(self, remote_session_id: str) -> bool:
"""
Logs out a remote user by session ID.
:param remote_session_id: The remote session ID.
:return: True if logout successful, otherwise False.
"""
return self._logout(local=False, remote_session_id=remote_session_id)
@property
def local_user_logged_in(self):
def local_user_logged_in(self) -> bool:
"""
Checks if a local user is currently logged in.
:return: True if a local user is logged in, otherwise False.
"""
return self.local_session is not None
@@ -1249,7 +1428,7 @@ class Node(SimComponent):
"""
Initialize the Node with various components and managers.
This method initializes the ARP cache, ICMP handler, session manager, and software manager if they are not
This method initialises the ARP cache, ICMP handler, session manager, and software manager if they are not
provided.
"""
if not kwargs.get("sys_log"):
@@ -1278,17 +1457,34 @@ class Node(SimComponent):
@property
def user_manager(self) -> UserManager:
"""The Nodes User Manager."""
return self.software_manager.software["UserManager"] # noqa
@property
def user_session_manager(self) -> UserSessionManager:
"""The Nodes User Session Manager."""
return self.software_manager.software["UserSessionManager"] # noqa
def local_login(self, username: str, password: str) -> Optional[str]:
"""
Attempt to log in to the node uas a local user.
This method attempts to authenticate a local user with the given username and password. If successful, it
returns a session token. If authentication fails, it returns None.
:param username: The username of the account attempting to log in.
:param password: The password of the account attempting to log in.
:return: A session token if the login is successful, otherwise None.
"""
return self.user_session_manager.local_login(username, password)
def logout(self):
return self.user_session_manager.logout()
def local_logout(self) -> None:
"""
Log out the current local user from the node.
This method ends the current local user's session and invalidates the session token.
"""
return self.user_session_manager.local_logout()
def ip_is_network_interface(self, ip_address: IPv4Address, enabled_only: bool = False) -> bool:
"""