#2713 - eod commit. Initial RequestManager Test implemented, along with an initial setup of the additional Request Manager methods.

This commit is contained in:
Charlie Crane
2024-07-26 16:56:03 +01:00
parent 978e2c5a52
commit 0ac1c6702c
3 changed files with 205 additions and 19 deletions

View File

@@ -64,7 +64,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added ability to log each agent's action choices in each step to a JSON file.
- Removal of Link bandwidth hardcoding. This can now be configured via the network configuration yaml. Will default to 100 if not present.
- Added NMAP application to all host and layer-3 network nodes.
- Added Terminal Class for HostNode components
- Added Terminal Class for HostNode components.
### Bug Fixes

View File

@@ -2,9 +2,10 @@
from __future__ import annotations
from ipaddress import IPv4Address
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional
from uuid import uuid4
from prettytable import MARKDOWN, PrettyTable
from pydantic import BaseModel
from primaite.interface.request import RequestFormat, RequestResponse
@@ -35,17 +36,12 @@ class TerminalClientConnection(BaseModel):
is_active: bool = True
"""Flag to state whether the connection is still active or not."""
_dest_ip_address: IPv4Address
dest_ip_address: IPv4Address = None
"""Destination IP address of connection"""
_connection_uuid: str = None
"""Connection UUID"""
@property
def dest_ip_address(self) -> Optional[IPv4Address]:
"""Destination IP Address."""
return self._dest_ip_address
@property
def client(self) -> Optional[Terminal]:
"""The Terminal that holds this connection."""
@@ -95,6 +91,21 @@ class Terminal(Service):
"""Apply Terminal Request."""
return super().apply_request(request, context)
def show(self, markdown: bool = False):
"""
Display the remote connections to this terminal instance in tabular format.
:param markdown: Whether to display the table in Markdown format or not. Default is `False`.
"""
table = PrettyTable(["Connection ID", "IP_Address", "Active"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.sys_log.hostname} {self.name} Remote Connections"
for connection_id, connection in self.remote_connection.items():
table.add_row([connection_id, connection.dest_ip_address, connection.is_active])
print(table.get_string(sortby="Connection ID"))
def _init_request_manager(self) -> RequestManager:
"""Initialise Request manager."""
_login_valid = Terminal._LoginValidator(terminal=self)
@@ -106,12 +117,52 @@ class Terminal(Service):
func=lambda request, context: RequestResponse.from_bool(self.send()), validator=_login_valid
),
)
return rm
def _validate_login(self) -> bool:
"""Validate login credentials are valid."""
# return self.parent.UserSessionManager.validate_remote_session_uuid(self.connection_uuid)
return True
def _login(request: List[Any], context: Any) -> RequestResponse:
login = self._process_local_login(username=request[0], password=request[1])
if login == True:
return RequestResponse(status="success", data={})
else:
return RequestResponse(status="failure", data={})
def _remote_login(request: List[Any], context: Any) -> RequestResponse:
self._process_remote_login(username=request[0], password=request[1], ip_address=request[2])
if self.is_connected:
return RequestResponse(status="success", data={})
else:
return RequestResponse(status="failure", data={})
def _execute(request: List[Any], context: Any) -> RequestResponse:
"""Execute an instruction."""
command: str = request[0]
self.execute(command)
return RequestResponse(status="success", data={})
def _logoff() -> RequestResponse:
"""Logoff from connection."""
self.parent.UserSessionManager.logoff(self.connection_uuid)
self.disconnect(self.connection_uuid)
return RequestResponse(status="success")
rm.add_request(
"Login",
request_type=RequestType(func=_login),
)
rm.add_request(
"Remote Login",
request_type=RequestType(func=_remote_login),
)
rm.add_request(
"Execute",
request_type=RequestType(func=_execute),
)
rm.add_request("Logoff", request_type=RequestType(func=_logoff))
return rm
class _LoginValidator(RequestPermissionValidator):
"""
@@ -155,7 +206,8 @@ class Terminal(Service):
def _process_local_login(self, username: str, password: str) -> bool:
"""Local session login to terminal."""
# self.connection_uuid = self.parent.UserSessionManager.login(username=username, password=password)
self.connection_uuid = str(uuid4())
self.connection_uuid = str(uuid4()) # TODO: Remove following merging of UserSessionManager.
self.is_connected = True
if self.connection_uuid:
self.sys_log.info(f"Login request authorised, connection uuid: {self.connection_uuid}")
return True
@@ -187,7 +239,7 @@ class Terminal(Service):
self.sys_log.info(f"Sending UserAuth request to UserSessionManager, username={username}, password={password}")
# connection_uuid = self.parent.UserSessionManager.remote_login(username=username, password=password)
connection_uuid = str(uuid4())
self.is_connected = True
if connection_uuid:
# Send uuid to remote
self.sys_log.info(
@@ -203,14 +255,14 @@ class Terminal(Service):
sender_ip_address=self.parent.network_interface[1].ip_address,
target_ip_address=payload.sender_ip_address,
)
self.send(payload=return_payload, dest_ip_address=return_payload.target_ip_address)
self.remote_connection[connection_uuid] = TerminalClientConnection(
parent_node=self.software_manager.node,
_dest_ip_address=payload.sender_ip_address,
dest_ip_address=payload.sender_ip_address,
connection_uuid=connection_uuid,
)
self.send(payload=return_payload, dest_ip_address=return_payload.target_ip_address)
return True
else:
# UserSessionManager has returned None
@@ -246,6 +298,9 @@ class Terminal(Service):
self.is_connected = True
return True
elif payload.transport_message == SSHTransportMessage.SSH_MSG_SERVICE_REQUEST:
return self.execute(command=payload.payload)
else:
self.sys_log.warning("Encounter unexpected message type, rejecting connection")
return False
@@ -254,6 +309,14 @@ class Terminal(Service):
# %% Outbound
def execute(self, command: List[Any]) -> bool:
"""Execute a passed ssh command via the request manager."""
if command[0] == "install":
self.parent.software_manager.software.install(command[1])
return True
# TODO: Expand as necessary
def _disconnect(self, dest_ip_address: IPv4Address) -> bool:
"""Disconnect from the remote."""
if not self.is_connected:
@@ -266,7 +329,7 @@ class Terminal(Service):
software_manager: SoftwareManager = self.software_manager
software_manager.send_payload_to_session_manager(
payload={"type": "disconnect", "connection_id": self.remote_connection._connection_uuid},
payload={"type": "disconnect", "connection_id": self.connection_uuid},
dest_ip_address=dest_ip_address,
dest_port=self.port,
)

View File

@@ -3,12 +3,20 @@ from typing import Tuple
import pytest
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.hardware.nodes.network.switch import Switch
from primaite.simulator.network.protocols.ssh import SSHConnectionMessage, SSHPacket, SSHTransportMessage
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.services.dns.dns_server import DNSServer
from primaite.simulator.system.services.service import ServiceOperatingState
from primaite.simulator.system.services.terminal.terminal import Terminal
from primaite.simulator.system.services.web_server.web_server import WebServer
from primaite.simulator.system.software import SoftwareHealthState
@@ -117,7 +125,7 @@ def test_terminal_ignores_when_off(basic_network):
computer_b: Computer = network.get_node_by_hostname("node_b")
terminal_a.login(ip_address="192.168.0.11") # login to computer_b
terminal_a.login(username="admin", password="Admin123!", ip_address="192.168.0.11") # login to computer_b
assert terminal_a.is_connected is True
@@ -127,6 +135,121 @@ def test_terminal_ignores_when_off(basic_network):
payload="Test_Payload",
transport_message=SSHTransportMessage.SSH_MSG_SERVICE_REQUEST,
connection_message=SSHConnectionMessage.SSH_MSG_CHANNEL_DATA,
sender_ip_address=computer_a.network_interface[1].ip_address,
target_ip_address="192.168.0.11",
)
assert not terminal_a.send(payload=payload, dest_ip_address="192.168.0.11")
def test_terminal_acknowledges_acl_rules(basic_network):
"""Test that Terminal messages"""
network: Network = basic_network
computer_a: Computer = network.get_node_by_hostname("node_a")
terminal_a: Terminal = computer_a.software_manager.software.get("Terminal")
terminal_a.login(username="admin", password="Admin123!", ip_address="192.168.0.11")
router = Router(hostname="router", num_ports=3, start_up_duration=0)
router.power_on()
router.configure_port(port=1, ip_address="10.0.1.1", subnet_mask="255.255.255.0")
router.configure_port(port=2, ip_address="10.0.2.1", subnet_mask="255.255.255.0")
router.acl.add_rule(action=ACLAction.DENY, src_port=Port.SSH, dst_port=Port.SSH, position=22)
def test_network_simulation(basic_network):
# 0: Pull out the network
network = basic_network
# 1: Set up network hardware
# 1.1: Configure the router
router = Router(hostname="router", num_ports=3, start_up_duration=0)
router.power_on()
router.configure_port(port=1, ip_address="10.0.1.1", subnet_mask="255.255.255.0")
router.configure_port(port=2, ip_address="10.0.2.1", subnet_mask="255.255.255.0")
# 1.2: Create and connect switches
switch_1 = Switch(hostname="switch_1", num_ports=6, start_up_duration=0)
switch_1.power_on()
network.connect(endpoint_a=router.network_interface[1], endpoint_b=switch_1.network_interface[6])
router.enable_port(1)
switch_2 = Switch(hostname="switch_2", num_ports=6, start_up_duration=0)
switch_2.power_on()
network.connect(endpoint_a=router.network_interface[2], endpoint_b=switch_2.network_interface[6])
router.enable_port(2)
# 1.3: Create and connect computer
client_1 = Computer(
hostname="client_1",
ip_address="10.0.1.2",
subnet_mask="255.255.255.0",
default_gateway="10.0.1.1",
start_up_duration=0,
)
client_1.power_on()
network.connect(
endpoint_a=client_1.network_interface[1],
endpoint_b=switch_1.network_interface[1],
)
# 1.4: Create and connect servers
server_1 = Server(
hostname="server_1",
ip_address="10.0.2.2",
subnet_mask="255.255.255.0",
default_gateway="10.0.2.1",
start_up_duration=0,
)
server_1.power_on()
network.connect(endpoint_a=server_1.network_interface[1], endpoint_b=switch_2.network_interface[1])
server_2 = Server(
hostname="server_2",
ip_address="10.0.2.3",
subnet_mask="255.255.255.0",
default_gateway="10.0.2.1",
start_up_duration=0,
)
server_2.power_on()
network.connect(endpoint_a=server_2.network_interface[1], endpoint_b=switch_2.network_interface[2])
# 2: Configure base ACL
router.acl.add_rule(action=ACLAction.DENY, src_port=Port.ARP, dst_port=Port.ARP, position=22)
router.acl.add_rule(action=ACLAction.DENY, protocol=IPProtocol.ICMP, position=23)
router.acl.add_rule(action=ACLAction.DENY, src_port=Port.DNS, dst_port=Port.DNS, position=1)
router.acl.add_rule(action=ACLAction.DENY, src_port=Port.HTTP, dst_port=Port.HTTP, position=3)
# 3: Install server software
server_1.software_manager.install(DNSServer)
dns_service: DNSServer = server_1.software_manager.software.get("DNSServer") # noqa
dns_service.dns_register("www.example.com", server_2.network_interface[1].ip_address)
server_2.software_manager.install(WebServer)
# 3.1: Ensure that the dns clients are configured correctly
client_1.software_manager.software.get("DNSClient").dns_server = server_1.network_interface[1].ip_address
server_2.software_manager.software.get("DNSClient").dns_server = server_1.network_interface[1].ip_address
terminal_1: Terminal = client_1.software_manager.software.get("Terminal")
assert terminal_1.login(username="admin", password="Admin123!", ip_address="192.168.0.11") is False
def test_terminal_receives_requests(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]):
game, agent = game_and_agent_fixture
network: Network = game.simulation.network
computer_a: Computer = network.get_node_by_hostname("node_a")
terminal_a: Terminal = computer_a.software_manager.software.get("Terminal")
computer_b: Computer = network.get_node_by_hostname("node_b")
assert terminal_a.is_connected is False
action = ("TERMINAL_LOGIN", {"username": "admin", "password": "Admin123!"}) # TODO: Add Action to ActionManager ?
agent.store_action(action)
game.step()
assert terminal_a.is_connected is True