diff --git a/src/primaite/simulator/network/hardware/base.py b/src/primaite/simulator/network/hardware/base.py index 3ffc7b35..e33c6014 100644 --- a/src/primaite/simulator/network/hardware/base.py +++ b/src/primaite/simulator/network/hardware/base.py @@ -6,7 +6,7 @@ import secrets from abc import ABC, abstractmethod from ipaddress import IPv4Address, IPv4Network from pathlib import Path -from typing import Any, ClassVar, Dict, List, Optional, TypeVar, Union +from typing import Any, Dict, List, Optional, TypeVar, Union from prettytable import MARKDOWN, PrettyTable from pydantic import BaseModel, Field, validate_call @@ -799,16 +799,23 @@ class User(SimComponent): """ Represents a user in the PrimAITE system. - :param username: The username of the user - :param password: The password of the user - :param disabled: Boolean flag indicating whether the user is disabled - :param is_admin: Boolean flag indicating whether the user has admin privileges + :ivar username: The username of the user + :ivar password: The password of the user + :ivar disabled: Boolean flag indicating whether the user is disabled + :ivar is_admin: Boolean flag indicating whether the user has admin privileges """ username: str + """The username of the user""" + password: str + """The password of the user""" + disabled: bool = False + """Boolean flag indicating whether the user is disabled""" + is_admin: bool = False + """Boolean flag indicating whether the user has admin privileges""" def describe_state(self) -> Dict: """ @@ -971,47 +978,131 @@ class UserManager(Service): class UserSession(SimComponent): + """ + Represents a user session on the Node. + + This class manages the state of a user session, including the user, session start, last active step, + and end step. It also indicates whether the session is local. + + :ivar user: The user associated with this session. + :ivar start_step: The timestep when the session was started. + :ivar last_active_step: The last timestep when the session was active. + :ivar end_step: The timestep when the session ended, if applicable. + :ivar local: Indicates if the session is local. Defaults to True. + """ + user: User + """The user associated with this session.""" + start_step: int + """The timestep when the session was started.""" + last_active_step: int + """The last timestep when the session was active.""" + end_step: Optional[int] = None + """The timestep when the session ended, if applicable.""" + local: bool = True + """Indicates if the session is local. Defaults to True.""" @classmethod def create(cls, user: User, timestep: int) -> UserSession: + """ + Creates a new instance of UserSession. + + This class method initialises a user session with the given user and timestep. + + :param user: The user associated with this session. + :param timestep: The timestep when the session is created. + :return: An instance of UserSession. + """ return UserSession(user=user, start_step=timestep, last_active_step=timestep) def describe_state(self) -> Dict: + """ + Describes the current state of the user session. + + :return: A dictionary representing the state of the user session. + """ return self.model_dump() class RemoteUserSession(UserSession): + """ + Represents a remote user session on the Node. + + This class extends the UserSession class to include additional attributes and methods specific to remote sessions. + + :ivar remote_ip_address: The IP address of the remote user. + :ivar local: Indicates that this is not a local session. Always set to False. + """ + remote_ip_address: IPV4Address + """The IP address of the remote user.""" + local: bool = False + """Indicates that this is not a local session. Always set to False.""" @classmethod def create(cls, user: User, timestep: int, remote_ip_address: IPV4Address) -> RemoteUserSession: # noqa + """ + Creates a new instance of RemoteUserSession. + + This class method initialises a remote user session with the given user, timestep, and remote IP address. + + :param user: The user associated with this session. + :param timestep: The timestep when the session is created. + :param remote_ip_address: The IP address of the remote user. + :return: An instance of RemoteUserSession. + """ return RemoteUserSession( user=user, start_step=timestep, last_active_step=timestep, remote_ip_address=remote_ip_address ) def describe_state(self) -> Dict: + """ + Describes the current state of the remote user session. + + This method extends the base describe_state method to include the remote IP address. + + :return: A dictionary representing the state of the remote user session. + """ state = super().describe_state() state["remote_ip_address"] = str(self.remote_ip_address) return state class UserSessionManager(Service): + """ + Manages user sessions on a Node, including local and remote sessions. + + This class handles authentication, session management, and session timeouts for users interacting with the Node. + """ + node: Node + """The node associated with this UserSessionManager.""" + local_session: Optional[UserSession] = None + """The current local user session, if any.""" + remote_sessions: Dict[str, RemoteUserSession] = Field(default_factory=dict) + """A dictionary of active remote user sessions.""" + historic_sessions: List[UserSession] = Field(default_factory=list) + """A list of historic user sessions.""" local_session_timeout_steps: int = 30 + """The number of steps before a local session times out due to inactivity.""" + remote_session_timeout_steps: int = 5 + """The number of steps before a remote session times out due to inactivity.""" + max_remote_sessions: int = 3 + """The maximum number of concurrent remote sessions allowed.""" current_timestep: int = 0 + """The current timestep in the simulation.""" def __init__(self, **kwargs): """ @@ -1027,7 +1118,13 @@ class UserSessionManager(Service): self.start() def show(self, markdown: bool = False, include_session_id: bool = False, include_historic: bool = False): - """Prints a table of the user sessions on the Node.""" + """ + Displays a table of the user sessions on the Node. + + :param markdown: Whether to display the table in markdown format. + :param include_session_id: Whether to include session IDs in the table. + :param include_historic: Whether to include historic sessions in the table. + """ headers = ["Session ID", "Username", "Type", "Remote IP", "Start Step", "Step Last Active", "End Step"] if not include_session_id: @@ -1041,6 +1138,14 @@ class UserSessionManager(Service): table.title = f"{self.node.hostname} User Sessions" def _add_session_to_table(user_session: UserSession): + """ + Adds a user session to the table for display. + + This helper function determines whether the session is local or remote and formats the session data + accordingly. It then adds the session data to the table. + + :param user_session: The user session to add to the table. + """ session_type = "local" remote_ip = "" if isinstance(user_session, RemoteUserSession): @@ -1072,12 +1177,22 @@ class UserSessionManager(Service): print(table.get_string(sortby="Step Last Active", reversesort=True)) def describe_state(self) -> Dict: + """ + Describes the current state of the UserSessionManager. + + :return: A dictionary representing the current state. + """ state = super().describe_state() state["active_remote_logins"] = len(self.remote_sessions) return state @property def _user_manager(self) -> UserManager: + """ + Returns the UserManager instance. + + :return: The UserManager instance. + """ return self.software_manager.software["UserManager"] # noqa def pre_timestep(self, timestep: int) -> None: @@ -1088,6 +1203,11 @@ class UserSessionManager(Service): self._timeout_session(self.local_session) def _timeout_session(self, session: UserSession) -> None: + """ + Handles session timeout logic. + + :param session: The session to be timed out. + """ session.end_step = self.current_timestep session_identity = session.user.username if session.local: @@ -1102,14 +1222,34 @@ class UserSessionManager(Service): @property def remote_session_limit_reached(self) -> bool: + """ + Checks if the maximum number of remote sessions has been reached. + + :return: True if the limit is reached, otherwise False. + """ return len(self.remote_sessions) >= self.max_remote_sessions def validate_remote_session_uuid(self, remote_session_id: str) -> bool: + """ + Validates if a given remote session ID exists. + + :param remote_session_id: The remote session ID to validate. + :return: True if the session ID exists, otherwise False. + """ return remote_session_id in self.remote_sessions def _login( - self, username: str, password: str, local: bool = True, remote_ip_address: Optional[IPv4Address] = None + self, username: str, password: str, local: bool = True, remote_ip_address: Optional[IPv4Address] = None ) -> Optional[str]: + """ + Logs a user in either locally or remotely. + + :param username: The username of the account. + :param password: The password of the account. + :param local: Whether the login is local or remote. + :param remote_ip_address: The remote IP address for remote login. + :return: The session ID if login is successful, otherwise None. + """ if not self._can_perform_action(): return None @@ -1145,13 +1285,35 @@ class UserSessionManager(Service): return session_id def local_login(self, username: str, password: str) -> Optional[str]: + """ + Logs a user in locally. + + :param username: The username of the account. + :param password: The password of the account. + :return: The session ID if login is successful, otherwise None. + """ return self._login(username=username, password=password, local=True) @validate_call() def remote_login(self, username: str, password: str, remote_ip_address: IPV4Address) -> Optional[str]: + """ + Logs a user in remotely. + + :param username: The username of the account. + :param password: The password of the account. + :param remote_ip_address: The remote IP address for the remote login. + :return: The session ID if login is successful, otherwise None. + """ return self._login(username=username, password=password, local=False, remote_ip_address=remote_ip_address) - def _logout(self, local: bool = True, remote_session_id: Optional[str] = None): + def _logout(self, local: bool = True, remote_session_id: Optional[str] = None) -> bool: + """ + Logs a user out either locally or remotely. + + :param local: Whether the logout is local or remote. + :param remote_session_id: The remote session ID for remote logout. + :return: True if logout successful, otherwise False. + """ if not self._can_perform_action(): return False session = None @@ -1165,16 +1327,33 @@ class UserSessionManager(Service): if session: self.historic_sessions.append(session) self.sys_log.info(f"{self.name}: User {session.user.username} logged out") - return + return True + return False - def local_logout(self): - self._logout(local=True) + def local_logout(self) -> bool: + """ + Logs out the current local user. - def remote_logout(self, remote_session_id: str): - self._logout(local=False, remote_session_id=remote_session_id) + :return: True if logout successful, otherwise False. + """ + return self._logout(local=True) + + def remote_logout(self, remote_session_id: str) -> bool: + """ + Logs out a remote user by session ID. + + :param remote_session_id: The remote session ID. + :return: True if logout successful, otherwise False. + """ + return self._logout(local=False, remote_session_id=remote_session_id) @property - def local_user_logged_in(self): + def local_user_logged_in(self) -> bool: + """ + Checks if a local user is currently logged in. + + :return: True if a local user is logged in, otherwise False. + """ return self.local_session is not None @@ -1249,7 +1428,7 @@ class Node(SimComponent): """ Initialize the Node with various components and managers. - This method initializes the ARP cache, ICMP handler, session manager, and software manager if they are not + This method initialises the ARP cache, ICMP handler, session manager, and software manager if they are not provided. """ if not kwargs.get("sys_log"): @@ -1278,17 +1457,34 @@ class Node(SimComponent): @property def user_manager(self) -> UserManager: + """The Nodes User Manager.""" return self.software_manager.software["UserManager"] # noqa @property def user_session_manager(self) -> UserSessionManager: + """The Nodes User Session Manager.""" return self.software_manager.software["UserSessionManager"] # noqa def local_login(self, username: str, password: str) -> Optional[str]: + """ + Attempt to log in to the node uas a local user. + + This method attempts to authenticate a local user with the given username and password. If successful, it + returns a session token. If authentication fails, it returns None. + + :param username: The username of the account attempting to log in. + :param password: The password of the account attempting to log in. + :return: A session token if the login is successful, otherwise None. + """ return self.user_session_manager.local_login(username, password) - def logout(self): - return self.user_session_manager.logout() + def local_logout(self) -> None: + """ + Log out the current local user from the node. + + This method ends the current local user's session and invalidates the session token. + """ + return self.user_session_manager.local_logout() def ip_is_network_interface(self, ip_address: IPv4Address, enabled_only: bool = False) -> bool: """