Merged PR 488: #2713 - Terminal Request Manager Methods

## Summary
This PR merges the initial implementation of the terminals use of the `RequestManager`. Introducing the ability to send commands to `Login`, `Remote Login` and `Execute`.

## Test process
Unit test written for the request manager.

## Checklist
- [X] PR is linked to a **work item**
- [X] **acceptance criteria** of linked ticket are met
- [X] performed **self-review** of the code
- [X] written **tests** for any new functionality added with this PR
- [X] updated the **documentation** if this PR changes or adds functionality
- [ ] written/updated **design docs** if this PR implements new functionality
- [X] updated the **change log**
- [X] ran **pre-commit** checks for code style
- [X] attended to any **TO-DOs** left in the code

Related work items: #2713
This commit is contained in:
Charlie Crane
2024-07-29 09:56:08 +00:00
3 changed files with 218 additions and 28 deletions

View File

@@ -64,7 +64,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added ability to log each agent's action choices in each step to a JSON file.
- Removal of Link bandwidth hardcoding. This can now be configured via the network configuration yaml. Will default to 100 if not present.
- Added NMAP application to all host and layer-3 network nodes.
- Added Terminal Class for HostNode components
- Added Terminal Class for HostNode components.
### Bug Fixes

View File

@@ -2,9 +2,9 @@
from __future__ import annotations
from ipaddress import IPv4Address
from typing import Dict, List, Optional
from uuid import uuid4
from typing import Any, Dict, List, Optional
from prettytable import MARKDOWN, PrettyTable
from pydantic import BaseModel
from primaite.interface.request import RequestFormat, RequestResponse
@@ -35,17 +35,12 @@ class TerminalClientConnection(BaseModel):
is_active: bool = True
"""Flag to state whether the connection is still active or not."""
_dest_ip_address: IPv4Address
dest_ip_address: IPv4Address = None
"""Destination IP address of connection"""
_connection_uuid: str = None
"""Connection UUID"""
@property
def dest_ip_address(self) -> Optional[IPv4Address]:
"""Destination IP Address."""
return self._dest_ip_address
@property
def client(self) -> Optional[Terminal]:
"""The Terminal that holds this connection."""
@@ -95,6 +90,21 @@ class Terminal(Service):
"""Apply Terminal Request."""
return super().apply_request(request, context)
def show(self, markdown: bool = False):
"""
Display the remote connections to this terminal instance in tabular format.
:param markdown: Whether to display the table in Markdown format or not. Default is `False`.
"""
table = PrettyTable(["Connection ID", "IP_Address", "Active"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.sys_log.hostname} {self.name} Remote Connections"
for connection_id, connection in self.remote_connection.items():
table.add_row([connection_id, connection.dest_ip_address, connection.is_active])
print(table.get_string(sortby="Connection ID"))
def _init_request_manager(self) -> RequestManager:
"""Initialise Request manager."""
_login_valid = Terminal._LoginValidator(terminal=self)
@@ -106,12 +116,52 @@ class Terminal(Service):
func=lambda request, context: RequestResponse.from_bool(self.send()), validator=_login_valid
),
)
return rm
def _validate_login(self) -> bool:
"""Validate login credentials are valid."""
# return self.parent.UserSessionManager.validate_remote_session_uuid(self.connection_uuid)
return True
def _login(request: List[Any], context: Any) -> RequestResponse:
login = self._process_local_login(username=request[0], password=request[1])
if login == True:
return RequestResponse(status="success", data={})
else:
return RequestResponse(status="failure", data={})
def _remote_login(request: List[Any], context: Any) -> RequestResponse:
self._process_remote_login(username=request[0], password=request[1], ip_address=request[2])
if self.is_connected:
return RequestResponse(status="success", data={})
else:
return RequestResponse(status="failure", data={})
def _execute(request: List[Any], context: Any) -> RequestResponse:
"""Execute an instruction."""
command: str = request[0]
self.execute(command)
return RequestResponse(status="success", data={})
def _logoff() -> RequestResponse:
"""Logoff from connection."""
self.parent.UserSessionManager.logoff(self.connection_uuid)
self.disconnect(self.connection_uuid)
return RequestResponse(status="success")
rm.add_request(
"Login",
request_type=RequestType(func=_login),
)
rm.add_request(
"Remote Login",
request_type=RequestType(func=_remote_login),
)
rm.add_request(
"Execute",
request_type=RequestType(func=_execute, validator=_login_valid),
)
rm.add_request("Logoff", request_type=RequestType(func=_logoff, validator=_login_valid))
return rm
class _LoginValidator(RequestPermissionValidator):
"""
@@ -154,8 +204,8 @@ class Terminal(Service):
def _process_local_login(self, username: str, password: str) -> bool:
"""Local session login to terminal."""
# self.connection_uuid = self.parent.UserSessionManager.login(username=username, password=password)
self.connection_uuid = str(uuid4())
self.connection_uuid = self.parent.UserSessionManager.login(username=username, password=password)
self.is_connected = True
if self.connection_uuid:
self.sys_log.info(f"Login request authorised, connection uuid: {self.connection_uuid}")
return True
@@ -181,13 +231,16 @@ class Terminal(Service):
return self.send(payload=payload, dest_ip_address=ip_address)
def _process_remote_login(self, payload: SSHPacket) -> bool:
"""Processes a remote terminal requesting to login to this terminal."""
"""Processes a remote terminal requesting to login to this terminal.
:param payload: The SSH Payload Packet.
:return: True if successful, else False.
"""
username: str = payload.user_account.username
password: str = payload.user_account.password
self.sys_log.info(f"Sending UserAuth request to UserSessionManager, username={username}, password={password}")
# connection_uuid = self.parent.UserSessionManager.remote_login(username=username, password=password)
connection_uuid = str(uuid4())
connection_uuid = self.parent.UserSessionManager.remote_login(username=username, password=password)
self.is_connected = True
if connection_uuid:
# Send uuid to remote
self.sys_log.info(
@@ -203,14 +256,14 @@ class Terminal(Service):
sender_ip_address=self.parent.network_interface[1].ip_address,
target_ip_address=payload.sender_ip_address,
)
self.send(payload=return_payload, dest_ip_address=return_payload.target_ip_address)
self.remote_connection[connection_uuid] = TerminalClientConnection(
parent_node=self.software_manager.node,
_dest_ip_address=payload.sender_ip_address,
dest_ip_address=payload.sender_ip_address,
connection_uuid=connection_uuid,
)
self.send(payload=return_payload, dest_ip_address=return_payload.target_ip_address)
return True
else:
# UserSessionManager has returned None
@@ -218,7 +271,11 @@ class Terminal(Service):
return False
def receive(self, payload: SSHPacket, **kwargs) -> bool:
"""Receive Payload and process for a response."""
"""Receive Payload and process for a response.
:param payload: The message contents received.
:return: True if successfull, else False.
"""
self.sys_log.debug(f"Received payload: {payload}")
if not isinstance(payload, SSHPacket):
@@ -234,11 +291,9 @@ class Terminal(Service):
dest_ip_address = kwargs["dest_ip_address"]
self.disconnect(dest_ip_address=dest_ip_address)
self.sys_log.debug(f"Disconnecting {connection_id}")
# We need to close on the other machine as well
elif payload.transport_message == SSHTransportMessage.SSH_MSG_USERAUTH_REQUEST:
"""Login Request Received."""
self._process_remote_login(payload=payload)
return self._process_remote_login(payload=payload)
elif payload.transport_message == SSHTransportMessage.SSH_MSG_USERAUTH_SUCCESS:
self.sys_log.info(f"Login Successful, connection ID is {payload.connection_uuid}")
@@ -246,6 +301,9 @@ class Terminal(Service):
self.is_connected = True
return True
elif payload.transport_message == SSHTransportMessage.SSH_MSG_SERVICE_REQUEST:
return self.execute(command=payload.payload)
else:
self.sys_log.warning("Encounter unexpected message type, rejecting connection")
return False
@@ -254,6 +312,15 @@ class Terminal(Service):
# %% Outbound
def execute(self, command: List[Any]) -> bool:
"""Execute a passed ssh command via the request manager."""
# TODO: Expand as necessary, as new functionalilty is needed.
if command[0] == "install":
self.parent.software_manager.software.install(command[1])
return True
else:
return False
def _disconnect(self, dest_ip_address: IPv4Address) -> bool:
"""Disconnect from the remote."""
if not self.is_connected:
@@ -266,7 +333,7 @@ class Terminal(Service):
software_manager: SoftwareManager = self.software_manager
software_manager.send_payload_to_session_manager(
payload={"type": "disconnect", "connection_id": self.remote_connection._connection_uuid},
payload={"type": "disconnect", "connection_id": self.connection_uuid},
dest_ip_address=dest_ip_address,
dest_port=self.port,
)

View File

@@ -3,12 +3,20 @@ from typing import Tuple
import pytest
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.hardware.nodes.network.switch import Switch
from primaite.simulator.network.protocols.ssh import SSHConnectionMessage, SSHPacket, SSHTransportMessage
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.services.dns.dns_server import DNSServer
from primaite.simulator.system.services.service import ServiceOperatingState
from primaite.simulator.system.services.terminal.terminal import Terminal
from primaite.simulator.system.services.web_server.web_server import WebServer
from primaite.simulator.system.software import SoftwareHealthState
@@ -117,7 +125,7 @@ def test_terminal_ignores_when_off(basic_network):
computer_b: Computer = network.get_node_by_hostname("node_b")
terminal_a.login(ip_address="192.168.0.11") # login to computer_b
terminal_a.login(username="admin", password="Admin123!", ip_address="192.168.0.11") # login to computer_b
assert terminal_a.is_connected is True
@@ -127,6 +135,121 @@ def test_terminal_ignores_when_off(basic_network):
payload="Test_Payload",
transport_message=SSHTransportMessage.SSH_MSG_SERVICE_REQUEST,
connection_message=SSHConnectionMessage.SSH_MSG_CHANNEL_DATA,
sender_ip_address=computer_a.network_interface[1].ip_address,
target_ip_address="192.168.0.11",
)
assert not terminal_a.send(payload=payload, dest_ip_address="192.168.0.11")
def test_terminal_acknowledges_acl_rules(basic_network):
"""Test that Terminal messages"""
network: Network = basic_network
computer_a: Computer = network.get_node_by_hostname("node_a")
terminal_a: Terminal = computer_a.software_manager.software.get("Terminal")
terminal_a.login(username="admin", password="Admin123!", ip_address="192.168.0.11")
router = Router(hostname="router", num_ports=3, start_up_duration=0)
router.power_on()
router.configure_port(port=1, ip_address="10.0.1.1", subnet_mask="255.255.255.0")
router.configure_port(port=2, ip_address="10.0.2.1", subnet_mask="255.255.255.0")
router.acl.add_rule(action=ACLAction.DENY, src_port=Port.SSH, dst_port=Port.SSH, position=22)
def test_network_simulation(basic_network):
# 0: Pull out the network
network = basic_network
# 1: Set up network hardware
# 1.1: Configure the router
router = Router(hostname="router", num_ports=3, start_up_duration=0)
router.power_on()
router.configure_port(port=1, ip_address="10.0.1.1", subnet_mask="255.255.255.0")
router.configure_port(port=2, ip_address="10.0.2.1", subnet_mask="255.255.255.0")
# 1.2: Create and connect switches
switch_1 = Switch(hostname="switch_1", num_ports=6, start_up_duration=0)
switch_1.power_on()
network.connect(endpoint_a=router.network_interface[1], endpoint_b=switch_1.network_interface[6])
router.enable_port(1)
switch_2 = Switch(hostname="switch_2", num_ports=6, start_up_duration=0)
switch_2.power_on()
network.connect(endpoint_a=router.network_interface[2], endpoint_b=switch_2.network_interface[6])
router.enable_port(2)
# 1.3: Create and connect computer
client_1 = Computer(
hostname="client_1",
ip_address="10.0.1.2",
subnet_mask="255.255.255.0",
default_gateway="10.0.1.1",
start_up_duration=0,
)
client_1.power_on()
network.connect(
endpoint_a=client_1.network_interface[1],
endpoint_b=switch_1.network_interface[1],
)
# 1.4: Create and connect servers
server_1 = Server(
hostname="server_1",
ip_address="10.0.2.2",
subnet_mask="255.255.255.0",
default_gateway="10.0.2.1",
start_up_duration=0,
)
server_1.power_on()
network.connect(endpoint_a=server_1.network_interface[1], endpoint_b=switch_2.network_interface[1])
server_2 = Server(
hostname="server_2",
ip_address="10.0.2.3",
subnet_mask="255.255.255.0",
default_gateway="10.0.2.1",
start_up_duration=0,
)
server_2.power_on()
network.connect(endpoint_a=server_2.network_interface[1], endpoint_b=switch_2.network_interface[2])
# 2: Configure base ACL
router.acl.add_rule(action=ACLAction.DENY, src_port=Port.ARP, dst_port=Port.ARP, position=22)
router.acl.add_rule(action=ACLAction.DENY, protocol=IPProtocol.ICMP, position=23)
router.acl.add_rule(action=ACLAction.DENY, src_port=Port.DNS, dst_port=Port.DNS, position=1)
router.acl.add_rule(action=ACLAction.DENY, src_port=Port.HTTP, dst_port=Port.HTTP, position=3)
# 3: Install server software
server_1.software_manager.install(DNSServer)
dns_service: DNSServer = server_1.software_manager.software.get("DNSServer") # noqa
dns_service.dns_register("www.example.com", server_2.network_interface[1].ip_address)
server_2.software_manager.install(WebServer)
# 3.1: Ensure that the dns clients are configured correctly
client_1.software_manager.software.get("DNSClient").dns_server = server_1.network_interface[1].ip_address
server_2.software_manager.software.get("DNSClient").dns_server = server_1.network_interface[1].ip_address
terminal_1: Terminal = client_1.software_manager.software.get("Terminal")
assert terminal_1.login(username="admin", password="Admin123!", ip_address="192.168.0.11") is False
def test_terminal_receives_requests(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
game, agent = game_and_agent_fixture
network: Network = game.simulation.network
computer_a: Computer = network.get_node_by_hostname("node_a")
terminal_a: Terminal = computer_a.software_manager.software.get("Terminal")
computer_b: Computer = network.get_node_by_hostname("node_b")
assert terminal_a.is_connected is False
action = ("TERMINAL_LOGIN", {"username": "admin", "password": "Admin123!"}) # TODO: Add Action to ActionManager ?
agent.store_action(action)
game.step()
assert terminal_a.is_connected is True