#2712 - Commit before changing branches

This commit is contained in:
Charlie Crane
2024-07-22 09:58:09 +01:00
parent 5c04f4fa4c
commit 3c590a8733
2 changed files with 47 additions and 112 deletions

View File

@@ -17,6 +17,8 @@ from primaite.simulator.system.core.software_manager import SoftwareManager
from primaite.simulator.system.services.service import Service, ServiceOperatingState
# TODO: This might not be needed now?
class TerminalClientConnection(BaseModel):
"""
TerminalClientConnection Class.
@@ -52,9 +54,6 @@ class TerminalClientConnection(BaseModel):
class Terminal(Service):
"""Class used to simulate a generic terminal service. Can be interacted with by other terminals via SSH."""
user_account: Optional[str] = None
"The User Account used for login"
is_connected: bool = False
"Boolean Value for whether connected"
@@ -64,8 +63,6 @@ class Terminal(Service):
operating_state: ServiceOperatingState = ServiceOperatingState.RUNNING
"""Initial Operating State"""
user_connections: Dict[str, TerminalClientConnection] = {}
"""List of authenticated connected users"""
def __init__(self, **kwargs):
kwargs["name"] = "Terminal"
@@ -85,38 +82,24 @@ class Terminal(Service):
:rtype: Dict
"""
state = super().describe_state()
state.update({"hostname": self.name})
return state
def apply_request(self, request: List[str | int | float | Dict], context: Dict | None = None) -> RequestResponse:
"""Apply Temrinal Request."""
"""Apply Terminal Request."""
return super().apply_request(request, context)
def _init_request_manager(self) -> RequestManager:
"""Initialise Request manager."""
# TODO: Expand with a login validator?
_login_valid = Terminal._LoginValidator(terminal=self)
rm = super()._init_request_manager()
rm.add_request(
"login",
request_type=RequestType(
func=lambda request, context: RequestResponse.from_bool(self._validate_login()), validator=_login_valid
),
)
rm.add_request("login", request_type=RequestType(func=lambda request, context: RequestResponse.from_bool(self._validate_login()), validator=_login_valid))
return rm
def _validate_login(self, user_account: Optional[str]) -> bool:
def _validate_login(self, connection_id: str) -> bool:
"""Validate login credentials are valid."""
# TODO: Interact with UserManager to check user_account details
if len(self.user_connections) == 0:
# No current connections
self.sys_log.warning("Login Required!")
return False
else:
return True
return self.parent.UserSessionManager.validate_remote_session_uuid(connection_id)
class _LoginValidator(RequestPermissionValidator):
"""
@@ -132,77 +115,64 @@ class Terminal(Service):
def __call__(self, request: RequestFormat, context: Dict) -> bool:
"""Return whether the Terminal has valid login credentials"""
return self.terminal.login_status
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator"""
return "Cannot perform request on terminal as not logged in."
return ("Cannot perform request on terminal as not logged in.")
# %% Inbound
def _generate_connection_uuid(self) -> str:
"""Generate a unique connection ID."""
# This might not be needed given user_manager.login() returns a UUID.
return str(uuid4())
def login(self, dest_ip_address: IPv4Address, **kwargs) -> bool:
def login(self, username: str, password: str, ip_address: Optional[IPv4Address]=None) -> bool:
"""Process User request to login to Terminal.
:param dest_ip_address: The IP address of the node we want to connect to.
:param username: The username credential.
:param password: The user password component of credentials.
:return: True if successful, False otherwise.
"""
if self.operating_state != ServiceOperatingState.RUNNING:
self.sys_log.warning("Cannot process login as service is not running")
return False
if self.connection_uuid in self.user_connections:
self.sys_log.debug("User authentication passed")
# need to determine if this is a local or remote login
if ip_address:
# ip_address has been given for remote login
return self._send_remote_login(username=username, password=password, ip_address=ip_address)
return self._process_local_login(username=username, password=password)
def _process_local_login(self, username: str, password: str) -> bool:
"""Local session login to terminal."""
self.connection_uuid = self.parent.UserSessionManager.login(username=username, password=password)
if self.connection_uuid:
self.sys_log.info(f"Login request authorised, connection uuid: {self.connection_uuid}")
return True
else:
# Need to send a login request
# TODO: Refactor with UserManager changes to provide correct credentials and validate.
transport_message = SSHTransportMessage.SSH_MSG_USERAUTH_REQUEST
connection_message = SSHConnectionMessage.SSH_MSG_CHANNEL_OPEN
payload: SSHPacket = SSHPacket(
payload="login", transport_message=transport_message, connection_message=connection_message
)
self.sys_log.warning("Login failed, incorrect Username or Password")
return False
self.sys_log.info(f"Sending login request to {dest_ip_address}")
self.send(payload=payload, dest_ip_address=dest_ip_address)
def _send_remote_login(self, username: str, password: str, ip_address: IPv4Address) -> bool:
"""Attempt to login to a remote terminal."""
pass
def _ssh_process_login(self, dest_ip_address: IPv4Address, user_account: dict, **kwargs) -> bool:
"""Processes the login attempt. Returns a bool which either rejects the login or accepts it."""
# we assume that the login fails unless we meet all the criteria.
transport_message = SSHTransportMessage.SSH_MSG_USERAUTH_FAILURE
connection_message = SSHConnectionMessage.SSH_MSG_CHANNEL_OPEN_FAILED
# Hard coded at current - replace with another method to handle local accounts.
if user_account == "Username: placeholder, Password: placeholder": # hardcoded
self.connection_uuid = self._generate_connection_uuid()
if not self.add_connection(connection_id=self.connection_uuid):
self.sys_log.warning(
f"{self.name}: Connect request for {dest_ip_address} declined. Service is at capacity."
)
return False
else:
self.sys_log.info(f"{self.name}: Connect request for ID: {self.connection_uuid} authorised")
transport_message = SSHTransportMessage.SSH_MSG_USERAUTH_SUCCESS
connection_message = SSHConnectionMessage.SSH_MSG_CHANNEL_OPEN_CONFIRMATION
new_connection = TerminalClientConnection(
parent_node=self.software_manager.node,
connection_id=self.connection_uuid,
dest_ip_address=dest_ip_address,
)
self.user_connections[self.connection_uuid] = new_connection
self.is_connected = True
payload: SSHPacket = SSHPacket(transport_message=transport_message, connection_message=connection_message)
def _process_remote_login(self, username: str, password: str, ip_address:IPv4Address) -> bool:
"""Processes a remote terminal requesting to login to this terminal."""
self.connection_uuid = self.parent.UserSessionManager.remote_login(username=username, password=password)
if self.connection_uuid:
# Send uuid to remote
self.sys_log.info(f"Remote login authorised, connection ID {self.connection_uuid} for {username} on {ip_address}")
# send back to origin.
return True
else:
self.sys_log.warning("Login failed, incorrect Username or Password")
return False
self.send(payload=payload, dest_ip_address=dest_ip_address)
return True
def _ssh_process_logoff(self, session_id: str, *args, **kwargs) -> bool:
"""Process the logoff attempt. Return a bool if succesful or unsuccessful."""
# TODO: Should remove
def receive(self, payload: SSHPacket, session_id: str, **kwargs) -> bool:
"""Receive Payload and process for a response."""
@@ -213,12 +183,9 @@ class Terminal(Service):
self.sys_log.warning("Cannot process message as not running")
return False
self.sys_log.debug(f"Received payload: {payload} from session: {session_id}")
if payload.connection_message == SSHConnectionMessage.SSH_MSG_CHANNEL_CLOSE:
connection_id = kwargs["connection_id"]
dest_ip_address = kwargs["dest_ip_address"]
self._ssh_process_logoff(session_id=session_id)
self.disconnect(dest_ip_address=dest_ip_address)
self.sys_log.debug(f"Disconnecting {connection_id}")
# We need to close on the other machine as well
@@ -240,38 +207,6 @@ class Terminal(Service):
return True
# %% Outbound
def _ssh_remote_login(self, dest_ip_address: IPv4Address, user_account: Optional[dict] = None) -> bool:
"""Remote login to terminal via SSH."""
if not user_account:
# TODO: Generic hardcoded info, will need to be updated with UserManager.
user_account = "Username: placeholder, Password: placeholder"
# something like self.user_manager.get_user_details ?
# Implement SSHPacket class
payload: SSHPacket = SSHPacket(
transport_message=SSHTransportMessage.SSH_MSG_USERAUTH_REQUEST,
connection_message=SSHConnectionMessage.SSH_MSG_CHANNEL_OPEN,
user_account=user_account,
)
if self.send(payload=payload, dest_ip_address=dest_ip_address):
if payload.connection_message == SSHTransportMessage.SSH_MSG_USERAUTH_SUCCESS:
self.sys_log.info(f"{self.name} established an ssh connection with {dest_ip_address}")
# Need to confirm if self.uuid is correct.
self.add_connection(self, connection_id=self.uuid, session_id=self.session_id)
return True
else:
self.sys_log.error("Login Failed. Incorrect credentials provided.")
return False
else:
self.sys_log.error("Login Failed. Incorrect credentials provided.")
return False
def check_connection(self, connection_id: str) -> bool:
"""Check whether the connection is valid."""
if self.is_connected:
return self.send(dest_ip_address=self.dest_ip_address, connection_id=connection_id)
else:
return False
def disconnect(self, dest_ip_address: IPv4Address) -> bool:
"""Disconnect from remote connection.

View File

@@ -80,7 +80,7 @@ def test_terminal_fail_when_closed(basic_network):
terminal.operating_state = ServiceOperatingState.STOPPED
assert terminal.login(dest_ip_address="192.168.0.11") is False
assert terminal.login(ip_address="192.168.0.11") is False
def test_terminal_disconnect(basic_network):
@@ -91,7 +91,7 @@ def test_terminal_disconnect(basic_network):
assert terminal.is_connected is False
terminal.login(dest_ip_address="192.168.0.11")
terminal.login(ip_address="192.168.0.11")
assert terminal.is_connected is True
@@ -108,7 +108,7 @@ def test_terminal_ignores_when_off(basic_network):
computer_b: Computer = network.get_node_by_hostname("node_b")
terminal_a.login(dest_ip_address="192.168.0.11") # login to computer_b
terminal_a.login(ip_address="192.168.0.11") # login to computer_b
assert terminal_a.is_connected is True