#2711 - Added in remote_login and process_login methods. Minor updates to make pydantic happy. Starting to flesh out functionality of Terminal Service in more detail

This commit is contained in:
Charlie Crane
2024-07-02 16:47:39 +01:00
parent bd05f4d4e8
commit ebf6e7a90e
2 changed files with 73 additions and 72 deletions

View File

@@ -60,11 +60,11 @@ class SSHConnectionMessage(IntEnum):
class SSHPacket(DataPacket):
"""Represents an SSHPacket."""
transport_message: SSHTransportMessage
transport_message: SSHTransportMessage = None
connection_message: SSHConnectionMessage
connection_message: SSHConnectionMessage = None
ssh_command: Optional[any] = None # This is the request string
ssh_command: Optional[str] = None # This is the request string
ssh_output: Optional[RequestResponse] = None # The Request Manager's returned RequestResponse

View File

@@ -8,8 +8,7 @@ from uuid import uuid4
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestPermissionValidator
from primaite.simulator.network.protocols.icmp import ICMPPacket
# from primaite.simulator.network.protocols.ssh import SSHPacket, SSHTransportMessage, SSHConnectionMessage
from primaite.simulator.network.protocols.ssh import SSHConnectionMessage, SSHPacket, SSHTransportMessage
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.services.service import Service, ServiceOperatingState
@@ -21,19 +20,22 @@ class Terminal(Service):
user_account: Optional[str] = None
"The User Account used for login"
connected: bool = False
is_connected: bool = False
"Boolean Value for whether connected"
connection_uuid: Optional[str] = None
"Uuid for connection requests"
operating_state: ServiceOperatingState = ServiceOperatingState.INSTALLING
"""Service Operating State""" # Install at start ??? Maybe ???
def __init__(self, **kwargs):
kwargs["name"] = "Terminal"
kwargs["port"] = Port.SSH
kwargs["protocol"] = IPProtocol.TCP
super().__init__(**kwargs)
self.operating_state = ServiceOperatingState.RUNNING
# self.operating_state = ServiceOperatingState.RUNNING
class _LoginValidator(RequestPermissionValidator):
"""
@@ -46,34 +48,22 @@ class Terminal(Service):
def __call__(self, request: RequestFormat, context: Dict) -> bool:
"""Return whether the login credentials are valid."""
pass
# TODO: Expand & Implement logic when we have User Accounts.
if self.terminal.is_connected:
return True
else:
self.terminal.sys_log.error("terminal is not logged in.")
@property
def fail_message(self) -> str:
"""Message that is reported when a request is rejected by this validator."""
return (
f"Cannot perform request on Terminal '{self.terminal.hostname}' because login credentials are invalid"
)
return f"Cannot perform request on Terminal '{self.terminal.name}' because login credentials are invalid"
def _validate_login(self) -> bool:
"""Validate login credentials when receiving commands."""
# TODO: Implement
return True
def receive_payload_from_software_manager(
self,
payload: Any,
dst_ip_address: Optional[Union[IPv4Address, IPv4Network]] = None,
src_port: Optional[Port] = None,
dst_port: Optional[Port] = None,
session_id: Optional[str] = None,
ip_protocol: IPProtocol = IPProtocol.TCP,
icmp_packet: Optional[ICMPPacket] = None,
connection_id: Optional[str] = None,
) -> Union[Any, None]:
"""Receive Software Manager Payload."""
self._validate_login()
def _init_request_manager(self) -> RequestManager:
"""Initialise Request manager."""
# _login_is_valid = Terminal._LoginValidator(terminal=self)
@@ -101,7 +91,7 @@ class Terminal(Service):
"""
state = super().describe_state()
# TBD
state.update({"hostname": self.hostname})
state.update({"hostname": self.name})
return state
def execute(self, command: Any, request: Any) -> Optional[RequestResponse]:
@@ -133,58 +123,69 @@ class Terminal(Service):
# %%
# def _ssh_process_login(self, user_account: dict, **kwargs) -> SSHPacket:
# """Processes the login attempt. Returns a SSHPacket which either rejects the login or accepts it."""
# # we assume that the login fails unless we meet all the criteria.
# transport_message = SSHTransportMessage.SSH_MSG_USERAUTH_FAILURE
# connection_message = SSHConnectionMessage.SSH_MSG_CHANNEL_OPEN_FAILED
# # operating state validation here(if overwhelmed)
def _ssh_process_login(self, user_account: dict, **kwargs) -> SSHPacket:
"""Processes the login attempt. Returns a SSHPacket which either rejects the login or accepts it."""
# we assume that the login fails unless we meet all the criteria.
transport_message = SSHTransportMessage.SSH_MSG_USERAUTH_FAILURE
connection_message = SSHConnectionMessage.SSH_MSG_CHANNEL_OPEN_FAILED
# operating state validation here(if overwhelmed)
# # Hard coded at current - replace with another method to handle local accounts.
# if user_account == f"{self.user_name:} placeholder, {self.password:} placeholder": # hardcoded
# connection_id = self._generate_connection_id()
# if not self.add_connection(self, connection_id="ssh_connection", session_id=self.session_id):
# self.sys_log.warning(f"{self.name}: Connect request for {self.src_ip} declined.
# Service is at capacity.")
# ...
# else:
# self.sys_log.info(f"{self.name}: Connect request for {connection_id=} authorised")
# transport_message = SSHTransportMessage.SSH_MSG_USERAUTH_SUCCESS
# connection_message = SSHConnectionMessage.SSH_MSG_CHANNEL_OPEN_CONFIRMATION
# Hard coded at current - replace with another method to handle local accounts.
if user_account == f"{self.user_name:} placeholder, {self.password:} placeholder": # hardcoded
connection_id = self._generate_connection_id()
if not self.add_connection(self, connection_id="ssh_connection", session_id=self.session_id):
self.sys_log.warning(
f"{self.name}: Connect request for {self.src_ip} declined. Service is at capacity."
)
else:
self.sys_log.info(f"{self.name}: Connect request for {connection_id=} authorised")
transport_message = SSHTransportMessage.SSH_MSG_USERAUTH_SUCCESS
connection_message = SSHConnectionMessage.SSH_MSG_CHANNEL_OPEN_CONFIRMATION
self.is_connected = True
# payload: SSHPacket = SSHPacket(transport_message = transport_message, connection_message = connection_message)
# return payload
payload: SSHPacket = SSHPacket(transport_message=transport_message, connection_message=connection_message)
return payload
# %%
# Copy + Paste from Terminal Wiki
# def ssh_remote_login(self, dest_ip_address = IPv4Address, user_account: Optional[dict] = None) -> bool:
# if user_account:
# # Setting default creds (Best to use this until we have more clarification on the specifics of user accounts)
# self.user_account = {self.user_name:"placeholder", self.password:"placeholder"}
def ssh_remote_login(self, dest_ip_address: IPv4Address, user_account: Optional[dict] = None) -> bool:
"""Remote login to terminal via SSH."""
if user_account:
# Setting default creds (Best to use this until we have more clarification around user accounts)
self.user_account = {self.user_name: "placeholder", self.password: "placeholder"}
# Implement SSHPacket class
payload: SSHPacket = SSHPacket(
transport_message=SSHTransportMessage.SSH_MSG_USERAUTH_REQUEST,
connection_message=SSHConnectionMessage.SSH_MSG_CHANNEL_OPEN,
user_account=user_account,
)
if self.send(payload=payload, dest_ip_address=dest_ip_address):
if payload.connection_message == SSHTransportMessage.SSH_MSG_USERAUTH_SUCCESS:
self.sys_log.info(f"{self.name} established an ssh connection with {dest_ip_address}")
# Need to confirm if self.uuid is correct.
self.add_connection(self, connection_id=self.uuid, session_id=self.session_id)
return True
else:
self.sys_log.error("Payload type incorrect, Login Failed")
return False
else:
self.sys_log.error("Incorrect credentials provided. Login Failed.")
return False
# # Implement SSHPacket class
# payload: SSHPacket = SSHPacket(transport_message= SSHTransportMessage.SSH_MSG_USERAUTH_REQUEST,
# connection_message= SSHConnectionMessage.SSH_MSG_CHANNEL_OPEN,
# user_account=user_account)
# if self.send(payload=payload,dest_ip_address=dest_ip_address):
# if payload.connection_message == SSHTransportMessage.SSH_MSG_USERAUTH_SUCCESS:
# self.sys_log.info(f"{self.name} established an ssh connection with {dest_ip_address}")
# # Need to confirm if self.uuid is correct.
# self.add_connection(self, connection_id=self.uuid, session_id=self.session_id)
# return True
# else:
# self.sys_log.error("Payload type incorrect, Login Failed")
# return False
# else:
# self.sys_log.error("Incorrect credentials provided. Login Failed.")
# return False
# %%
def connect(self, **kwargs):
"""Send connect request."""
self._connect(self, **kwargs)
def _connect(self):
"""Do something."""
pass
def receive_payload_from_software_manager(
self,
payload: Any,
dst_ip_address: Optional[Union[IPv4Address, IPv4Network]] = None,
src_port: Optional[Port] = None,
dst_port: Optional[Port] = None,
session_id: Optional[str] = None,
ip_protocol: IPProtocol = IPProtocol.TCP,
icmp_packet: Optional[ICMPPacket] = None,
connection_id: Optional[str] = None,
) -> Union[Any, None]:
"""Receive Software Manager Payload."""
self._validate_login()