diff --git a/src/primaite/simulator/system/services/terminal/terminal.py b/src/primaite/simulator/system/services/terminal/terminal.py index 0c94a565..0bcec90d 100644 --- a/src/primaite/simulator/system/services/terminal/terminal.py +++ b/src/primaite/simulator/system/services/terminal/terminal.py @@ -1,11 +1,11 @@ # © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK from __future__ import annotations +from datetime import datetime from ipaddress import IPv4Address from typing import Any, Dict, List, Optional, Union from uuid import uuid4 -from prettytable import MARKDOWN, PrettyTable from pydantic import BaseModel from primaite.interface.request import RequestFormat, RequestResponse @@ -41,6 +41,21 @@ class TerminalClientConnection(BaseModel): connection_request_id: str = None """Connection request ID""" + time: datetime = None + """Timestammp connection was created.""" + + ip_address: IPv4Address + """Source IP of Connection""" + + def __str__(self) -> str: + return f"{self.__class__.__name__}(connection_id='{self.connection_uuid}')" + + def __repr__(self) -> str: + return self.__str__() + + def __getitem__(self, key: Any) -> Any: + return getattr(self, key) + @property def client(self) -> Optional[Terminal]: """The Terminal that holds this connection.""" @@ -59,9 +74,6 @@ class RemoteTerminalConnection(TerminalClientConnection): """ - source_ip: IPv4Address - """Source IP of Connection""" - def execute(self, command: Any) -> bool: """Execute a given command on the remote Terminal.""" if self.parent_terminal.operating_state != ServiceOperatingState.RUNNING: @@ -73,7 +85,7 @@ class RemoteTerminalConnection(TerminalClientConnection): class Terminal(Service): """Class used to simulate a generic terminal service. Can be interacted with by other terminals via SSH.""" - _client_connection_requests: Dict[str, Optional[str]] = {} + _client_connection_requests: Dict[str, Optional[Union[str, TerminalClientConnection]]] = {} def __init__(self, **kwargs): kwargs["name"] = "Terminal" @@ -99,14 +111,7 @@ class Terminal(Service): :param markdown: Whether to display the table in Markdown format or not. Default is `False`. """ - table = PrettyTable(["Connection ID", "Connection request ID", "Source IP"]) - if markdown: - table.set_style(MARKDOWN) - table.align = "l" - table.title = f"{self.sys_log.hostname} {self.name} Connections" - for connection_id, connection in self._connections.items(): - table.add_row([connection_id, connection.connection_request_id, connection.source_ip]) - print(table.get_string(sortby="Connection ID")) + self.show_connections(markdown=markdown) def _init_request_manager(self) -> RequestManager: """Initialise Request manager.""" @@ -179,6 +184,7 @@ class Terminal(Service): parent_terminal=self, connection_uuid=connection_uuid, session_id=session_id, + time=datetime.now(), ) self._connections[connection_uuid] = new_connection self._client_connection_requests[connection_uuid] = new_connection @@ -224,19 +230,20 @@ class Terminal(Service): connection_uuid = str(uuid4()) if connection_uuid: self.sys_log.info(f"Login request authorised, connection uuid: {connection_uuid}") - # Add new local session to list of connections - self._create_local_connection(connection_uuid=connection_uuid, session_id="Local_Connection") - return TerminalClientConnection( - parent_terminal=self, session_id="Local_Connection", connection_uuid=connection_uuid - ) + # Add new local session to list of connections and return + return self._create_local_connection(connection_uuid=connection_uuid, session_id="Local_Connection") else: self.sys_log.warning("Login failed, incorrect Username or Password") return None - def _check_client_connection(self, connection_id: str) -> bool: + def _validate_client_connection_request(self, connection_id: str) -> bool: """Check that client_connection_id is valid.""" return True if connection_id in self._client_connection_requests else False + def _check_client_connection(self, connection_id: str) -> bool: + """Check that client_connection_id is valid.""" + return True if connection_id in self._connections else False + def _send_remote_login( self, username: str, @@ -267,11 +274,15 @@ class Terminal(Service): """ self.sys_log.info(f"Sending Remote login attempt to {ip_address}. Connection_id is {connection_request_id}") if is_reattempt: - valid_connection = self._check_client_connection(connection_id=connection_request_id) - if valid_connection: + valid_connection_request = self._validate_client_connection_request(connection_id=connection_request_id) + if valid_connection_request: remote_terminal_connection = self._client_connection_requests.pop(connection_request_id) - self.sys_log.info(f"{self.name}: Remote Connection to {ip_address} authorised.") - return remote_terminal_connection + if isinstance(remote_terminal_connection, RemoteTerminalConnection): + self.sys_log.info(f"{self.name}: Remote Connection to {ip_address} authorised.") + return remote_terminal_connection + else: + self.sys_log.warning(f"Connection request{connection_request_id} declined") + return None else: self.sys_log.warning(f"{self.name}: Remote connection to {ip_address} declined.") return None @@ -322,8 +333,9 @@ class Terminal(Service): parent_terminal=self, session_id=session_id, connection_uuid=connection_id, - source_ip=source_ip, + ip_address=source_ip, connection_request_id=connection_request_id, + time=datetime.now(), ) self._connections[connection_id] = client_connection self._client_connection_requests[connection_request_id] = client_connection @@ -391,8 +403,12 @@ class Terminal(Service): if isinstance(payload, dict) and payload.get("type"): if payload["type"] == "disconnect": connection_id = payload["connection_id"] - self.sys_log.info(f"{self.name}: Received disconnect command for {connection_id=} from remote.") - self._disconnect(payload["connection_id"]) + valid_id = self._check_client_connection(connection_id) + if valid_id: + self.sys_log.info(f"{self.name}: Received disconnect command for {connection_id=} from remote.") + self._disconnect(payload["connection_id"]) + else: + self.sys_log.info("No Active connection held for received connection ID.") if isinstance(payload, list): # A request? For me? @@ -410,8 +426,9 @@ class Terminal(Service): self.sys_log.warning("No remote connection present") return False - session_id = self._connections[connection_uuid].session_id - self._connections.pop(connection_uuid) + # session_id = self._connections[connection_uuid].session_id + connection: RemoteTerminalConnection = self._connections.pop(connection_uuid) + session_id = connection.session_id software_manager: SoftwareManager = self.software_manager software_manager.send_payload_to_session_manager(