#2710 - Initial implementation f the receive/send methods. Committing to change branch

This commit is contained in:
Charlie Crane
2024-07-09 11:54:33 +01:00
parent 61c2ee0ee8
commit 42602be953
2 changed files with 65 additions and 4 deletions

View File

@@ -293,6 +293,7 @@ class HostNode(Node):
* DNS (Domain Name System) Client: Resolves domain names to IP addresses.
* FTP (File Transfer Protocol) Client: Enables file transfers between the host and FTP servers.
* NTP (Network Time Protocol) Client: Synchronizes the system clock with NTP servers.
* Terminal Client: Handles SSH requests between HostNode and external components.
Applications:
------------

View File

@@ -2,14 +2,14 @@
from __future__ import annotations
from ipaddress import IPv4Address
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional
from uuid import uuid4
from pydantic import BaseModel
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager
from primaite.simulator.network.hardware.nodes.host.host_node import HostNode
from primaite.simulator.network.hardware.base import Node
from primaite.simulator.network.protocols.ssh import SSHConnectionMessage, SSHPacket, SSHTransportMessage
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
@@ -27,7 +27,7 @@ class TerminalClientConnection(BaseModel):
connection_id: str
"""Connection UUID."""
parent_node: HostNode
parent_node: Node # Technically I think this should be HostNode, but that causes a circular import.
"""The parent Node that this connection was created on."""
is_active: bool = True
@@ -116,7 +116,7 @@ class Terminal(Service):
self.process_login(dest_ip_address=dest_ip_address, user_account=user_account)
def _ssh_process_login(self, dest_ip_address: IPv4Address, user_account: dict, **kwargs) -> bool:
"""Processes the login attempt. Returns a SSHPacket which either rejects the login or accepts it."""
"""Processes the login attempt. Returns a bool which either rejects the login or accepts it."""
# we assume that the login fails unless we meet all the criteria.
transport_message = SSHTransportMessage.SSH_MSG_USERAUTH_FAILURE
connection_message = SSHConnectionMessage.SSH_MSG_CHANNEL_OPEN_FAILED
@@ -142,6 +142,56 @@ class Terminal(Service):
self.send(payload=payload, dest_ip_address=dest_ip_address)
return True
def validate_user(self, user: Dict[str]) -> bool:
return True if user.get("username") in self.user_connections else False
def _ssh_process_logoff(self, dest_ip_address: IPv4Address, user_account: dict, **kwargs) -> bool:
"""Process the logoff attempt. Return a bool if succesful or unsuccessful."""
if self.validate_user(user_account):
# Account is logged in
self.user_connections.pop[user_account["username"]] # assumption atm
self.is_connected = False
return True
else:
self.sys_log.warning("User account credentials invalid.")
def _ssh_process_command(self, session_id: str, *args, **kwargs) -> bool:
return True
def send_logoff_ack(self):
"""Send confirmation of successful disconnect"""
transport_message = SSHTransportMessage.SSH_MSG_SERVICE_SUCCESS
connection_message = SSHConnectionMessage.SSH_MSG_CHANNEL_CLOSE
payload: SSHPacket = SSHPacket(transport_message=transport_message, connection_message=connection_message, ssh_output=RequestResponse(status="success"))
self.send(payload=payload)
def receive(self, payload: SSHPacket, session_id: str, **kwargs) -> bool:
self.sys_log.debug(f"Received payload: {payload} from session: {session_id}")
if payload.connection_message ==SSHConnectionMessage. SSH_MSG_CHANNEL_CLOSE:
result = self._ssh_process_logoff(session_id=session_id)
# We need to close on the other machine as well
self.send_logoff_ack()
elif payload.transport_message == SSHTransportMessage.SSH_MSG_USERAUTH_REQUEST:
src_ip = kwargs.get("frame").ip.src_ip_address
user_account = payload.get("user_account", {})
result = self._ssh_process_login(src_ip=src_ip, session_id=session_id, user_account=user_account)
elif payload.transport_message == SSHTransportMessage.SSH_MSG_SERVICE_REQUEST:
# Ensure we only ever process requests if we have a established connection (e.g session_id is provided and validated)
result = self._ssh_process_command(session_id=session_id)
else:
self.sys_log.warning("Encounter unexpected message type, rejecting connection")
# send a SSH_MSG_CHANNEL_CLOSE if there is a session_id otherwise SSH_MSG_OPEN_FAILED
return False
self.send(payload=result, session_id=session_id)
return True
# %% Outbound
def login(self, dest_ip_address: IPv4Address) -> bool:
@@ -217,3 +267,13 @@ class Terminal(Service):
)
self.connected = False
return True
def send(
self,
payload: SSHPacket,
dest_ip_address: Optional[IPv4Address] = None,
session_id: Optional[str] = None,
) -> bool:
return super().send(payload=payload, dest_ip_address=dest_ip_address, dest_port=Port.SSH, session_id=session_id)