#1916: Setting up a connected states + added tests + error states for if service is interacted with when not running

This commit is contained in:
Czar.Echavez
2023-09-22 15:38:01 +01:00
parent 58edb6d3e4
commit 2c234ab67a
5 changed files with 117 additions and 16 deletions

View File

@@ -1,9 +1,12 @@
from typing import Any, Optional
from ipaddress import IPv4Address
from typing import Any, Dict, Optional
from primaite.simulator.network.protocols.ftp import FTPPacket
from primaite.simulator.network.protocols.ftp import FTPCommand, FTPPacket, FTPStatusCode
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.core.session_manager import Session
from primaite.simulator.system.services.ftp.ftp_service import FTPServiceABC
from primaite.simulator.system.services.service import ServiceOperatingState
class FTPServer(FTPServiceABC):
@@ -17,6 +20,9 @@ class FTPServer(FTPServiceABC):
server_password: Optional[str] = None
"""Password needed to connect to FTP server. Default is None."""
connections: Dict[str, IPv4Address] = {}
"""Current active connections to the FTP server."""
def __init__(self, **kwargs):
kwargs["name"] = "FTPServer"
kwargs["port"] = Port.FTP
@@ -24,6 +30,37 @@ class FTPServer(FTPServiceABC):
super().__init__(**kwargs)
self.start()
def _get_session_details(self, session_id: str) -> Session:
"""
Returns the Session object from the given session id.
:param: session_id: ID of the session that needs details retrieved
"""
return self.software_manager.session_manager.sessions_by_uuid[session_id]
def _process_ftp_command(self, payload: FTPPacket, session_id: Optional[str] = None, **kwargs) -> FTPPacket:
# if server is down, return error
if self.operating_state != ServiceOperatingState.RUNNING:
payload.status_code = FTPStatusCode.ERROR
return payload
# process server specific commands, otherwise call super
if payload.ftp_command == FTPCommand.PORT:
# check that the port is valid
if isinstance(payload.ftp_command_args, Port) and payload.ftp_command_args.value in range(0, 65535):
# return successful connection
session_details = self._get_session_details(session_id)
self.connections[session_id] = session_details.with_ip_address
payload.status_code = FTPStatusCode.OK
return payload
if payload.ftp_command == FTPCommand.QUIT:
session_details = self._get_session_details(session_id)
self.connections.pop(session_id)
payload.status_code = FTPStatusCode.OK
return super()._process_ftp_command(payload=payload, session_id=session_id, **kwargs)
def receive(self, payload: Any, session_id: Optional[str] = None, **kwargs) -> bool:
"""Receives a payload from the SessionManager."""
if not isinstance(payload, FTPPacket):