From 0fe61576c768839429a4802ba5ec89b4ac8f48ba Mon Sep 17 00:00:00 2001 From: Charlie Crane Date: Fri, 2 Aug 2024 09:13:31 +0100 Subject: [PATCH] #2706 - Removed source and target ip_address attributes from the SSHPacket Class. Terminal now uses session_id to send login outcome. No more network_interface[1].ip_address. --- .../simulator/network/protocols/ssh.py | 7 -- .../system/services/terminal/terminal.py | 112 +++++++++--------- 2 files changed, 53 insertions(+), 66 deletions(-) diff --git a/src/primaite/simulator/network/protocols/ssh.py b/src/primaite/simulator/network/protocols/ssh.py index 495a2a2b..4ec043b8 100644 --- a/src/primaite/simulator/network/protocols/ssh.py +++ b/src/primaite/simulator/network/protocols/ssh.py @@ -1,7 +1,6 @@ # © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK from enum import IntEnum -from ipaddress import IPv4Address from typing import Optional from primaite.interface.request import RequestResponse @@ -68,12 +67,6 @@ class SSHUserCredentials(DataPacket): class SSHPacket(DataPacket): """Represents an SSHPacket.""" - sender_ip_address: IPv4Address - """Sender IP Address""" - - target_ip_address: IPv4Address - """Target IP Address""" - transport_message: SSHTransportMessage """Message Transport Type""" diff --git a/src/primaite/simulator/system/services/terminal/terminal.py b/src/primaite/simulator/system/services/terminal/terminal.py index 998238a9..192f0551 100644 --- a/src/primaite/simulator/system/services/terminal/terminal.py +++ b/src/primaite/simulator/system/services/terminal/terminal.py @@ -28,32 +28,21 @@ class TerminalClientConnection(BaseModel): """ TerminalClientConnection Class. - This class is used to record current remote User Connections to the Terminal class. + This class is used to record current User Connections to the Terminal class. """ parent_node: Node # Technically should be HostNode but this causes circular import error. """The parent Node that this connection was created on.""" - is_active: bool = True - """Flag to state whether the connection is still active or not.""" - dest_ip_address: IPv4Address = None """Destination IP address of connection""" + session_id: str = None + """Session ID that connection is linked to""" + _connection_uuid: str = None """Connection UUID""" - @property - def is_local(self) -> bool: - """Indicates if connection is remote or local. - - Returns True if local, False if remote. - """ - for interface in self.parent_node.network_interface: - if self.dest_ip_address == self.parent_node.network_interface[interface].ip_address: - return True - return False - @property def client(self) -> Optional[Terminal]: """The Terminal that holds this connection.""" @@ -68,9 +57,6 @@ class TerminalClientConnection(BaseModel): class Terminal(Service): """Class used to simulate a generic terminal service. Can be interacted with by other terminals via SSH.""" - connection_uuid: Optional[str] = None - "Uuid for connection requests" - operating_state: ServiceOperatingState = ServiceOperatingState.RUNNING "Initial Operating State" @@ -104,13 +90,13 @@ class Terminal(Service): :param markdown: Whether to display the table in Markdown format or not. Default is `False`. """ - table = PrettyTable(["Connection ID", "IP_Address", "Active", "Local"]) + table = PrettyTable(["Connection ID", "Session_ID"]) if markdown: table.set_style(MARKDOWN) table.align = "l" table.title = f"{self.sys_log.hostname} {self.name} Connections" for connection_id, connection in self._connections.items(): - table.add_row([connection_id, connection.dest_ip_address, connection.is_active, connection.is_local]) + table.add_row([connection_id, connection.session_id]) print(table.get_string(sortby="Connection ID")) def _init_request_manager(self) -> RequestManager: @@ -145,11 +131,12 @@ class Terminal(Service): self.execute(command) return RequestResponse(status="success", data={}) - def _logoff() -> RequestResponse: + def _logoff(request: List[Any]) -> RequestResponse: """Logoff from connection.""" + connection_uuid = request[0] # TODO: Uncomment this when UserSessionManager merged. - # self.parent.UserSessionManager.logoff(self.connection_uuid) - self.disconnect(self.connection_uuid) + # self.parent.UserSessionManager.logoff(connection_uuid) + self.disconnect(connection_uuid) return RequestResponse(status="success", data={}) @@ -191,12 +178,12 @@ class Terminal(Service): """Message that is reported when a request is rejected by this validator.""" return "Cannot perform request on terminal as not logged in." - def _add_new_connection(self, connection_uuid: str, dest_ip_address: IPv4Address): + def _add_new_connection(self, connection_uuid: str, session_id: str): """Create a new connection object and amend to list of active connections.""" self._connections[connection_uuid] = TerminalClientConnection( parent_node=self.software_manager.node, - dest_ip_address=dest_ip_address, connection_uuid=connection_uuid, + session_id=session_id, ) def login(self, username: str, password: str, ip_address: Optional[IPv4Address] = None) -> bool: @@ -219,23 +206,31 @@ class Terminal(Service): return self._process_local_login(username=username, password=password) def _process_local_login(self, username: str, password: str) -> bool: - """Local session login to terminal.""" + """Local session login to terminal. + + :param username: Username for login. + :param password: Password for login. + :return: boolean, True if successful, else False + """ # TODO: Un-comment this when UserSessionManager is merged. # connection_uuid = self.parent.UserSessionManager.login(username=username, password=password) connection_uuid = str(uuid4()) if connection_uuid: - self.sys_log.info(f"Login request authorised, connection uuid: {self.connection_uuid}") + self.sys_log.info(f"Login request authorised, connection uuid: {connection_uuid}") # Add new local session to list of connections - self._add_new_connection( - connection_uuid=connection_uuid, dest_ip_address=self.parent.network_interface[1].ip_address - ) + self._add_new_connection(connection_uuid=connection_uuid) return True else: self.sys_log.warning("Login failed, incorrect Username or Password") return False def _send_remote_login(self, username: str, password: str, ip_address: IPv4Address) -> bool: - """Attempt to login to a remote terminal.""" + """Attempt to login to a remote terminal. + + :param username: username for login. + :param password: password for login. + :ip_address: IP address of the target node for login. + """ transport_message: SSHTransportMessage = SSHTransportMessage.SSH_MSG_USERAUTH_REQUEST connection_message: SSHConnectionMessage = SSHConnectionMessage.SSH_MSG_CHANNEL_DATA user_account: SSHUserCredentials = SSHUserCredentials(username=username, password=password) @@ -244,14 +239,12 @@ class Terminal(Service): transport_message=transport_message, connection_message=connection_message, user_account=user_account, - target_ip_address=ip_address, - sender_ip_address=self.parent.network_interface[1].ip_address, ) self.sys_log.info(f"Sending remote login request to {ip_address}") return self.send(payload=payload, dest_ip_address=ip_address) - def _process_remote_login(self, payload: SSHPacket) -> bool: + def _process_remote_login(self, payload: SSHPacket, session_id: str) -> bool: """Processes a remote terminal requesting to login to this terminal. :param payload: The SSH Payload Packet. @@ -266,8 +259,7 @@ class Terminal(Service): if connection_uuid: # Send uuid to remote self.sys_log.info( - f"Remote login authorised, connection ID {connection_uuid} for " - f"{username} on {payload.sender_ip_address}" + f"Remote login authorised, connection ID {connection_uuid} for " f"{username} in session {session_id}" ) transport_message: SSHTransportMessage = SSHTransportMessage.SSH_MSG_USERAUTH_SUCCESS connection_message: SSHConnectionMessage = SSHConnectionMessage.SSH_MSG_CHANNEL_DATA @@ -275,19 +267,17 @@ class Terminal(Service): transport_message=transport_message, connection_message=connection_message, connection_uuid=connection_uuid, - sender_ip_address=self.parent.network_interface[1].ip_address, - target_ip_address=payload.sender_ip_address, ) - self._add_new_connection(connection_uuid=connection_uuid, dest_ip_address=payload.sender_ip_address) + self._add_new_connection(connection_uuid=connection_uuid, session_id=session_id) - self.send(payload=return_payload, dest_ip_address=return_payload.target_ip_address) + self.send(payload=return_payload, session_id=session_id) return True else: # UserSessionManager has returned None self.sys_log.warning("Login failed, incorrect Username or Password") return False - def receive(self, payload: SSHPacket, **kwargs) -> bool: + def receive(self, payload: SSHPacket, session_id: str, **kwargs) -> bool: """Receive Payload and process for a response. :param payload: The message contents received. @@ -310,11 +300,10 @@ class Terminal(Service): self.sys_log.debug(f"Disconnecting {connection_id}") elif payload.transport_message == SSHTransportMessage.SSH_MSG_USERAUTH_REQUEST: - return self._process_remote_login(payload=payload) + return self._process_remote_login(payload=payload, session_id=session_id) elif payload.transport_message == SSHTransportMessage.SSH_MSG_USERAUTH_SUCCESS: self.sys_log.info(f"Login Successful, connection ID is {payload.connection_uuid}") - self.connection_uuid = payload.connection_uuid return True elif payload.transport_message == SSHTransportMessage.SSH_MSG_SERVICE_REQUEST: @@ -330,16 +319,18 @@ class Terminal(Service): """Execute a passed ssh command via the request manager.""" return self.parent.apply_request(command) - def _disconnect(self, dest_ip_address: IPv4Address) -> bool: - """Disconnect from the remote.""" + def _disconnect(self, connection_uuid: str) -> bool: + """Disconnect from the remote. + + :param connection_uuid: Connection ID that we want to disconnect. + :return True if successful, False otherwise. + """ if not self._connections: self.sys_log.warning("No remote connection present") return False - # TODO: This should probably be done entirely by connection uuid and not IP_address. - for connection in self._connections: - if dest_ip_address == self._connections[connection].dest_ip_address: - self._connections.pop(connection) + dest_ip_address = self._connections[connection_uuid].dest_ip_address + self._connections.pop(connection_uuid) software_manager: SoftwareManager = self.software_manager software_manager.send_payload_to_session_manager( @@ -347,22 +338,23 @@ class Terminal(Service): dest_ip_address=dest_ip_address, dest_port=self.port, ) - self.connection_uuid = None - self.sys_log.info(f"{self.name}: Disconnected {self.connection_uuid}") + self.sys_log.info(f"{self.name}: Disconnected {connection_uuid}") return True - def disconnect(self, dest_ip_address: IPv4Address) -> bool: - """Disconnect from remote connection. + def disconnect(self, connection_uuid: Optional[str]) -> bool: + """Disconnect the terminal. - :param dest_ip_address: The IP address fo the connection we are terminating. + If no connection id has been supplied, disconnects the first connection. + :param connection_uuid: Connection ID that we want to disconnect. :return: True if successful, False otherwise. """ - self._disconnect(dest_ip_address=dest_ip_address) + if not connection_uuid: + connection_uuid = next(iter(self._connections)) + + return self._disconnect(connection_uuid=connection_uuid) def send( - self, - payload: SSHPacket, - dest_ip_address: IPv4Address, + self, payload: SSHPacket, dest_ip_address: Optional[IPv4Address] = None, session_id: Optional[str] = None ) -> bool: """ Send a payload out from the Terminal. @@ -374,4 +366,6 @@ class Terminal(Service): self.sys_log.warning(f"Cannot send commands when Operating state is {self.operating_state}!") return False self.sys_log.debug(f"Sending payload: {payload}") - return super().send(payload=payload, dest_ip_address=dest_ip_address, dest_port=self.port) + return super().send( + payload=payload, dest_ip_address=dest_ip_address, dest_port=self.port, session_id=session_id + )