From 8702dc706797ad7970ba7a5bed1a7fbff7175c04 Mon Sep 17 00:00:00 2001 From: Chris McCarthy Date: Fri, 19 Jul 2024 10:34:32 +0100 Subject: [PATCH] #2735 - tidies up some oif the api, temporarily integrated login checks to ping for testing, added temp test --- .../simulator/network/hardware/base.py | 357 +++++++++++++++++- .../network/hardware/nodes/host/host_node.py | 5 +- .../simulator/system/core/software_manager.py | 8 +- .../system/services/access/user_manager.py | 185 --------- .../services/access/user_session_manager.py | 97 ----- .../simulator/system/services/service.py | 2 +- .../system/test_local_accounts.py | 37 ++ 7 files changed, 391 insertions(+), 300 deletions(-) create mode 100644 tests/integration_tests/system/test_local_accounts.py diff --git a/src/primaite/simulator/network/hardware/base.py b/src/primaite/simulator/network/hardware/base.py index 64fad264..9e6784c5 100644 --- a/src/primaite/simulator/network/hardware/base.py +++ b/src/primaite/simulator/network/hardware/base.py @@ -6,7 +6,7 @@ import secrets from abc import ABC, abstractmethod from ipaddress import IPv4Address, IPv4Network from pathlib import Path -from typing import Any, ClassVar, Dict, Optional, TypeVar, Union +from typing import Any, ClassVar, Dict, List, Optional, TypeVar, Union from prettytable import MARKDOWN, PrettyTable from pydantic import BaseModel, Field @@ -31,14 +31,13 @@ from primaite.simulator.network.nmne import ( ) from primaite.simulator.network.transmission.data_link_layer import Frame from primaite.simulator.network.transmission.network_layer import IPProtocol +from primaite.simulator.network.transmission.transport_layer import Port from primaite.simulator.system.applications.application import Application from primaite.simulator.system.core.packet_capture import PacketCapture from primaite.simulator.system.core.session_manager import SessionManager from primaite.simulator.system.core.software_manager import SoftwareManager from primaite.simulator.system.core.sys_log import SysLog from primaite.simulator.system.processes.process import Process -from primaite.simulator.system.services.access.user_manager import UserManager -from primaite.simulator.system.services.access.user_session_manager import UserSessionManager from primaite.simulator.system.services.service import Service from primaite.simulator.system.software import IOSoftware from primaite.utils.converters import convert_dict_enum_keys_to_enum_values @@ -796,6 +795,330 @@ class Link(SimComponent): self.current_load = 0.0 +class User(SimComponent): + """ + Represents a user in the PrimAITE system. + + :param username: The username of the user + :param password: The password of the user + :param disabled: Boolean flag indicating whether the user is disabled + :param is_admin: Boolean flag indicating whether the user has admin privileges + """ + + username: str + password: str + disabled: bool = False + is_admin: bool = False + + def describe_state(self) -> Dict: + """ + Returns a dictionary representing the current state of the user. + + :return: A dict containing the state of the user + """ + return self.model_dump() + + +class UserManager(Service): + """ + Manages users within the PrimAITE system, handling creation, authentication, and administration. + + :param users: A dictionary of all users by their usernames + :param admins: A dictionary of admin users by their usernames + :param disabled_admins: A dictionary of currently disabled admin users by their usernames + """ + + users: Dict[str, User] = Field(default_factory=dict) + admins: Dict[str, User] = Field(default_factory=dict) + disabled_admins: Dict[str, User] = Field(default_factory=dict) + + def __init__(self, **kwargs): + """ + Initializes a UserManager instanc. + + :param username: The username for the default admin user + :param password: The password for the default admin user + """ + kwargs["name"] = "UserManager" + kwargs["port"] = Port.NONE + kwargs["protocol"] = IPProtocol.NONE + super().__init__(**kwargs) + self.start() + + def describe_state(self) -> Dict: + """ + Returns the state of the UserManager along with the number of users and admins. + + :return: A dict containing detailed state information + """ + state = super().describe_state() + state.update({"total_users": len(self.users), "total_admins": len(self.admins) + len(self.disabled_admins)}) + return state + + def show(self, markdown: bool = False): + """ + Display the Users. + + :param markdown: Whether to display the table in Markdown format or not. Default is `False`. + """ + table = PrettyTable(["Username", "Admin", "Disabled"]) + if markdown: + table.set_style(MARKDOWN) + table.align = "l" + table.title = f"{self.sys_log.hostname} User Manager" + for user in self.users.values(): + table.add_row([user.username, user.is_admin, user.disabled]) + print(table.get_string(sortby="Username")) + + def _is_last_admin(self, username: str) -> bool: + return username in self.admins and len(self.admins) == 1 + + def add_user( + self, username: str, password: str, is_admin: bool = False, bypass_can_perform_action: bool = False + ) -> bool: + """ + Adds a new user to the system. + + :param username: The username for the new user + :param password: The password for the new user + :param is_admin: Flag indicating if the new user is an admin + :return: True if user was successfully added, False otherwise + """ + if not bypass_can_perform_action and not self._can_perform_action(): + return False + if username in self.users: + self.sys_log.info(f"{self.name}: Failed to create new user {username} as this user name already exists") + return False + user = User(username=username, password=password, is_admin=is_admin) + self.users[username] = user + if is_admin: + self.admins[username] = user + self.sys_log.info(f"{self.name}: Added new {'admin' if is_admin else 'user'}: {username}") + return True + + def authenticate_user(self, username: str, password: str) -> Optional[User]: + """ + Authenticates a user's login attempt. + + :param username: The username of the user trying to log in + :param password: The password provided by the user + :return: The User object if authentication is successful, None otherwise + """ + if not self._can_perform_action(): + return None + user = self.users.get(username) + if user and not user.disabled and user.password == password: + self.sys_log.info(f"{self.name}: User authenticated: {username}") + return user + self.sys_log.info(f"{self.name}: Authentication failed for: {username}") + return None + + def change_user_password(self, username: str, current_password: str, new_password: str) -> bool: + """ + Changes a user's password. + + :param username: The username of the user changing their password + :param current_password: The current password of the user + :param new_password: The new password for the user + :return: True if the password was changed successfully, False otherwise + """ + if not self._can_perform_action(): + return False + user = self.users.get(username) + if user and user.password == current_password: + user.password = new_password + self.sys_log.info(f"{self.name}: Password changed for {username}") + return True + self.sys_log.info(f"{self.name}: Password change failed for {username}") + return False + + def disable_user(self, username: str) -> bool: + """ + Disables a user account, preventing them from logging in. + + :param username: The username of the user to disable + :return: True if the user was disabled successfully, False otherwise + """ + if not self._can_perform_action(): + return False + if username in self.users and not self.users[username].disabled: + if self._is_last_admin(username): + self.sys_log.info(f"{self.name}: Cannot disable User {username} as they are the only enabled admin") + return False + self.users[username].disabled = True + self.sys_log.info(f"{self.name}: User disabled: {username}") + if username in self.admins: + self.disabled_admins[username] = self.admins.pop(username) + return True + self.sys_log.info(f"{self.name}: Failed to disable user: {username}") + return False + + def enable_user(self, username: str) -> bool: + """ + Enables a previously disabled user account. + + :param username: The username of the user to enable + :return: True if the user was enabled successfully, False otherwise + """ + if username in self.users and self.users[username].disabled: + self.users[username].disabled = False + self.sys_log.info(f"{self.name}: User enabled: {username}") + if username in self.disabled_admins: + self.admins[username] = self.disabled_admins.pop(username) + return True + self.sys_log.info(f"{self.name}: Failed to enable user: {username}") + return False + + +class UserSession(SimComponent): + user: User + start_step: int + last_active_step: int + end_step: Optional[int] = None + local: bool = True + + @classmethod + def create(cls, user: User, timestep: int) -> UserSession: + return UserSession(user=user, start_step=timestep, last_active_step=timestep) + + def describe_state(self) -> Dict: + return self.model_dump() + + +class RemoteUserSession(UserSession): + remote_ip_address: IPV4Address + local: bool = False + + def describe_state(self) -> Dict: + state = super().describe_state() + state["remote_ip_address"] = str(self.remote_ip_address) + return state + + +class UserSessionManager(Service): + node: Node + local_session: Optional[UserSession] = None + remote_sessions: Dict[str, RemoteUserSession] = Field(default_factory=dict) + historic_sessions: List[UserSession] = Field(default_factory=list) + + local_session_timeout_steps: int = 30 + remote_session_timeout_steps: int = 5 + max_remote_sessions: int = 3 + + current_timestep: int = 0 + + def __init__(self, **kwargs): + """ + Initializes a UserSessionManager instance. + + :param username: The username for the default admin user + :param password: The password for the default admin user + """ + kwargs["name"] = "UserSessionManager" + kwargs["port"] = Port.NONE + kwargs["protocol"] = IPProtocol.NONE + super().__init__(**kwargs) + self.start() + + def show(self, markdown: bool = False, include_session_id: bool = False, include_historic: bool = False): + """Prints a table of the user sessions on the Node.""" + headers = ["Session ID", "Username", "Type", "Remote IP", "Start Step", "Step Last Active", "End Step"] + + if not include_session_id: + headers = headers[1:] + + table = PrettyTable(headers) + + if markdown: + table.set_style(MARKDOWN) + table.align = "l" + table.title = f"{self.node.hostname} User Sessions" + + def _add_session_to_table(user_session: UserSession): + session_type = "local" + remote_ip = "" + if isinstance(user_session, RemoteUserSession): + session_type = "remote" + remote_ip = str(user_session.remote_ip_address) + data = [ + user_session.uuid, + user_session.user.username, + session_type, + remote_ip, + user_session.start_step, + user_session.last_active_step, + user_session.end_step if user_session.end_step else "", + ] + if not include_session_id: + data = data[1:] + table.add_row(data) + + if self.local_session is not None: + _add_session_to_table(self.local_session) + + for user_session in self.remote_sessions.values(): + _add_session_to_table(user_session) + + if include_historic: + for user_session in self.historic_sessions: + _add_session_to_table(user_session) + + print(table.get_string(sortby="Step Last Active", reversesort=True)) + + def describe_state(self) -> Dict: + return super().describe_state() + + @property + def _user_manager(self) -> UserManager: + return self.software_manager.software["UserManager"] # noqa + + def pre_timestep(self, timestep: int) -> None: + """Apply any pre-timestep logic that helps make sure we have the correct observations.""" + self.current_timestep = timestep + if self.local_session: + if self.local_session.last_active_step + self.local_session_timeout_steps <= timestep: + self._timeout_session(self.local_session) + + def _timeout_session(self, session: UserSession) -> None: + session.end_step = self.current_timestep + session_identity = session.user.username + if session.local: + self.local_session = None + session_type = "Local" + else: + self.remote_sessions.pop(session.uuid) + session_type = "Remote" + session_identity = f"{session_identity} {session.remote_ip_address}" + + self.sys_log.info(f"{self.name}: {session_type} {session_identity} session timeout due to inactivity") + + def login(self, username: str, password: str) -> Optional[str]: + if not self._can_perform_action(): + return None + user = self._user_manager.authenticate_user(username=username, password=password) + if user: + self.logout() + self.local_session = UserSession.create(user=user, timestep=self.current_timestep) + self.sys_log.info(f"{self.name}: User {user.username} logged in") + return self.local_session.uuid + else: + self.sys_log.info(f"{self.name}: Incorrect username or password") + + def logout(self): + if not self._can_perform_action(): + return False + if self.local_session: + session = self.local_session + session.end_step = self.current_timestep + self.historic_sessions.append(session) + self.local_session = None + self.sys_log.info(f"{self.name}: User {session.user.username} logged out") + + @property + def local_user_logged_in(self): + return self.local_session is not None + + class Node(SimComponent): """ A basic Node class that represents a node on the network. @@ -889,16 +1212,24 @@ class Node(SimComponent): super().__init__(**kwargs) self.session_manager.node = self self.session_manager.software_manager = self.software_manager - self.software_manager.install(UserSessionManager) + self.software_manager.install(UserSessionManager, node=self) self.software_manager.install(UserManager) + self.user_manager.add_user(username="admin", password="admin", is_admin=True, bypass_can_perform_action=True) + self._install_system_software() - # @property - # def user_manager(self) -> UserManager: - # return self.software_manager.software["UserManager"] # noqa - # - # @property - # def _user_session_manager(self) -> UserSessionManager: - # return self.software_manager.software["UserSessionManager"] # noqa + @property + def user_manager(self) -> UserManager: + return self.software_manager.software["UserManager"] # noqa + + @property + def user_session_manager(self) -> UserSessionManager: + return self.software_manager.software["UserSessionManager"] # noqa + + def login(self, username: str, password: str) -> Optional[str]: + return self.user_session_manager.login(username, password) + + def logout(self): + return self.user_session_manager.logout() def ip_is_network_interface(self, ip_address: IPv4Address, enabled_only: bool = False) -> bool: """ @@ -1434,10 +1765,14 @@ class Node(SimComponent): :param pings: The number of pings to attempt, default is 4. :return: True if the ping is successful, otherwise False. """ + if not self.user_session_manager.local_user_logged_in: + return False if not isinstance(target_ip_address, IPv4Address): target_ip_address = IPv4Address(target_ip_address) if self.software_manager.icmp: + print("yes") return self.software_manager.icmp.ping(target_ip_address, pings) + print("no icmp") return False @abstractmethod diff --git a/src/primaite/simulator/network/hardware/nodes/host/host_node.py b/src/primaite/simulator/network/hardware/nodes/host/host_node.py index 80f80a04..aac57e95 100644 --- a/src/primaite/simulator/network/hardware/nodes/host/host_node.py +++ b/src/primaite/simulator/network/hardware/nodes/host/host_node.py @@ -11,8 +11,6 @@ from primaite.simulator.network.transmission.data_link_layer import Frame from primaite.simulator.system.applications.application import ApplicationOperatingState from primaite.simulator.system.applications.nmap import NMAP from primaite.simulator.system.applications.web_browser import WebBrowser -from primaite.simulator.system.services.access.user_manager import UserManager -from primaite.simulator.system.services.access.user_session_manager import UserSessionManager from primaite.simulator.system.services.arp.arp import ARP, ARPPacket from primaite.simulator.system.services.dns.dns_client import DNSClient from primaite.simulator.system.services.icmp.icmp import ICMP @@ -318,10 +316,9 @@ class HostNode(Node): network_interface: Dict[int, NIC] = {} "The NICs on the node by port id." - def __init__(self, ip_address: IPV4Address, subnet_mask: IPV4Address, username: str, password: str, **kwargs): + def __init__(self, ip_address: IPV4Address, subnet_mask: IPV4Address, **kwargs): super().__init__(**kwargs) self.connect_nic(NIC(ip_address=ip_address, subnet_mask=subnet_mask)) - self.user_manager.add_user(username=username, password=password, is_admin=True, bypass_can_perform_action=True) @property def nmap(self) -> Optional[NMAP]: diff --git a/src/primaite/simulator/system/core/software_manager.py b/src/primaite/simulator/system/core/software_manager.py index e2266c2d..c52e60ae 100644 --- a/src/primaite/simulator/system/core/software_manager.py +++ b/src/primaite/simulator/system/core/software_manager.py @@ -104,7 +104,7 @@ class SoftwareManager: return True return False - def install(self, software_class: Type[IOSoftwareClass]): + def install(self, software_class: Type[IOSoftwareClass], **install_kwargs) -> None: """ Install an Application or Service. @@ -116,7 +116,11 @@ class SoftwareManager: self.sys_log.warning(f"Cannot install {software_class} as it is already installed") return software = software_class( - software_manager=self, sys_log=self.sys_log, file_system=self.file_system, dns_server=self.dns_server + software_manager=self, + sys_log=self.sys_log, + file_system=self.file_system, + dns_server=self.dns_server, + **install_kwargs, ) if isinstance(software, Application): software.install() diff --git a/src/primaite/simulator/system/services/access/user_manager.py b/src/primaite/simulator/system/services/access/user_manager.py index 09f8950e..be6c00e7 100644 --- a/src/primaite/simulator/system/services/access/user_manager.py +++ b/src/primaite/simulator/system/services/access/user_manager.py @@ -1,186 +1 @@ # © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK -from typing import Dict, Optional - -from prettytable import MARKDOWN, PrettyTable -from pydantic import Field - -from primaite.simulator.core import SimComponent -from primaite.simulator.network.transmission.network_layer import IPProtocol -from primaite.simulator.network.transmission.transport_layer import Port -from primaite.simulator.system.services.service import Service - - -class User(SimComponent): - """ - Represents a user in the PrimAITE system. - - :param username: The username of the user - :param password: The password of the user - :param disabled: Boolean flag indicating whether the user is disabled - :param is_admin: Boolean flag indicating whether the user has admin privileges - """ - - username: str - password: str - disabled: bool = False - is_admin: bool = False - - def describe_state(self) -> Dict: - """ - Returns a dictionary representing the current state of the user. - - :return: A dict containing the state of the user - """ - return self.model_dump() - - -class UserManager(Service): - """ - Manages users within the PrimAITE system, handling creation, authentication, and administration. - - :param users: A dictionary of all users by their usernames - :param admins: A dictionary of admin users by their usernames - :param disabled_admins: A dictionary of currently disabled admin users by their usernames - """ - - users: Dict[str, User] = Field(default_factory=dict) - admins: Dict[str, User] = Field(default_factory=dict) - disabled_admins: Dict[str, User] = Field(default_factory=dict) - - def __init__(self, **kwargs): - """ - Initializes a UserManager instanc. - - :param username: The username for the default admin user - :param password: The password for the default admin user - """ - kwargs["name"] = "UserManager" - kwargs["port"] = Port.NONE - kwargs["protocol"] = IPProtocol.NONE - super().__init__(**kwargs) - self.start() - - def describe_state(self) -> Dict: - """ - Returns the state of the UserManager along with the number of users and admins. - - :return: A dict containing detailed state information - """ - state = super().describe_state() - state.update({"total_users": len(self.users), "total_admins": len(self.admins) + len(self.disabled_admins)}) - return state - - def show(self, markdown: bool = False): - """ - Display the Users. - - :param markdown: Whether to display the table in Markdown format or not. Default is `False`. - """ - table = PrettyTable(["Username", "Admin", "Enabled"]) - if markdown: - table.set_style(MARKDOWN) - table.align = "l" - table.title = f"{self.sys_log.hostname} User Manager)" - for user in self.users.values(): - table.add_row([user.username, user.is_admin, user.disabled]) - print(table.get_string(sortby="Username")) - - def _is_last_admin(self, username: str) -> bool: - return username in self.admins and len(self.admins) == 1 - - def add_user( - self, username: str, password: str, is_admin: bool = False, bypass_can_perform_action: bool = False - ) -> bool: - """ - Adds a new user to the system. - - :param username: The username for the new user - :param password: The password for the new user - :param is_admin: Flag indicating if the new user is an admin - :return: True if user was successfully added, False otherwise - """ - if not bypass_can_perform_action and not self._can_perform_action(): - return False - if username in self.users: - return False - user = User(username=username, password=password, is_admin=is_admin) - self.users[username] = user - if is_admin: - self.admins[username] = user - self.sys_log.info(f"{self.name}: Added new {'admin' if is_admin else 'user'}: {username}") - return True - - def authenticate_user(self, username: str, password: str) -> Optional[User]: - """ - Authenticates a user's login attempt. - - :param username: The username of the user trying to log in - :param password: The password provided by the user - :return: The User object if authentication is successful, None otherwise - """ - if not self._can_perform_action(): - return None - user = self.users.get(username) - if user and not user.disabled and user.password == password: - self.sys_log.info(f"{self.name}: User authenticated: {username}") - return user - self.sys_log.info(f"{self.name}: Authentication failed for: {username}") - return None - - def change_user_password(self, username: str, current_password: str, new_password: str) -> bool: - """ - Changes a user's password. - - :param username: The username of the user changing their password - :param current_password: The current password of the user - :param new_password: The new password for the user - :return: True if the password was changed successfully, False otherwise - """ - if not self._can_perform_action(): - return False - user = self.users.get(username) - if user and user.password == current_password: - user.password = new_password - self.sys_log.info(f"{self.name}: Password changed for {username}") - return True - self.sys_log.info(f"{self.name}: Password change failed for {username}") - return False - - def disable_user(self, username: str) -> bool: - """ - Disables a user account, preventing them from logging in. - - :param username: The username of the user to disable - :return: True if the user was disabled successfully, False otherwise - """ - if not self._can_perform_action(): - return False - if username in self.users and not self.users[username].disabled: - if self._is_last_admin(username): - self.sys_log.info(f"{self.name}: Cannot disable User {username} as they are the only enabled admin") - return False - self.users[username].disabled = True - self.sys_log.info(f"{self.name}: User disabled: {username}") - if username in self.admins: - self.disabled_admins[username] = self.admins.pop(username) - return True - self.sys_log.info(f"{self.name}: Failed to disable user: {username}") - return False - - def enable_user(self, username: str) -> bool: - """ - Enables a previously disabled user account. - - :param username: The username of the user to enable - :return: True if the user was enabled successfully, False otherwise - """ - if not self._can_perform_action(): - return False - if username in self.users and self.users[username].disabled: - self.users[username].disabled = False - self.sys_log.info(f"{self.name}: User enabled: {username}") - if username in self.disabled_admins: - self.admins[username] = self.disabled_admins.pop(username) - return True - self.sys_log.info(f"{self.name}: Failed to enable user: {username}") - return False diff --git a/src/primaite/simulator/system/services/access/user_session_manager.py b/src/primaite/simulator/system/services/access/user_session_manager.py index 03d2dd93..be6c00e7 100644 --- a/src/primaite/simulator/system/services/access/user_session_manager.py +++ b/src/primaite/simulator/system/services/access/user_session_manager.py @@ -1,98 +1 @@ # © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK -from __future__ import annotations - -from datetime import datetime, timedelta -from typing import Dict, List, Optional -from uuid import uuid4 - -from pydantic import BaseModel, Field - -from primaite.simulator.core import SimComponent -from primaite.simulator.network.transmission.network_layer import IPProtocol -from primaite.simulator.network.transmission.transport_layer import Port -from primaite.simulator.system.services.access.user_manager import User, UserManager -from primaite.simulator.system.services.service import Service -from primaite.utils.validators import IPV4Address - - -class UserSession(SimComponent): - user: User - start_step: int - last_active_step: int - end_step: Optional[int] = None - local: bool = True - - @classmethod - def create(cls, user: User, timestep: int) -> UserSession: - return UserSession(user=user, start_step=timestep, last_active_step=timestep) - def describe_state(self) -> Dict: - return self.model_dump() - - -class RemoteUserSession(UserSession): - remote_ip_address: IPV4Address - local: bool = False - - def describe_state(self) -> Dict: - state = super().describe_state() - state["remote_ip_address"] = str(self.remote_ip_address) - return state - - -class UserSessionManager(BaseModel): - node: - local_session: Optional[UserSession] = None - remote_sessions: Dict[str, RemoteUserSession] = Field(default_factory=dict) - historic_sessions: List[UserSession] = Field(default_factory=list) - - local_session_timeout_steps: int = 30 - remote_session_timeout_steps: int = 5 - max_remote_sessions: int = 3 - - current_timestep: int = 0 - - @property - def _user_manager(self) -> UserManager: - return self.software_manager.software["UserManager"] # noqa - - def pre_timestep(self, timestep: int) -> None: - """Apply any pre-timestep logic that helps make sure we have the correct observations.""" - self.current_timestep = timestep - if self.local_session: - if self.local_session.last_active_step + self.local_session_timeout_steps <= timestep: - self._timeout_session(self.local_session) - - def _timeout_session(self, session: UserSession) -> None: - session.end_step = self.current_timestep - session_identity = session.user.username - if session.local: - self.local_session = None - session_type = "Local" - else: - self.remote_sessions.pop(session.uuid) - session_type = "Remote" - session_identity = f"{session_identity} {session.remote_ip_address}" - - self.sys_log.info(f"{self.name}: {session_type} {session_identity} session timeout due to inactivity") - - def login(self, username: str, password: str) -> Optional[str]: - if not self._can_perform_action(): - return None - user = self._user_manager.authenticate_user(username=username, password=password) - if user: - self.logout() - self.local_session = UserSession.create(user=user, timestep=self.current_timestep) - self.sys_log.info(f"{self.name}: User {user.username} logged in") - return self.local_session.uuid - else: - self.sys_log.info(f"{self.name}: Incorrect username or password") - - def logout(self): - if not self._can_perform_action(): - return False - if self.local_session: - session = self.local_session - session.end_step = self.current_timestep - self.historic_sessions.append(session) - self.local_session = None - self.sys_log.info(f"{self.name}: User {session.user.username} logged out") diff --git a/src/primaite/simulator/system/services/service.py b/src/primaite/simulator/system/services/service.py index 4227175b..5adea6e7 100644 --- a/src/primaite/simulator/system/services/service.py +++ b/src/primaite/simulator/system/services/service.py @@ -46,7 +46,7 @@ class Service(IOSoftware): restart_countdown: Optional[int] = None "If currently restarting, how many timesteps remain until the restart is finished." - def __init__(self, **kwargs):c + def __init__(self, **kwargs): super().__init__(**kwargs) def _can_perform_action(self) -> bool: diff --git a/tests/integration_tests/system/test_local_accounts.py b/tests/integration_tests/system/test_local_accounts.py new file mode 100644 index 00000000..dbdbf857 --- /dev/null +++ b/tests/integration_tests/system/test_local_accounts.py @@ -0,0 +1,37 @@ +# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK +from primaite.simulator.network.container import Network +from primaite.simulator.network.hardware.nodes.host.computer import Computer +from primaite.simulator.network.hardware.nodes.host.server import Server + + +def test_local_accounts_ping_temp(): + network = Network() + + # Create Computer + computer = Computer( + hostname="computer", + ip_address="192.168.1.2", + subnet_mask="255.255.255.0", + default_gateway="192.168.1.1", + start_up_duration=0, + ) + computer.power_on() + + # Create Server + server = Server( + hostname="server", + ip_address="192.168.1.3", + subnet_mask="255.255.255.0", + default_gateway="192.168.1.1", + start_up_duration=0, + ) + server.power_on() + + # Connect Computer and Server + network.connect(computer.network_interface[1], server.network_interface[1]) + + assert not computer.ping(server.network_interface[1].ip_address) + + computer.user_session_manager.login(username="admin", password="admin") + + assert computer.ping(server.network_interface[1].ip_address)