#2706 - Removed source and target ip_address attributes from the SSHPacket Class. Terminal now uses session_id to send login outcome. No more network_interface[1].ip_address.

This commit is contained in:
Charlie Crane
2024-08-02 09:13:31 +01:00
parent 19d7774440
commit 0fe61576c7
2 changed files with 53 additions and 66 deletions

View File

@@ -1,7 +1,6 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from enum import IntEnum
from ipaddress import IPv4Address
from typing import Optional
from primaite.interface.request import RequestResponse
@@ -68,12 +67,6 @@ class SSHUserCredentials(DataPacket):
class SSHPacket(DataPacket):
"""Represents an SSHPacket."""
sender_ip_address: IPv4Address
"""Sender IP Address"""
target_ip_address: IPv4Address
"""Target IP Address"""
transport_message: SSHTransportMessage
"""Message Transport Type"""

View File

@@ -28,32 +28,21 @@ class TerminalClientConnection(BaseModel):
"""
TerminalClientConnection Class.
This class is used to record current remote User Connections to the Terminal class.
This class is used to record current User Connections to the Terminal class.
"""
parent_node: Node # Technically should be HostNode but this causes circular import error.
"""The parent Node that this connection was created on."""
is_active: bool = True
"""Flag to state whether the connection is still active or not."""
dest_ip_address: IPv4Address = None
"""Destination IP address of connection"""
session_id: str = None
"""Session ID that connection is linked to"""
_connection_uuid: str = None
"""Connection UUID"""
@property
def is_local(self) -> bool:
"""Indicates if connection is remote or local.
Returns True if local, False if remote.
"""
for interface in self.parent_node.network_interface:
if self.dest_ip_address == self.parent_node.network_interface[interface].ip_address:
return True
return False
@property
def client(self) -> Optional[Terminal]:
"""The Terminal that holds this connection."""
@@ -68,9 +57,6 @@ class TerminalClientConnection(BaseModel):
class Terminal(Service):
"""Class used to simulate a generic terminal service. Can be interacted with by other terminals via SSH."""
connection_uuid: Optional[str] = None
"Uuid for connection requests"
operating_state: ServiceOperatingState = ServiceOperatingState.RUNNING
"Initial Operating State"
@@ -104,13 +90,13 @@ class Terminal(Service):
:param markdown: Whether to display the table in Markdown format or not. Default is `False`.
"""
table = PrettyTable(["Connection ID", "IP_Address", "Active", "Local"])
table = PrettyTable(["Connection ID", "Session_ID"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.sys_log.hostname} {self.name} Connections"
for connection_id, connection in self._connections.items():
table.add_row([connection_id, connection.dest_ip_address, connection.is_active, connection.is_local])
table.add_row([connection_id, connection.session_id])
print(table.get_string(sortby="Connection ID"))
def _init_request_manager(self) -> RequestManager:
@@ -145,11 +131,12 @@ class Terminal(Service):
self.execute(command)
return RequestResponse(status="success", data={})
def _logoff() -> RequestResponse:
def _logoff(request: List[Any]) -> RequestResponse:
"""Logoff from connection."""
connection_uuid = request[0]
# TODO: Uncomment this when UserSessionManager merged.
# self.parent.UserSessionManager.logoff(self.connection_uuid)
self.disconnect(self.connection_uuid)
# self.parent.UserSessionManager.logoff(connection_uuid)
self.disconnect(connection_uuid)
return RequestResponse(status="success", data={})
@@ -191,12 +178,12 @@ class Terminal(Service):
"""Message that is reported when a request is rejected by this validator."""
return "Cannot perform request on terminal as not logged in."
def _add_new_connection(self, connection_uuid: str, dest_ip_address: IPv4Address):
def _add_new_connection(self, connection_uuid: str, session_id: str):
"""Create a new connection object and amend to list of active connections."""
self._connections[connection_uuid] = TerminalClientConnection(
parent_node=self.software_manager.node,
dest_ip_address=dest_ip_address,
connection_uuid=connection_uuid,
session_id=session_id,
)
def login(self, username: str, password: str, ip_address: Optional[IPv4Address] = None) -> bool:
@@ -219,23 +206,31 @@ class Terminal(Service):
return self._process_local_login(username=username, password=password)
def _process_local_login(self, username: str, password: str) -> bool:
"""Local session login to terminal."""
"""Local session login to terminal.
:param username: Username for login.
:param password: Password for login.
:return: boolean, True if successful, else False
"""
# TODO: Un-comment this when UserSessionManager is merged.
# connection_uuid = self.parent.UserSessionManager.login(username=username, password=password)
connection_uuid = str(uuid4())
if connection_uuid:
self.sys_log.info(f"Login request authorised, connection uuid: {self.connection_uuid}")
self.sys_log.info(f"Login request authorised, connection uuid: {connection_uuid}")
# Add new local session to list of connections
self._add_new_connection(
connection_uuid=connection_uuid, dest_ip_address=self.parent.network_interface[1].ip_address
)
self._add_new_connection(connection_uuid=connection_uuid)
return True
else:
self.sys_log.warning("Login failed, incorrect Username or Password")
return False
def _send_remote_login(self, username: str, password: str, ip_address: IPv4Address) -> bool:
"""Attempt to login to a remote terminal."""
"""Attempt to login to a remote terminal.
:param username: username for login.
:param password: password for login.
:ip_address: IP address of the target node for login.
"""
transport_message: SSHTransportMessage = SSHTransportMessage.SSH_MSG_USERAUTH_REQUEST
connection_message: SSHConnectionMessage = SSHConnectionMessage.SSH_MSG_CHANNEL_DATA
user_account: SSHUserCredentials = SSHUserCredentials(username=username, password=password)
@@ -244,14 +239,12 @@ class Terminal(Service):
transport_message=transport_message,
connection_message=connection_message,
user_account=user_account,
target_ip_address=ip_address,
sender_ip_address=self.parent.network_interface[1].ip_address,
)
self.sys_log.info(f"Sending remote login request to {ip_address}")
return self.send(payload=payload, dest_ip_address=ip_address)
def _process_remote_login(self, payload: SSHPacket) -> bool:
def _process_remote_login(self, payload: SSHPacket, session_id: str) -> bool:
"""Processes a remote terminal requesting to login to this terminal.
:param payload: The SSH Payload Packet.
@@ -266,8 +259,7 @@ class Terminal(Service):
if connection_uuid:
# Send uuid to remote
self.sys_log.info(
f"Remote login authorised, connection ID {connection_uuid} for "
f"{username} on {payload.sender_ip_address}"
f"Remote login authorised, connection ID {connection_uuid} for " f"{username} in session {session_id}"
)
transport_message: SSHTransportMessage = SSHTransportMessage.SSH_MSG_USERAUTH_SUCCESS
connection_message: SSHConnectionMessage = SSHConnectionMessage.SSH_MSG_CHANNEL_DATA
@@ -275,19 +267,17 @@ class Terminal(Service):
transport_message=transport_message,
connection_message=connection_message,
connection_uuid=connection_uuid,
sender_ip_address=self.parent.network_interface[1].ip_address,
target_ip_address=payload.sender_ip_address,
)
self._add_new_connection(connection_uuid=connection_uuid, dest_ip_address=payload.sender_ip_address)
self._add_new_connection(connection_uuid=connection_uuid, session_id=session_id)
self.send(payload=return_payload, dest_ip_address=return_payload.target_ip_address)
self.send(payload=return_payload, session_id=session_id)
return True
else:
# UserSessionManager has returned None
self.sys_log.warning("Login failed, incorrect Username or Password")
return False
def receive(self, payload: SSHPacket, **kwargs) -> bool:
def receive(self, payload: SSHPacket, session_id: str, **kwargs) -> bool:
"""Receive Payload and process for a response.
:param payload: The message contents received.
@@ -310,11 +300,10 @@ class Terminal(Service):
self.sys_log.debug(f"Disconnecting {connection_id}")
elif payload.transport_message == SSHTransportMessage.SSH_MSG_USERAUTH_REQUEST:
return self._process_remote_login(payload=payload)
return self._process_remote_login(payload=payload, session_id=session_id)
elif payload.transport_message == SSHTransportMessage.SSH_MSG_USERAUTH_SUCCESS:
self.sys_log.info(f"Login Successful, connection ID is {payload.connection_uuid}")
self.connection_uuid = payload.connection_uuid
return True
elif payload.transport_message == SSHTransportMessage.SSH_MSG_SERVICE_REQUEST:
@@ -330,16 +319,18 @@ class Terminal(Service):
"""Execute a passed ssh command via the request manager."""
return self.parent.apply_request(command)
def _disconnect(self, dest_ip_address: IPv4Address) -> bool:
"""Disconnect from the remote."""
def _disconnect(self, connection_uuid: str) -> bool:
"""Disconnect from the remote.
:param connection_uuid: Connection ID that we want to disconnect.
:return True if successful, False otherwise.
"""
if not self._connections:
self.sys_log.warning("No remote connection present")
return False
# TODO: This should probably be done entirely by connection uuid and not IP_address.
for connection in self._connections:
if dest_ip_address == self._connections[connection].dest_ip_address:
self._connections.pop(connection)
dest_ip_address = self._connections[connection_uuid].dest_ip_address
self._connections.pop(connection_uuid)
software_manager: SoftwareManager = self.software_manager
software_manager.send_payload_to_session_manager(
@@ -347,22 +338,23 @@ class Terminal(Service):
dest_ip_address=dest_ip_address,
dest_port=self.port,
)
self.connection_uuid = None
self.sys_log.info(f"{self.name}: Disconnected {self.connection_uuid}")
self.sys_log.info(f"{self.name}: Disconnected {connection_uuid}")
return True
def disconnect(self, dest_ip_address: IPv4Address) -> bool:
"""Disconnect from remote connection.
def disconnect(self, connection_uuid: Optional[str]) -> bool:
"""Disconnect the terminal.
:param dest_ip_address: The IP address fo the connection we are terminating.
If no connection id has been supplied, disconnects the first connection.
:param connection_uuid: Connection ID that we want to disconnect.
:return: True if successful, False otherwise.
"""
self._disconnect(dest_ip_address=dest_ip_address)
if not connection_uuid:
connection_uuid = next(iter(self._connections))
return self._disconnect(connection_uuid=connection_uuid)
def send(
self,
payload: SSHPacket,
dest_ip_address: IPv4Address,
self, payload: SSHPacket, dest_ip_address: Optional[IPv4Address] = None, session_id: Optional[str] = None
) -> bool:
"""
Send a payload out from the Terminal.
@@ -374,4 +366,6 @@ class Terminal(Service):
self.sys_log.warning(f"Cannot send commands when Operating state is {self.operating_state}!")
return False
self.sys_log.debug(f"Sending payload: {payload}")
return super().send(payload=payload, dest_ip_address=dest_ip_address, dest_port=self.port)
return super().send(
payload=payload, dest_ip_address=dest_ip_address, dest_port=self.port, session_id=session_id
)