#2706 - Actioning Review Comments

This commit is contained in:
Charlie Crane
2024-08-06 14:10:10 +01:00
parent 68621f172b
commit df49b3b5bb

View File

@@ -1,11 +1,11 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from __future__ import annotations
from datetime import datetime
from ipaddress import IPv4Address
from typing import Any, Dict, List, Optional, Union
from uuid import uuid4
from prettytable import MARKDOWN, PrettyTable
from pydantic import BaseModel
from primaite.interface.request import RequestFormat, RequestResponse
@@ -41,6 +41,21 @@ class TerminalClientConnection(BaseModel):
connection_request_id: str = None
"""Connection request ID"""
time: datetime = None
"""Timestammp connection was created."""
ip_address: IPv4Address
"""Source IP of Connection"""
def __str__(self) -> str:
return f"{self.__class__.__name__}(connection_id='{self.connection_uuid}')"
def __repr__(self) -> str:
return self.__str__()
def __getitem__(self, key: Any) -> Any:
return getattr(self, key)
@property
def client(self) -> Optional[Terminal]:
"""The Terminal that holds this connection."""
@@ -59,9 +74,6 @@ class RemoteTerminalConnection(TerminalClientConnection):
"""
source_ip: IPv4Address
"""Source IP of Connection"""
def execute(self, command: Any) -> bool:
"""Execute a given command on the remote Terminal."""
if self.parent_terminal.operating_state != ServiceOperatingState.RUNNING:
@@ -73,7 +85,7 @@ class RemoteTerminalConnection(TerminalClientConnection):
class Terminal(Service):
"""Class used to simulate a generic terminal service. Can be interacted with by other terminals via SSH."""
_client_connection_requests: Dict[str, Optional[str]] = {}
_client_connection_requests: Dict[str, Optional[Union[str, TerminalClientConnection]]] = {}
def __init__(self, **kwargs):
kwargs["name"] = "Terminal"
@@ -99,14 +111,7 @@ class Terminal(Service):
:param markdown: Whether to display the table in Markdown format or not. Default is `False`.
"""
table = PrettyTable(["Connection ID", "Connection request ID", "Source IP"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.sys_log.hostname} {self.name} Connections"
for connection_id, connection in self._connections.items():
table.add_row([connection_id, connection.connection_request_id, connection.source_ip])
print(table.get_string(sortby="Connection ID"))
self.show_connections(markdown=markdown)
def _init_request_manager(self) -> RequestManager:
"""Initialise Request manager."""
@@ -179,6 +184,7 @@ class Terminal(Service):
parent_terminal=self,
connection_uuid=connection_uuid,
session_id=session_id,
time=datetime.now(),
)
self._connections[connection_uuid] = new_connection
self._client_connection_requests[connection_uuid] = new_connection
@@ -224,19 +230,20 @@ class Terminal(Service):
connection_uuid = str(uuid4())
if connection_uuid:
self.sys_log.info(f"Login request authorised, connection uuid: {connection_uuid}")
# Add new local session to list of connections
self._create_local_connection(connection_uuid=connection_uuid, session_id="Local_Connection")
return TerminalClientConnection(
parent_terminal=self, session_id="Local_Connection", connection_uuid=connection_uuid
)
# Add new local session to list of connections and return
return self._create_local_connection(connection_uuid=connection_uuid, session_id="Local_Connection")
else:
self.sys_log.warning("Login failed, incorrect Username or Password")
return None
def _check_client_connection(self, connection_id: str) -> bool:
def _validate_client_connection_request(self, connection_id: str) -> bool:
"""Check that client_connection_id is valid."""
return True if connection_id in self._client_connection_requests else False
def _check_client_connection(self, connection_id: str) -> bool:
"""Check that client_connection_id is valid."""
return True if connection_id in self._connections else False
def _send_remote_login(
self,
username: str,
@@ -267,11 +274,15 @@ class Terminal(Service):
"""
self.sys_log.info(f"Sending Remote login attempt to {ip_address}. Connection_id is {connection_request_id}")
if is_reattempt:
valid_connection = self._check_client_connection(connection_id=connection_request_id)
if valid_connection:
valid_connection_request = self._validate_client_connection_request(connection_id=connection_request_id)
if valid_connection_request:
remote_terminal_connection = self._client_connection_requests.pop(connection_request_id)
self.sys_log.info(f"{self.name}: Remote Connection to {ip_address} authorised.")
return remote_terminal_connection
if isinstance(remote_terminal_connection, RemoteTerminalConnection):
self.sys_log.info(f"{self.name}: Remote Connection to {ip_address} authorised.")
return remote_terminal_connection
else:
self.sys_log.warning(f"Connection request{connection_request_id} declined")
return None
else:
self.sys_log.warning(f"{self.name}: Remote connection to {ip_address} declined.")
return None
@@ -322,8 +333,9 @@ class Terminal(Service):
parent_terminal=self,
session_id=session_id,
connection_uuid=connection_id,
source_ip=source_ip,
ip_address=source_ip,
connection_request_id=connection_request_id,
time=datetime.now(),
)
self._connections[connection_id] = client_connection
self._client_connection_requests[connection_request_id] = client_connection
@@ -391,8 +403,12 @@ class Terminal(Service):
if isinstance(payload, dict) and payload.get("type"):
if payload["type"] == "disconnect":
connection_id = payload["connection_id"]
self.sys_log.info(f"{self.name}: Received disconnect command for {connection_id=} from remote.")
self._disconnect(payload["connection_id"])
valid_id = self._check_client_connection(connection_id)
if valid_id:
self.sys_log.info(f"{self.name}: Received disconnect command for {connection_id=} from remote.")
self._disconnect(payload["connection_id"])
else:
self.sys_log.info("No Active connection held for received connection ID.")
if isinstance(payload, list):
# A request? For me?
@@ -410,8 +426,9 @@ class Terminal(Service):
self.sys_log.warning("No remote connection present")
return False
session_id = self._connections[connection_uuid].session_id
self._connections.pop(connection_uuid)
# session_id = self._connections[connection_uuid].session_id
connection: RemoteTerminalConnection = self._connections.pop(connection_uuid)
session_id = connection.session_id
software_manager: SoftwareManager = self.software_manager
software_manager.send_payload_to_session_manager(