#2710 - commit before changing branch

This commit is contained in:
Charlie Crane
2024-07-09 11:55:16 +01:00
parent 42602be953
commit 8061102587

View File

@@ -27,7 +27,7 @@ class TerminalClientConnection(BaseModel):
connection_id: str
"""Connection UUID."""
parent_node: Node # Technically I think this should be HostNode, but that causes a circular import.
parent_node: Node # Technically I think this should be HostNode, but that causes a circular import.
"""The parent Node that this connection was created on."""
is_active: bool = True
@@ -145,13 +145,12 @@ class Terminal(Service):
def validate_user(self, user: Dict[str]) -> bool:
return True if user.get("username") in self.user_connections else False
def _ssh_process_logoff(self, dest_ip_address: IPv4Address, user_account: dict, **kwargs) -> bool:
"""Process the logoff attempt. Return a bool if succesful or unsuccessful."""
if self.validate_user(user_account):
# Account is logged in
self.user_connections.pop[user_account["username"]] # assumption atm
self.user_connections.pop[user_account["username"]] # assumption atm
self.is_connected = False
return True
else:
@@ -164,33 +163,36 @@ class Terminal(Service):
"""Send confirmation of successful disconnect"""
transport_message = SSHTransportMessage.SSH_MSG_SERVICE_SUCCESS
connection_message = SSHConnectionMessage.SSH_MSG_CHANNEL_CLOSE
payload: SSHPacket = SSHPacket(transport_message=transport_message, connection_message=connection_message, ssh_output=RequestResponse(status="success"))
payload: SSHPacket = SSHPacket(
transport_message=transport_message,
connection_message=connection_message,
ssh_output=RequestResponse(status="success"),
)
self.send(payload=payload)
def receive(self, payload: SSHPacket, session_id: str, **kwargs) -> bool:
self.sys_log.debug(f"Received payload: {payload} from session: {session_id}")
if payload.connection_message ==SSHConnectionMessage. SSH_MSG_CHANNEL_CLOSE:
result = self._ssh_process_logoff(session_id=session_id)
# We need to close on the other machine as well
self.send_logoff_ack()
self.sys_log.debug(f"Received payload: {payload} from session: {session_id}")
if payload.connection_message == SSHConnectionMessage.SSH_MSG_CHANNEL_CLOSE:
result = self._ssh_process_logoff(session_id=session_id)
# We need to close on the other machine as well
self.send_logoff_ack()
elif payload.transport_message == SSHTransportMessage.SSH_MSG_USERAUTH_REQUEST:
src_ip = kwargs.get("frame").ip.src_ip_address
user_account = payload.get("user_account", {})
result = self._ssh_process_login(src_ip=src_ip, session_id=session_id, user_account=user_account)
elif payload.transport_message == SSHTransportMessage.SSH_MSG_USERAUTH_REQUEST:
src_ip = kwargs.get("frame").ip.src_ip_address
user_account = payload.get("user_account", {})
result = self._ssh_process_login(src_ip=src_ip, session_id=session_id, user_account=user_account)
elif payload.transport_message == SSHTransportMessage.SSH_MSG_SERVICE_REQUEST:
# Ensure we only ever process requests if we have a established connection (e.g session_id is provided and validated)
result = self._ssh_process_command(session_id=session_id)
elif payload.transport_message == SSHTransportMessage.SSH_MSG_SERVICE_REQUEST:
# Ensure we only ever process requests if we have a established connection (e.g session_id is provided and validated)
result = self._ssh_process_command(session_id=session_id)
else:
self.sys_log.warning("Encounter unexpected message type, rejecting connection")
# send a SSH_MSG_CHANNEL_CLOSE if there is a session_id otherwise SSH_MSG_OPEN_FAILED
return False
self.send(payload=result, session_id=session_id)
return True
else:
self.sys_log.warning("Encounter unexpected message type, rejecting connection")
# send a SSH_MSG_CHANNEL_CLOSE if there is a session_id otherwise SSH_MSG_OPEN_FAILED
return False
self.send(payload=result, session_id=session_id)
return True
# %% Outbound
@@ -268,12 +270,10 @@ class Terminal(Service):
self.connected = False
return True
def send(
self,
payload: SSHPacket,
dest_ip_address: Optional[IPv4Address] = None,
session_id: Optional[str] = None,
) -> bool:
return super().send(payload=payload, dest_ip_address=dest_ip_address, dest_port=Port.SSH, session_id=session_id)
self,
payload: SSHPacket,
dest_ip_address: Optional[IPv4Address] = None,
session_id: Optional[str] = None,
) -> bool:
return super().send(payload=payload, dest_ip_address=dest_ip_address, dest_port=Port.SSH, session_id=session_id)