From ebf6e7a90ebf8b713ac0ae3dc958896a80ed0bea Mon Sep 17 00:00:00 2001 From: Charlie Crane Date: Tue, 2 Jul 2024 16:47:39 +0100 Subject: [PATCH] #2711 - Added in remote_login and process_login methods. Minor updates to make pydantic happy. Starting to flesh out functionality of Terminal Service in more detail --- .../simulator/network/protocols/ssh.py | 6 +- .../system/services/terminal/terminal.py | 139 +++++++++--------- 2 files changed, 73 insertions(+), 72 deletions(-) diff --git a/src/primaite/simulator/network/protocols/ssh.py b/src/primaite/simulator/network/protocols/ssh.py index 448f0fec..7be81982 100644 --- a/src/primaite/simulator/network/protocols/ssh.py +++ b/src/primaite/simulator/network/protocols/ssh.py @@ -60,11 +60,11 @@ class SSHConnectionMessage(IntEnum): class SSHPacket(DataPacket): """Represents an SSHPacket.""" - transport_message: SSHTransportMessage + transport_message: SSHTransportMessage = None - connection_message: SSHConnectionMessage + connection_message: SSHConnectionMessage = None - ssh_command: Optional[any] = None # This is the request string + ssh_command: Optional[str] = None # This is the request string ssh_output: Optional[RequestResponse] = None # The Request Manager's returned RequestResponse diff --git a/src/primaite/simulator/system/services/terminal/terminal.py b/src/primaite/simulator/system/services/terminal/terminal.py index d86d21c6..e1964f78 100644 --- a/src/primaite/simulator/system/services/terminal/terminal.py +++ b/src/primaite/simulator/system/services/terminal/terminal.py @@ -8,8 +8,7 @@ from uuid import uuid4 from primaite.interface.request import RequestFormat, RequestResponse from primaite.simulator.core import RequestManager, RequestPermissionValidator from primaite.simulator.network.protocols.icmp import ICMPPacket - -# from primaite.simulator.network.protocols.ssh import SSHPacket, SSHTransportMessage, SSHConnectionMessage +from primaite.simulator.network.protocols.ssh import SSHConnectionMessage, SSHPacket, SSHTransportMessage from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.network.transmission.transport_layer import Port from primaite.simulator.system.services.service import Service, ServiceOperatingState @@ -21,19 +20,22 @@ class Terminal(Service): user_account: Optional[str] = None "The User Account used for login" - connected: bool = False + is_connected: bool = False "Boolean Value for whether connected" connection_uuid: Optional[str] = None "Uuid for connection requests" + operating_state: ServiceOperatingState = ServiceOperatingState.INSTALLING + """Service Operating State""" # Install at start ??? Maybe ??? + def __init__(self, **kwargs): kwargs["name"] = "Terminal" kwargs["port"] = Port.SSH kwargs["protocol"] = IPProtocol.TCP super().__init__(**kwargs) - self.operating_state = ServiceOperatingState.RUNNING + # self.operating_state = ServiceOperatingState.RUNNING class _LoginValidator(RequestPermissionValidator): """ @@ -46,34 +48,22 @@ class Terminal(Service): def __call__(self, request: RequestFormat, context: Dict) -> bool: """Return whether the login credentials are valid.""" - pass + # TODO: Expand & Implement logic when we have User Accounts. + if self.terminal.is_connected: + return True + else: + self.terminal.sys_log.error("terminal is not logged in.") @property def fail_message(self) -> str: """Message that is reported when a request is rejected by this validator.""" - return ( - f"Cannot perform request on Terminal '{self.terminal.hostname}' because login credentials are invalid" - ) + return f"Cannot perform request on Terminal '{self.terminal.name}' because login credentials are invalid" def _validate_login(self) -> bool: """Validate login credentials when receiving commands.""" # TODO: Implement return True - def receive_payload_from_software_manager( - self, - payload: Any, - dst_ip_address: Optional[Union[IPv4Address, IPv4Network]] = None, - src_port: Optional[Port] = None, - dst_port: Optional[Port] = None, - session_id: Optional[str] = None, - ip_protocol: IPProtocol = IPProtocol.TCP, - icmp_packet: Optional[ICMPPacket] = None, - connection_id: Optional[str] = None, - ) -> Union[Any, None]: - """Receive Software Manager Payload.""" - self._validate_login() - def _init_request_manager(self) -> RequestManager: """Initialise Request manager.""" # _login_is_valid = Terminal._LoginValidator(terminal=self) @@ -101,7 +91,7 @@ class Terminal(Service): """ state = super().describe_state() # TBD - state.update({"hostname": self.hostname}) + state.update({"hostname": self.name}) return state def execute(self, command: Any, request: Any) -> Optional[RequestResponse]: @@ -133,58 +123,69 @@ class Terminal(Service): # %% - # def _ssh_process_login(self, user_account: dict, **kwargs) -> SSHPacket: - # """Processes the login attempt. Returns a SSHPacket which either rejects the login or accepts it.""" - # # we assume that the login fails unless we meet all the criteria. - # transport_message = SSHTransportMessage.SSH_MSG_USERAUTH_FAILURE - # connection_message = SSHConnectionMessage.SSH_MSG_CHANNEL_OPEN_FAILED - # # operating state validation here(if overwhelmed) + def _ssh_process_login(self, user_account: dict, **kwargs) -> SSHPacket: + """Processes the login attempt. Returns a SSHPacket which either rejects the login or accepts it.""" + # we assume that the login fails unless we meet all the criteria. + transport_message = SSHTransportMessage.SSH_MSG_USERAUTH_FAILURE + connection_message = SSHConnectionMessage.SSH_MSG_CHANNEL_OPEN_FAILED + # operating state validation here(if overwhelmed) - # # Hard coded at current - replace with another method to handle local accounts. - # if user_account == f"{self.user_name:} placeholder, {self.password:} placeholder": # hardcoded - # connection_id = self._generate_connection_id() - # if not self.add_connection(self, connection_id="ssh_connection", session_id=self.session_id): - # self.sys_log.warning(f"{self.name}: Connect request for {self.src_ip} declined. - # Service is at capacity.") - # ... - # else: - # self.sys_log.info(f"{self.name}: Connect request for {connection_id=} authorised") - # transport_message = SSHTransportMessage.SSH_MSG_USERAUTH_SUCCESS - # connection_message = SSHConnectionMessage.SSH_MSG_CHANNEL_OPEN_CONFIRMATION + # Hard coded at current - replace with another method to handle local accounts. + if user_account == f"{self.user_name:} placeholder, {self.password:} placeholder": # hardcoded + connection_id = self._generate_connection_id() + if not self.add_connection(self, connection_id="ssh_connection", session_id=self.session_id): + self.sys_log.warning( + f"{self.name}: Connect request for {self.src_ip} declined. Service is at capacity." + ) + else: + self.sys_log.info(f"{self.name}: Connect request for {connection_id=} authorised") + transport_message = SSHTransportMessage.SSH_MSG_USERAUTH_SUCCESS + connection_message = SSHConnectionMessage.SSH_MSG_CHANNEL_OPEN_CONFIRMATION + self.is_connected = True - # payload: SSHPacket = SSHPacket(transport_message = transport_message, connection_message = connection_message) - # return payload + payload: SSHPacket = SSHPacket(transport_message=transport_message, connection_message=connection_message) + return payload # %% # Copy + Paste from Terminal Wiki - # def ssh_remote_login(self, dest_ip_address = IPv4Address, user_account: Optional[dict] = None) -> bool: - # if user_account: - # # Setting default creds (Best to use this until we have more clarification on the specifics of user accounts) - # self.user_account = {self.user_name:"placeholder", self.password:"placeholder"} + def ssh_remote_login(self, dest_ip_address: IPv4Address, user_account: Optional[dict] = None) -> bool: + """Remote login to terminal via SSH.""" + if user_account: + # Setting default creds (Best to use this until we have more clarification around user accounts) + self.user_account = {self.user_name: "placeholder", self.password: "placeholder"} + + # Implement SSHPacket class + payload: SSHPacket = SSHPacket( + transport_message=SSHTransportMessage.SSH_MSG_USERAUTH_REQUEST, + connection_message=SSHConnectionMessage.SSH_MSG_CHANNEL_OPEN, + user_account=user_account, + ) + if self.send(payload=payload, dest_ip_address=dest_ip_address): + if payload.connection_message == SSHTransportMessage.SSH_MSG_USERAUTH_SUCCESS: + self.sys_log.info(f"{self.name} established an ssh connection with {dest_ip_address}") + # Need to confirm if self.uuid is correct. + self.add_connection(self, connection_id=self.uuid, session_id=self.session_id) + return True + else: + self.sys_log.error("Payload type incorrect, Login Failed") + return False + else: + self.sys_log.error("Incorrect credentials provided. Login Failed.") + return False - # # Implement SSHPacket class - # payload: SSHPacket = SSHPacket(transport_message= SSHTransportMessage.SSH_MSG_USERAUTH_REQUEST, - # connection_message= SSHConnectionMessage.SSH_MSG_CHANNEL_OPEN, - # user_account=user_account) - # if self.send(payload=payload,dest_ip_address=dest_ip_address): - # if payload.connection_message == SSHTransportMessage.SSH_MSG_USERAUTH_SUCCESS: - # self.sys_log.info(f"{self.name} established an ssh connection with {dest_ip_address}") - # # Need to confirm if self.uuid is correct. - # self.add_connection(self, connection_id=self.uuid, session_id=self.session_id) - # return True - # else: - # self.sys_log.error("Payload type incorrect, Login Failed") - # return False - # else: - # self.sys_log.error("Incorrect credentials provided. Login Failed.") - # return False # %% - def connect(self, **kwargs): - """Send connect request.""" - self._connect(self, **kwargs) - - def _connect(self): - """Do something.""" - pass + def receive_payload_from_software_manager( + self, + payload: Any, + dst_ip_address: Optional[Union[IPv4Address, IPv4Network]] = None, + src_port: Optional[Port] = None, + dst_port: Optional[Port] = None, + session_id: Optional[str] = None, + ip_protocol: IPProtocol = IPProtocol.TCP, + icmp_packet: Optional[ICMPPacket] = None, + connection_id: Optional[str] = None, + ) -> Union[Any, None]: + """Receive Software Manager Payload.""" + self._validate_login()