diff --git a/src/primaite/simulator/network/protocols/ssh.py b/src/primaite/simulator/network/protocols/ssh.py index 8671a1c8..495a2a2b 100644 --- a/src/primaite/simulator/network/protocols/ssh.py +++ b/src/primaite/simulator/network/protocols/ssh.py @@ -22,7 +22,7 @@ class SSHTransportMessage(IntEnum): """Indicates User Authentication failed.""" SSH_MSG_USERAUTH_SUCCESS = 52 - """Indicates User Authentication failed was successful.""" + """Indicates User Authentication was successful.""" SSH_MSG_SERVICE_REQUEST = 24 """Requests a service - such as executing a command.""" diff --git a/src/primaite/simulator/system/services/terminal/terminal.py b/src/primaite/simulator/system/services/terminal/terminal.py index 6df21618..998238a9 100644 --- a/src/primaite/simulator/system/services/terminal/terminal.py +++ b/src/primaite/simulator/system/services/terminal/terminal.py @@ -43,6 +43,17 @@ class TerminalClientConnection(BaseModel): _connection_uuid: str = None """Connection UUID""" + @property + def is_local(self) -> bool: + """Indicates if connection is remote or local. + + Returns True if local, False if remote. + """ + for interface in self.parent_node.network_interface: + if self.dest_ip_address == self.parent_node.network_interface[interface].ip_address: + return True + return False + @property def client(self) -> Optional[Terminal]: """The Terminal that holds this connection.""" @@ -57,9 +68,6 @@ class TerminalClientConnection(BaseModel): class Terminal(Service): """Class used to simulate a generic terminal service. Can be interacted with by other terminals via SSH.""" - is_connected: bool = False - "Boolean Value for whether connected" - connection_uuid: Optional[str] = None "Uuid for connection requests" @@ -69,7 +77,8 @@ class Terminal(Service): health_state_actual: SoftwareHealthState = SoftwareHealthState.GOOD "Service Health State" - remote_connection: Dict[str, TerminalClientConnection] = {} + _connections: Dict[str, TerminalClientConnection] = {} + "List of active connections held on this terminal." def __init__(self, **kwargs): kwargs["name"] = "Terminal" @@ -95,13 +104,13 @@ class Terminal(Service): :param markdown: Whether to display the table in Markdown format or not. Default is `False`. """ - table = PrettyTable(["Connection ID", "IP_Address", "Active"]) + table = PrettyTable(["Connection ID", "IP_Address", "Active", "Local"]) if markdown: table.set_style(MARKDOWN) table.align = "l" - table.title = f"{self.sys_log.hostname} {self.name} Remote Connections" - for connection_id, connection in self.remote_connection.items(): - table.add_row([connection_id, connection.dest_ip_address, connection.is_active]) + table.title = f"{self.sys_log.hostname} {self.name} Connections" + for connection_id, connection in self._connections.items(): + table.add_row([connection_id, connection.dest_ip_address, connection.is_active, connection.is_local]) print(table.get_string(sortby="Connection ID")) def _init_request_manager(self) -> RequestManager: @@ -182,11 +191,18 @@ class Terminal(Service): """Message that is reported when a request is rejected by this validator.""" return "Cannot perform request on terminal as not logged in." + def _add_new_connection(self, connection_uuid: str, dest_ip_address: IPv4Address): + """Create a new connection object and amend to list of active connections.""" + self._connections[connection_uuid] = TerminalClientConnection( + parent_node=self.software_manager.node, + dest_ip_address=dest_ip_address, + connection_uuid=connection_uuid, + ) + def login(self, username: str, password: str, ip_address: Optional[IPv4Address] = None) -> bool: """Process User request to login to Terminal. - If ip_address is passed, login will attempt a remote login to the terminal - + If ip_address is passed, login will attempt a remote login to the node at that address. :param username: The username credential. :param password: The user password component of credentials. :param dest_ip_address: The IP address of the node we want to connect to. @@ -198,8 +214,6 @@ class Terminal(Service): if ip_address: # if ip_address has been provided, we assume we are logging in to a remote terminal. - if ip_address == self.parent.network_interface[1].ip_address: - return self._process_local_login(username=username, password=password) return self._send_remote_login(username=username, password=password, ip_address=ip_address) return self._process_local_login(username=username, password=password) @@ -207,11 +221,14 @@ class Terminal(Service): def _process_local_login(self, username: str, password: str) -> bool: """Local session login to terminal.""" # TODO: Un-comment this when UserSessionManager is merged. - # self.connection_uuid = self.parent.UserSessionManager.login(username=username, password=password) - self.connection_uuid = str(uuid4()) - self.is_connected = True - if self.connection_uuid: + # connection_uuid = self.parent.UserSessionManager.login(username=username, password=password) + connection_uuid = str(uuid4()) + if connection_uuid: self.sys_log.info(f"Login request authorised, connection uuid: {self.connection_uuid}") + # Add new local session to list of connections + self._add_new_connection( + connection_uuid=connection_uuid, dest_ip_address=self.parent.network_interface[1].ip_address + ) return True else: self.sys_log.warning("Login failed, incorrect Username or Password") @@ -246,11 +263,10 @@ class Terminal(Service): # TODO: Un-comment this when UserSessionManager is merged. # connection_uuid = self.parent.UserSessionManager.remote_login(username=username, password=password) connection_uuid = str(uuid4()) - self.is_connected = True if connection_uuid: # Send uuid to remote self.sys_log.info( - f"Remote login authorised, connection ID {self.connection_uuid} for " + f"Remote login authorised, connection ID {connection_uuid} for " f"{username} on {payload.sender_ip_address}" ) transport_message: SSHTransportMessage = SSHTransportMessage.SSH_MSG_USERAUTH_SUCCESS @@ -262,12 +278,7 @@ class Terminal(Service): sender_ip_address=self.parent.network_interface[1].ip_address, target_ip_address=payload.sender_ip_address, ) - - self.remote_connection[connection_uuid] = TerminalClientConnection( - parent_node=self.software_manager.node, - dest_ip_address=payload.sender_ip_address, - connection_uuid=connection_uuid, - ) + self._add_new_connection(connection_uuid=connection_uuid, dest_ip_address=payload.sender_ip_address) self.send(payload=return_payload, dest_ip_address=return_payload.target_ip_address) return True @@ -280,7 +291,7 @@ class Terminal(Service): """Receive Payload and process for a response. :param payload: The message contents received. - :return: True if successfull, else False. + :return: True if successful, else False. """ self.sys_log.debug(f"Received payload: {payload}") @@ -304,7 +315,6 @@ class Terminal(Service): elif payload.transport_message == SSHTransportMessage.SSH_MSG_USERAUTH_SUCCESS: self.sys_log.info(f"Login Successful, connection ID is {payload.connection_uuid}") self.connection_uuid = payload.connection_uuid - self.is_connected = True return True elif payload.transport_message == SSHTransportMessage.SSH_MSG_SERVICE_REQUEST: @@ -322,14 +332,15 @@ class Terminal(Service): def _disconnect(self, dest_ip_address: IPv4Address) -> bool: """Disconnect from the remote.""" - if not self.is_connected: - self.sys_log.warning("Not currently connected to remote") - return False - - if not self.remote_connection: + if not self._connections: self.sys_log.warning("No remote connection present") return False + # TODO: This should probably be done entirely by connection uuid and not IP_address. + for connection in self._connections: + if dest_ip_address == self._connections[connection].dest_ip_address: + self._connections.pop(connection) + software_manager: SoftwareManager = self.software_manager software_manager.send_payload_to_session_manager( payload={"type": "disconnect", "connection_id": self.connection_uuid}, @@ -347,7 +358,6 @@ class Terminal(Service): :return: True if successful, False otherwise. """ self._disconnect(dest_ip_address=dest_ip_address) - self.is_connected = False def send( self,