diff --git a/src/primaite/simulator/system/services/terminal/terminal.py b/src/primaite/simulator/system/services/terminal/terminal.py index 9dd40edc..039fbeb3 100644 --- a/src/primaite/simulator/system/services/terminal/terminal.py +++ b/src/primaite/simulator/system/services/terminal/terminal.py @@ -17,6 +17,8 @@ from primaite.simulator.system.core.software_manager import SoftwareManager from primaite.simulator.system.services.service import Service, ServiceOperatingState + +# TODO: This might not be needed now? class TerminalClientConnection(BaseModel): """ TerminalClientConnection Class. @@ -52,9 +54,6 @@ class TerminalClientConnection(BaseModel): class Terminal(Service): """Class used to simulate a generic terminal service. Can be interacted with by other terminals via SSH.""" - user_account: Optional[str] = None - "The User Account used for login" - is_connected: bool = False "Boolean Value for whether connected" @@ -64,8 +63,6 @@ class Terminal(Service): operating_state: ServiceOperatingState = ServiceOperatingState.RUNNING """Initial Operating State""" - user_connections: Dict[str, TerminalClientConnection] = {} - """List of authenticated connected users""" def __init__(self, **kwargs): kwargs["name"] = "Terminal" @@ -85,38 +82,24 @@ class Terminal(Service): :rtype: Dict """ state = super().describe_state() - - state.update({"hostname": self.name}) return state def apply_request(self, request: List[str | int | float | Dict], context: Dict | None = None) -> RequestResponse: - """Apply Temrinal Request.""" + """Apply Terminal Request.""" return super().apply_request(request, context) def _init_request_manager(self) -> RequestManager: """Initialise Request manager.""" - # TODO: Expand with a login validator? - _login_valid = Terminal._LoginValidator(terminal=self) rm = super()._init_request_manager() - rm.add_request( - "login", - request_type=RequestType( - func=lambda request, context: RequestResponse.from_bool(self._validate_login()), validator=_login_valid - ), - ) + rm.add_request("login", request_type=RequestType(func=lambda request, context: RequestResponse.from_bool(self._validate_login()), validator=_login_valid)) return rm - def _validate_login(self, user_account: Optional[str]) -> bool: + def _validate_login(self, connection_id: str) -> bool: """Validate login credentials are valid.""" - # TODO: Interact with UserManager to check user_account details - if len(self.user_connections) == 0: - # No current connections - self.sys_log.warning("Login Required!") - return False - else: - return True + return self.parent.UserSessionManager.validate_remote_session_uuid(connection_id) + class _LoginValidator(RequestPermissionValidator): """ @@ -132,77 +115,64 @@ class Terminal(Service): def __call__(self, request: RequestFormat, context: Dict) -> bool: """Return whether the Terminal has valid login credentials""" return self.terminal.login_status - + @property def fail_message(self) -> str: """Message that is reported when a request is rejected by this validator""" - return "Cannot perform request on terminal as not logged in." + return ("Cannot perform request on terminal as not logged in.") + # %% Inbound - def _generate_connection_uuid(self) -> str: - """Generate a unique connection ID.""" - # This might not be needed given user_manager.login() returns a UUID. - return str(uuid4()) - - def login(self, dest_ip_address: IPv4Address, **kwargs) -> bool: + def login(self, username: str, password: str, ip_address: Optional[IPv4Address]=None) -> bool: """Process User request to login to Terminal. :param dest_ip_address: The IP address of the node we want to connect to. + :param username: The username credential. + :param password: The user password component of credentials. :return: True if successful, False otherwise. """ if self.operating_state != ServiceOperatingState.RUNNING: self.sys_log.warning("Cannot process login as service is not running") return False - if self.connection_uuid in self.user_connections: - self.sys_log.debug("User authentication passed") + + # need to determine if this is a local or remote login + + if ip_address: + # ip_address has been given for remote login + return self._send_remote_login(username=username, password=password, ip_address=ip_address) + + return self._process_local_login(username=username, password=password) + + + def _process_local_login(self, username: str, password: str) -> bool: + """Local session login to terminal.""" + self.connection_uuid = self.parent.UserSessionManager.login(username=username, password=password) + if self.connection_uuid: + self.sys_log.info(f"Login request authorised, connection uuid: {self.connection_uuid}") return True else: - # Need to send a login request - # TODO: Refactor with UserManager changes to provide correct credentials and validate. - transport_message = SSHTransportMessage.SSH_MSG_USERAUTH_REQUEST - connection_message = SSHConnectionMessage.SSH_MSG_CHANNEL_OPEN - payload: SSHPacket = SSHPacket( - payload="login", transport_message=transport_message, connection_message=connection_message - ) + self.sys_log.warning("Login failed, incorrect Username or Password") + return False - self.sys_log.info(f"Sending login request to {dest_ip_address}") - self.send(payload=payload, dest_ip_address=dest_ip_address) + def _send_remote_login(self, username: str, password: str, ip_address: IPv4Address) -> bool: + """Attempt to login to a remote terminal.""" + pass - def _ssh_process_login(self, dest_ip_address: IPv4Address, user_account: dict, **kwargs) -> bool: - """Processes the login attempt. Returns a bool which either rejects the login or accepts it.""" - # we assume that the login fails unless we meet all the criteria. - transport_message = SSHTransportMessage.SSH_MSG_USERAUTH_FAILURE - connection_message = SSHConnectionMessage.SSH_MSG_CHANNEL_OPEN_FAILED - # Hard coded at current - replace with another method to handle local accounts. - if user_account == "Username: placeholder, Password: placeholder": # hardcoded - self.connection_uuid = self._generate_connection_uuid() - if not self.add_connection(connection_id=self.connection_uuid): - self.sys_log.warning( - f"{self.name}: Connect request for {dest_ip_address} declined. Service is at capacity." - ) - return False - else: - self.sys_log.info(f"{self.name}: Connect request for ID: {self.connection_uuid} authorised") - transport_message = SSHTransportMessage.SSH_MSG_USERAUTH_SUCCESS - connection_message = SSHConnectionMessage.SSH_MSG_CHANNEL_OPEN_CONFIRMATION - new_connection = TerminalClientConnection( - parent_node=self.software_manager.node, - connection_id=self.connection_uuid, - dest_ip_address=dest_ip_address, - ) - self.user_connections[self.connection_uuid] = new_connection - self.is_connected = True - payload: SSHPacket = SSHPacket(transport_message=transport_message, connection_message=connection_message) + def _process_remote_login(self, username: str, password: str, ip_address:IPv4Address) -> bool: + """Processes a remote terminal requesting to login to this terminal.""" + self.connection_uuid = self.parent.UserSessionManager.remote_login(username=username, password=password) + if self.connection_uuid: + # Send uuid to remote + self.sys_log.info(f"Remote login authorised, connection ID {self.connection_uuid} for {username} on {ip_address}") + # send back to origin. + return True + else: + self.sys_log.warning("Login failed, incorrect Username or Password") + return False - self.send(payload=payload, dest_ip_address=dest_ip_address) - return True - - def _ssh_process_logoff(self, session_id: str, *args, **kwargs) -> bool: - """Process the logoff attempt. Return a bool if succesful or unsuccessful.""" - # TODO: Should remove def receive(self, payload: SSHPacket, session_id: str, **kwargs) -> bool: """Receive Payload and process for a response.""" @@ -213,12 +183,9 @@ class Terminal(Service): self.sys_log.warning("Cannot process message as not running") return False - self.sys_log.debug(f"Received payload: {payload} from session: {session_id}") - if payload.connection_message == SSHConnectionMessage.SSH_MSG_CHANNEL_CLOSE: connection_id = kwargs["connection_id"] dest_ip_address = kwargs["dest_ip_address"] - self._ssh_process_logoff(session_id=session_id) self.disconnect(dest_ip_address=dest_ip_address) self.sys_log.debug(f"Disconnecting {connection_id}") # We need to close on the other machine as well @@ -240,38 +207,6 @@ class Terminal(Service): return True # %% Outbound - def _ssh_remote_login(self, dest_ip_address: IPv4Address, user_account: Optional[dict] = None) -> bool: - """Remote login to terminal via SSH.""" - if not user_account: - # TODO: Generic hardcoded info, will need to be updated with UserManager. - user_account = "Username: placeholder, Password: placeholder" - # something like self.user_manager.get_user_details ? - - # Implement SSHPacket class - payload: SSHPacket = SSHPacket( - transport_message=SSHTransportMessage.SSH_MSG_USERAUTH_REQUEST, - connection_message=SSHConnectionMessage.SSH_MSG_CHANNEL_OPEN, - user_account=user_account, - ) - if self.send(payload=payload, dest_ip_address=dest_ip_address): - if payload.connection_message == SSHTransportMessage.SSH_MSG_USERAUTH_SUCCESS: - self.sys_log.info(f"{self.name} established an ssh connection with {dest_ip_address}") - # Need to confirm if self.uuid is correct. - self.add_connection(self, connection_id=self.uuid, session_id=self.session_id) - return True - else: - self.sys_log.error("Login Failed. Incorrect credentials provided.") - return False - else: - self.sys_log.error("Login Failed. Incorrect credentials provided.") - return False - - def check_connection(self, connection_id: str) -> bool: - """Check whether the connection is valid.""" - if self.is_connected: - return self.send(dest_ip_address=self.dest_ip_address, connection_id=connection_id) - else: - return False def disconnect(self, dest_ip_address: IPv4Address) -> bool: """Disconnect from remote connection. diff --git a/tests/unit_tests/_primaite/_simulator/_system/_services/test_terminal.py b/tests/unit_tests/_primaite/_simulator/_system/_services/test_terminal.py index 6b0365ce..673b11a3 100644 --- a/tests/unit_tests/_primaite/_simulator/_system/_services/test_terminal.py +++ b/tests/unit_tests/_primaite/_simulator/_system/_services/test_terminal.py @@ -80,7 +80,7 @@ def test_terminal_fail_when_closed(basic_network): terminal.operating_state = ServiceOperatingState.STOPPED - assert terminal.login(dest_ip_address="192.168.0.11") is False + assert terminal.login(ip_address="192.168.0.11") is False def test_terminal_disconnect(basic_network): @@ -91,7 +91,7 @@ def test_terminal_disconnect(basic_network): assert terminal.is_connected is False - terminal.login(dest_ip_address="192.168.0.11") + terminal.login(ip_address="192.168.0.11") assert terminal.is_connected is True @@ -108,7 +108,7 @@ def test_terminal_ignores_when_off(basic_network): computer_b: Computer = network.get_node_by_hostname("node_b") - terminal_a.login(dest_ip_address="192.168.0.11") # login to computer_b + terminal_a.login(ip_address="192.168.0.11") # login to computer_b assert terminal_a.is_connected is True