#2706 - Changed how Terminal Class handles its connections. Terminal now has a list of TerminalClientConnection objects that holds all active connections. Corrected a typo in ssh.py

This commit is contained in:
Charlie Crane
2024-08-01 12:34:21 +01:00
parent 5ef9e78a44
commit 19d7774440
2 changed files with 43 additions and 33 deletions

View File

@@ -22,7 +22,7 @@ class SSHTransportMessage(IntEnum):
"""Indicates User Authentication failed."""
SSH_MSG_USERAUTH_SUCCESS = 52
"""Indicates User Authentication failed was successful."""
"""Indicates User Authentication was successful."""
SSH_MSG_SERVICE_REQUEST = 24
"""Requests a service - such as executing a command."""

View File

@@ -43,6 +43,17 @@ class TerminalClientConnection(BaseModel):
_connection_uuid: str = None
"""Connection UUID"""
@property
def is_local(self) -> bool:
"""Indicates if connection is remote or local.
Returns True if local, False if remote.
"""
for interface in self.parent_node.network_interface:
if self.dest_ip_address == self.parent_node.network_interface[interface].ip_address:
return True
return False
@property
def client(self) -> Optional[Terminal]:
"""The Terminal that holds this connection."""
@@ -57,9 +68,6 @@ class TerminalClientConnection(BaseModel):
class Terminal(Service):
"""Class used to simulate a generic terminal service. Can be interacted with by other terminals via SSH."""
is_connected: bool = False
"Boolean Value for whether connected"
connection_uuid: Optional[str] = None
"Uuid for connection requests"
@@ -69,7 +77,8 @@ class Terminal(Service):
health_state_actual: SoftwareHealthState = SoftwareHealthState.GOOD
"Service Health State"
remote_connection: Dict[str, TerminalClientConnection] = {}
_connections: Dict[str, TerminalClientConnection] = {}
"List of active connections held on this terminal."
def __init__(self, **kwargs):
kwargs["name"] = "Terminal"
@@ -95,13 +104,13 @@ class Terminal(Service):
:param markdown: Whether to display the table in Markdown format or not. Default is `False`.
"""
table = PrettyTable(["Connection ID", "IP_Address", "Active"])
table = PrettyTable(["Connection ID", "IP_Address", "Active", "Local"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.sys_log.hostname} {self.name} Remote Connections"
for connection_id, connection in self.remote_connection.items():
table.add_row([connection_id, connection.dest_ip_address, connection.is_active])
table.title = f"{self.sys_log.hostname} {self.name} Connections"
for connection_id, connection in self._connections.items():
table.add_row([connection_id, connection.dest_ip_address, connection.is_active, connection.is_local])
print(table.get_string(sortby="Connection ID"))
def _init_request_manager(self) -> RequestManager:
@@ -182,11 +191,18 @@ class Terminal(Service):
"""Message that is reported when a request is rejected by this validator."""
return "Cannot perform request on terminal as not logged in."
def _add_new_connection(self, connection_uuid: str, dest_ip_address: IPv4Address):
"""Create a new connection object and amend to list of active connections."""
self._connections[connection_uuid] = TerminalClientConnection(
parent_node=self.software_manager.node,
dest_ip_address=dest_ip_address,
connection_uuid=connection_uuid,
)
def login(self, username: str, password: str, ip_address: Optional[IPv4Address] = None) -> bool:
"""Process User request to login to Terminal.
If ip_address is passed, login will attempt a remote login to the terminal
If ip_address is passed, login will attempt a remote login to the node at that address.
:param username: The username credential.
:param password: The user password component of credentials.
:param dest_ip_address: The IP address of the node we want to connect to.
@@ -198,8 +214,6 @@ class Terminal(Service):
if ip_address:
# if ip_address has been provided, we assume we are logging in to a remote terminal.
if ip_address == self.parent.network_interface[1].ip_address:
return self._process_local_login(username=username, password=password)
return self._send_remote_login(username=username, password=password, ip_address=ip_address)
return self._process_local_login(username=username, password=password)
@@ -207,11 +221,14 @@ class Terminal(Service):
def _process_local_login(self, username: str, password: str) -> bool:
"""Local session login to terminal."""
# TODO: Un-comment this when UserSessionManager is merged.
# self.connection_uuid = self.parent.UserSessionManager.login(username=username, password=password)
self.connection_uuid = str(uuid4())
self.is_connected = True
if self.connection_uuid:
# connection_uuid = self.parent.UserSessionManager.login(username=username, password=password)
connection_uuid = str(uuid4())
if connection_uuid:
self.sys_log.info(f"Login request authorised, connection uuid: {self.connection_uuid}")
# Add new local session to list of connections
self._add_new_connection(
connection_uuid=connection_uuid, dest_ip_address=self.parent.network_interface[1].ip_address
)
return True
else:
self.sys_log.warning("Login failed, incorrect Username or Password")
@@ -246,11 +263,10 @@ class Terminal(Service):
# TODO: Un-comment this when UserSessionManager is merged.
# connection_uuid = self.parent.UserSessionManager.remote_login(username=username, password=password)
connection_uuid = str(uuid4())
self.is_connected = True
if connection_uuid:
# Send uuid to remote
self.sys_log.info(
f"Remote login authorised, connection ID {self.connection_uuid} for "
f"Remote login authorised, connection ID {connection_uuid} for "
f"{username} on {payload.sender_ip_address}"
)
transport_message: SSHTransportMessage = SSHTransportMessage.SSH_MSG_USERAUTH_SUCCESS
@@ -262,12 +278,7 @@ class Terminal(Service):
sender_ip_address=self.parent.network_interface[1].ip_address,
target_ip_address=payload.sender_ip_address,
)
self.remote_connection[connection_uuid] = TerminalClientConnection(
parent_node=self.software_manager.node,
dest_ip_address=payload.sender_ip_address,
connection_uuid=connection_uuid,
)
self._add_new_connection(connection_uuid=connection_uuid, dest_ip_address=payload.sender_ip_address)
self.send(payload=return_payload, dest_ip_address=return_payload.target_ip_address)
return True
@@ -280,7 +291,7 @@ class Terminal(Service):
"""Receive Payload and process for a response.
:param payload: The message contents received.
:return: True if successfull, else False.
:return: True if successful, else False.
"""
self.sys_log.debug(f"Received payload: {payload}")
@@ -304,7 +315,6 @@ class Terminal(Service):
elif payload.transport_message == SSHTransportMessage.SSH_MSG_USERAUTH_SUCCESS:
self.sys_log.info(f"Login Successful, connection ID is {payload.connection_uuid}")
self.connection_uuid = payload.connection_uuid
self.is_connected = True
return True
elif payload.transport_message == SSHTransportMessage.SSH_MSG_SERVICE_REQUEST:
@@ -322,14 +332,15 @@ class Terminal(Service):
def _disconnect(self, dest_ip_address: IPv4Address) -> bool:
"""Disconnect from the remote."""
if not self.is_connected:
self.sys_log.warning("Not currently connected to remote")
return False
if not self.remote_connection:
if not self._connections:
self.sys_log.warning("No remote connection present")
return False
# TODO: This should probably be done entirely by connection uuid and not IP_address.
for connection in self._connections:
if dest_ip_address == self._connections[connection].dest_ip_address:
self._connections.pop(connection)
software_manager: SoftwareManager = self.software_manager
software_manager.send_payload_to_session_manager(
payload={"type": "disconnect", "connection_id": self.connection_uuid},
@@ -347,7 +358,6 @@ class Terminal(Service):
:return: True if successful, False otherwise.
"""
self._disconnect(dest_ip_address=dest_ip_address)
self.is_connected = False
def send(
self,