diff --git a/src/primaite/simulator/network/hardware/base.py b/src/primaite/simulator/network/hardware/base.py index 6942d280..ada9c57a 100644 --- a/src/primaite/simulator/network/hardware/base.py +++ b/src/primaite/simulator/network/hardware/base.py @@ -6,7 +6,7 @@ import secrets from abc import ABC, abstractmethod from ipaddress import IPv4Address, IPv4Network from pathlib import Path -from typing import Any, Dict, Optional, TypeVar, Union +from typing import Any, ClassVar, Dict, Optional, TypeVar, Union from prettytable import MARKDOWN, PrettyTable from pydantic import BaseModel, Field @@ -37,6 +37,8 @@ from primaite.simulator.system.core.session_manager import SessionManager from primaite.simulator.system.core.software_manager import SoftwareManager from primaite.simulator.system.core.sys_log import SysLog from primaite.simulator.system.processes.process import Process +from primaite.simulator.system.services.access.user_manager import UserManager +from primaite.simulator.system.services.access.user_session_manager import UserSessionManager from primaite.simulator.system.services.service import Service from primaite.simulator.system.software import IOSoftware from primaite.utils.converters import convert_dict_enum_keys_to_enum_values @@ -821,7 +823,16 @@ class Node(SimComponent): super().__init__(**kwargs) self.session_manager.node = self self.session_manager.software_manager = self.software_manager - self._install_system_software() + self.software_manager.install(UserSessionManager) + self.software_manager.install(UserManager) + + # @property + # def user_manager(self) -> UserManager: + # return self.software_manager.software["UserManager"] # noqa + # + # @property + # def _user_session_manager(self) -> UserSessionManager: + # return self.software_manager.software["UserSessionManager"] # noqa def ip_is_network_interface(self, ip_address: IPv4Address, enabled_only: bool = False) -> bool: """ @@ -876,7 +887,7 @@ class Node(SimComponent): @property def fail_message(self) -> str: """Message that is reported when a request is rejected by this validator.""" - return f"Cannot perform request on node '{self.node.hostname}' because it is not turned on." + return f"Cannot perform request on node '{self.node.hostname}' because it is not powered on." def _init_request_manager(self) -> RequestManager: """ @@ -1000,10 +1011,6 @@ class Node(SimComponent): return rm - def _install_system_software(self): - """Install System Software - software that is usually provided with the OS.""" - pass - def describe_state(self) -> Dict: """ Produce a dictionary describing the current state of this object. @@ -1184,6 +1191,7 @@ class Node(SimComponent): def pre_timestep(self, timestep: int) -> None: """Apply pre-timestep logic.""" super().pre_timestep(timestep) + self._ for network_interface in self.network_interfaces.values(): network_interface.pre_timestep(timestep=timestep) diff --git a/src/primaite/simulator/network/hardware/nodes/host/host_node.py b/src/primaite/simulator/network/hardware/nodes/host/host_node.py index fdb28339..80f80a04 100644 --- a/src/primaite/simulator/network/hardware/nodes/host/host_node.py +++ b/src/primaite/simulator/network/hardware/nodes/host/host_node.py @@ -11,6 +11,8 @@ from primaite.simulator.network.transmission.data_link_layer import Frame from primaite.simulator.system.applications.application import ApplicationOperatingState from primaite.simulator.system.applications.nmap import NMAP from primaite.simulator.system.applications.web_browser import WebBrowser +from primaite.simulator.system.services.access.user_manager import UserManager +from primaite.simulator.system.services.access.user_session_manager import UserSessionManager from primaite.simulator.system.services.arp.arp import ARP, ARPPacket from primaite.simulator.system.services.dns.dns_client import DNSClient from primaite.simulator.system.services.icmp.icmp import ICMP @@ -306,6 +308,8 @@ class HostNode(Node): "NTPClient": NTPClient, "WebBrowser": WebBrowser, "NMAP": NMAP, + # "UserSessionManager": UserSessionManager, + # "UserManager": UserManager, } """List of system software that is automatically installed on nodes.""" @@ -314,9 +318,10 @@ class HostNode(Node): network_interface: Dict[int, NIC] = {} "The NICs on the node by port id." - def __init__(self, ip_address: IPV4Address, subnet_mask: IPV4Address, **kwargs): + def __init__(self, ip_address: IPV4Address, subnet_mask: IPV4Address, username: str, password: str, **kwargs): super().__init__(**kwargs) self.connect_nic(NIC(ip_address=ip_address, subnet_mask=subnet_mask)) + self.user_manager.add_user(username=username, password=password, is_admin=True, bypass_can_perform_action=True) @property def nmap(self) -> Optional[NMAP]: @@ -348,8 +353,6 @@ class HostNode(Node): for _, software_class in self.SYSTEM_SOFTWARE.items(): self.software_manager.install(software_class) - super()._install_system_software() - def default_gateway_hello(self): """ Sends a hello message to the default gateway to establish connectivity and resolve the gateway's MAC address. diff --git a/src/primaite/simulator/system/services/access/__init__.py b/src/primaite/simulator/system/services/access/__init__.py new file mode 100644 index 00000000..be6c00e7 --- /dev/null +++ b/src/primaite/simulator/system/services/access/__init__.py @@ -0,0 +1 @@ +# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK diff --git a/src/primaite/simulator/system/services/access/user_manager.py b/src/primaite/simulator/system/services/access/user_manager.py new file mode 100644 index 00000000..09f8950e --- /dev/null +++ b/src/primaite/simulator/system/services/access/user_manager.py @@ -0,0 +1,186 @@ +# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK +from typing import Dict, Optional + +from prettytable import MARKDOWN, PrettyTable +from pydantic import Field + +from primaite.simulator.core import SimComponent +from primaite.simulator.network.transmission.network_layer import IPProtocol +from primaite.simulator.network.transmission.transport_layer import Port +from primaite.simulator.system.services.service import Service + + +class User(SimComponent): + """ + Represents a user in the PrimAITE system. + + :param username: The username of the user + :param password: The password of the user + :param disabled: Boolean flag indicating whether the user is disabled + :param is_admin: Boolean flag indicating whether the user has admin privileges + """ + + username: str + password: str + disabled: bool = False + is_admin: bool = False + + def describe_state(self) -> Dict: + """ + Returns a dictionary representing the current state of the user. + + :return: A dict containing the state of the user + """ + return self.model_dump() + + +class UserManager(Service): + """ + Manages users within the PrimAITE system, handling creation, authentication, and administration. + + :param users: A dictionary of all users by their usernames + :param admins: A dictionary of admin users by their usernames + :param disabled_admins: A dictionary of currently disabled admin users by their usernames + """ + + users: Dict[str, User] = Field(default_factory=dict) + admins: Dict[str, User] = Field(default_factory=dict) + disabled_admins: Dict[str, User] = Field(default_factory=dict) + + def __init__(self, **kwargs): + """ + Initializes a UserManager instanc. + + :param username: The username for the default admin user + :param password: The password for the default admin user + """ + kwargs["name"] = "UserManager" + kwargs["port"] = Port.NONE + kwargs["protocol"] = IPProtocol.NONE + super().__init__(**kwargs) + self.start() + + def describe_state(self) -> Dict: + """ + Returns the state of the UserManager along with the number of users and admins. + + :return: A dict containing detailed state information + """ + state = super().describe_state() + state.update({"total_users": len(self.users), "total_admins": len(self.admins) + len(self.disabled_admins)}) + return state + + def show(self, markdown: bool = False): + """ + Display the Users. + + :param markdown: Whether to display the table in Markdown format or not. Default is `False`. + """ + table = PrettyTable(["Username", "Admin", "Enabled"]) + if markdown: + table.set_style(MARKDOWN) + table.align = "l" + table.title = f"{self.sys_log.hostname} User Manager)" + for user in self.users.values(): + table.add_row([user.username, user.is_admin, user.disabled]) + print(table.get_string(sortby="Username")) + + def _is_last_admin(self, username: str) -> bool: + return username in self.admins and len(self.admins) == 1 + + def add_user( + self, username: str, password: str, is_admin: bool = False, bypass_can_perform_action: bool = False + ) -> bool: + """ + Adds a new user to the system. + + :param username: The username for the new user + :param password: The password for the new user + :param is_admin: Flag indicating if the new user is an admin + :return: True if user was successfully added, False otherwise + """ + if not bypass_can_perform_action and not self._can_perform_action(): + return False + if username in self.users: + return False + user = User(username=username, password=password, is_admin=is_admin) + self.users[username] = user + if is_admin: + self.admins[username] = user + self.sys_log.info(f"{self.name}: Added new {'admin' if is_admin else 'user'}: {username}") + return True + + def authenticate_user(self, username: str, password: str) -> Optional[User]: + """ + Authenticates a user's login attempt. + + :param username: The username of the user trying to log in + :param password: The password provided by the user + :return: The User object if authentication is successful, None otherwise + """ + if not self._can_perform_action(): + return None + user = self.users.get(username) + if user and not user.disabled and user.password == password: + self.sys_log.info(f"{self.name}: User authenticated: {username}") + return user + self.sys_log.info(f"{self.name}: Authentication failed for: {username}") + return None + + def change_user_password(self, username: str, current_password: str, new_password: str) -> bool: + """ + Changes a user's password. + + :param username: The username of the user changing their password + :param current_password: The current password of the user + :param new_password: The new password for the user + :return: True if the password was changed successfully, False otherwise + """ + if not self._can_perform_action(): + return False + user = self.users.get(username) + if user and user.password == current_password: + user.password = new_password + self.sys_log.info(f"{self.name}: Password changed for {username}") + return True + self.sys_log.info(f"{self.name}: Password change failed for {username}") + return False + + def disable_user(self, username: str) -> bool: + """ + Disables a user account, preventing them from logging in. + + :param username: The username of the user to disable + :return: True if the user was disabled successfully, False otherwise + """ + if not self._can_perform_action(): + return False + if username in self.users and not self.users[username].disabled: + if self._is_last_admin(username): + self.sys_log.info(f"{self.name}: Cannot disable User {username} as they are the only enabled admin") + return False + self.users[username].disabled = True + self.sys_log.info(f"{self.name}: User disabled: {username}") + if username in self.admins: + self.disabled_admins[username] = self.admins.pop(username) + return True + self.sys_log.info(f"{self.name}: Failed to disable user: {username}") + return False + + def enable_user(self, username: str) -> bool: + """ + Enables a previously disabled user account. + + :param username: The username of the user to enable + :return: True if the user was enabled successfully, False otherwise + """ + if not self._can_perform_action(): + return False + if username in self.users and self.users[username].disabled: + self.users[username].disabled = False + self.sys_log.info(f"{self.name}: User enabled: {username}") + if username in self.disabled_admins: + self.admins[username] = self.disabled_admins.pop(username) + return True + self.sys_log.info(f"{self.name}: Failed to enable user: {username}") + return False diff --git a/src/primaite/simulator/system/services/access/user_session_manager.py b/src/primaite/simulator/system/services/access/user_session_manager.py new file mode 100644 index 00000000..03d2dd93 --- /dev/null +++ b/src/primaite/simulator/system/services/access/user_session_manager.py @@ -0,0 +1,98 @@ +# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK +from __future__ import annotations + +from datetime import datetime, timedelta +from typing import Dict, List, Optional +from uuid import uuid4 + +from pydantic import BaseModel, Field + +from primaite.simulator.core import SimComponent +from primaite.simulator.network.transmission.network_layer import IPProtocol +from primaite.simulator.network.transmission.transport_layer import Port +from primaite.simulator.system.services.access.user_manager import User, UserManager +from primaite.simulator.system.services.service import Service +from primaite.utils.validators import IPV4Address + + +class UserSession(SimComponent): + user: User + start_step: int + last_active_step: int + end_step: Optional[int] = None + local: bool = True + + @classmethod + def create(cls, user: User, timestep: int) -> UserSession: + return UserSession(user=user, start_step=timestep, last_active_step=timestep) + def describe_state(self) -> Dict: + return self.model_dump() + + +class RemoteUserSession(UserSession): + remote_ip_address: IPV4Address + local: bool = False + + def describe_state(self) -> Dict: + state = super().describe_state() + state["remote_ip_address"] = str(self.remote_ip_address) + return state + + +class UserSessionManager(BaseModel): + node: + local_session: Optional[UserSession] = None + remote_sessions: Dict[str, RemoteUserSession] = Field(default_factory=dict) + historic_sessions: List[UserSession] = Field(default_factory=list) + + local_session_timeout_steps: int = 30 + remote_session_timeout_steps: int = 5 + max_remote_sessions: int = 3 + + current_timestep: int = 0 + + @property + def _user_manager(self) -> UserManager: + return self.software_manager.software["UserManager"] # noqa + + def pre_timestep(self, timestep: int) -> None: + """Apply any pre-timestep logic that helps make sure we have the correct observations.""" + self.current_timestep = timestep + if self.local_session: + if self.local_session.last_active_step + self.local_session_timeout_steps <= timestep: + self._timeout_session(self.local_session) + + def _timeout_session(self, session: UserSession) -> None: + session.end_step = self.current_timestep + session_identity = session.user.username + if session.local: + self.local_session = None + session_type = "Local" + else: + self.remote_sessions.pop(session.uuid) + session_type = "Remote" + session_identity = f"{session_identity} {session.remote_ip_address}" + + self.sys_log.info(f"{self.name}: {session_type} {session_identity} session timeout due to inactivity") + + def login(self, username: str, password: str) -> Optional[str]: + if not self._can_perform_action(): + return None + user = self._user_manager.authenticate_user(username=username, password=password) + if user: + self.logout() + self.local_session = UserSession.create(user=user, timestep=self.current_timestep) + self.sys_log.info(f"{self.name}: User {user.username} logged in") + return self.local_session.uuid + else: + self.sys_log.info(f"{self.name}: Incorrect username or password") + + def logout(self): + if not self._can_perform_action(): + return False + if self.local_session: + session = self.local_session + session.end_step = self.current_timestep + self.historic_sessions.append(session) + self.local_session = None + self.sys_log.info(f"{self.name}: User {session.user.username} logged out") diff --git a/src/primaite/simulator/system/services/service.py b/src/primaite/simulator/system/services/service.py index e6ce2c87..bef9804f 100644 --- a/src/primaite/simulator/system/services/service.py +++ b/src/primaite/simulator/system/services/service.py @@ -43,7 +43,7 @@ class Service(IOSoftware): restart_countdown: Optional[int] = None "If currently restarting, how many timesteps remain until the restart is finished." - def __init__(self, **kwargs): + def __init__(self, **kwargs):c super().__init__(**kwargs) def _can_perform_action(self) -> bool: diff --git a/src/primaite/simulator/system/software.py b/src/primaite/simulator/system/software.py index 7ea67dcd..7c27534a 100644 --- a/src/primaite/simulator/system/software.py +++ b/src/primaite/simulator/system/software.py @@ -291,7 +291,7 @@ class IOSoftware(Software): """ if self.software_manager and self.software_manager.node.operating_state != NodeOperatingState.ON: self.software_manager.node.sys_log.error( - f"{self.name} Error: {self.software_manager.node.hostname} is not online." + f"{self.name} Error: {self.software_manager.node.hostname} is not powered on." ) return False return True