diff --git a/CHANGELOG.md b/CHANGELOG.md index 24ff83ed..b27244bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -64,7 +64,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added ability to log each agent's action choices in each step to a JSON file. - Removal of Link bandwidth hardcoding. This can now be configured via the network configuration yaml. Will default to 100 if not present. - Added NMAP application to all host and layer-3 network nodes. -- Added Terminal Class for HostNode components +- Added Terminal Class for HostNode components. ### Bug Fixes diff --git a/src/primaite/simulator/system/services/terminal/terminal.py b/src/primaite/simulator/system/services/terminal/terminal.py index f01b44a2..cadc8853 100644 --- a/src/primaite/simulator/system/services/terminal/terminal.py +++ b/src/primaite/simulator/system/services/terminal/terminal.py @@ -2,9 +2,9 @@ from __future__ import annotations from ipaddress import IPv4Address -from typing import Dict, List, Optional -from uuid import uuid4 +from typing import Any, Dict, List, Optional +from prettytable import MARKDOWN, PrettyTable from pydantic import BaseModel from primaite.interface.request import RequestFormat, RequestResponse @@ -35,17 +35,12 @@ class TerminalClientConnection(BaseModel): is_active: bool = True """Flag to state whether the connection is still active or not.""" - _dest_ip_address: IPv4Address + dest_ip_address: IPv4Address = None """Destination IP address of connection""" _connection_uuid: str = None """Connection UUID""" - @property - def dest_ip_address(self) -> Optional[IPv4Address]: - """Destination IP Address.""" - return self._dest_ip_address - @property def client(self) -> Optional[Terminal]: """The Terminal that holds this connection.""" @@ -95,6 +90,21 @@ class Terminal(Service): """Apply Terminal Request.""" return super().apply_request(request, context) + def show(self, markdown: bool = False): + """ + Display the remote connections to this terminal instance in tabular format. + + :param markdown: Whether to display the table in Markdown format or not. Default is `False`. + """ + table = PrettyTable(["Connection ID", "IP_Address", "Active"]) + if markdown: + table.set_style(MARKDOWN) + table.align = "l" + table.title = f"{self.sys_log.hostname} {self.name} Remote Connections" + for connection_id, connection in self.remote_connection.items(): + table.add_row([connection_id, connection.dest_ip_address, connection.is_active]) + print(table.get_string(sortby="Connection ID")) + def _init_request_manager(self) -> RequestManager: """Initialise Request manager.""" _login_valid = Terminal._LoginValidator(terminal=self) @@ -106,12 +116,52 @@ class Terminal(Service): func=lambda request, context: RequestResponse.from_bool(self.send()), validator=_login_valid ), ) - return rm - def _validate_login(self) -> bool: - """Validate login credentials are valid.""" - # return self.parent.UserSessionManager.validate_remote_session_uuid(self.connection_uuid) - return True + def _login(request: List[Any], context: Any) -> RequestResponse: + login = self._process_local_login(username=request[0], password=request[1]) + if login == True: + return RequestResponse(status="success", data={}) + else: + return RequestResponse(status="failure", data={}) + + def _remote_login(request: List[Any], context: Any) -> RequestResponse: + self._process_remote_login(username=request[0], password=request[1], ip_address=request[2]) + if self.is_connected: + return RequestResponse(status="success", data={}) + else: + return RequestResponse(status="failure", data={}) + + def _execute(request: List[Any], context: Any) -> RequestResponse: + """Execute an instruction.""" + command: str = request[0] + self.execute(command) + return RequestResponse(status="success", data={}) + + def _logoff() -> RequestResponse: + """Logoff from connection.""" + self.parent.UserSessionManager.logoff(self.connection_uuid) + self.disconnect(self.connection_uuid) + + return RequestResponse(status="success") + + rm.add_request( + "Login", + request_type=RequestType(func=_login), + ) + + rm.add_request( + "Remote Login", + request_type=RequestType(func=_remote_login), + ) + + rm.add_request( + "Execute", + request_type=RequestType(func=_execute, validator=_login_valid), + ) + + rm.add_request("Logoff", request_type=RequestType(func=_logoff, validator=_login_valid)) + + return rm class _LoginValidator(RequestPermissionValidator): """ @@ -154,8 +204,8 @@ class Terminal(Service): def _process_local_login(self, username: str, password: str) -> bool: """Local session login to terminal.""" - # self.connection_uuid = self.parent.UserSessionManager.login(username=username, password=password) - self.connection_uuid = str(uuid4()) + self.connection_uuid = self.parent.UserSessionManager.login(username=username, password=password) + self.is_connected = True if self.connection_uuid: self.sys_log.info(f"Login request authorised, connection uuid: {self.connection_uuid}") return True @@ -181,13 +231,16 @@ class Terminal(Service): return self.send(payload=payload, dest_ip_address=ip_address) def _process_remote_login(self, payload: SSHPacket) -> bool: - """Processes a remote terminal requesting to login to this terminal.""" + """Processes a remote terminal requesting to login to this terminal. + + :param payload: The SSH Payload Packet. + :return: True if successful, else False. + """ username: str = payload.user_account.username password: str = payload.user_account.password self.sys_log.info(f"Sending UserAuth request to UserSessionManager, username={username}, password={password}") - # connection_uuid = self.parent.UserSessionManager.remote_login(username=username, password=password) - connection_uuid = str(uuid4()) - + connection_uuid = self.parent.UserSessionManager.remote_login(username=username, password=password) + self.is_connected = True if connection_uuid: # Send uuid to remote self.sys_log.info( @@ -203,14 +256,14 @@ class Terminal(Service): sender_ip_address=self.parent.network_interface[1].ip_address, target_ip_address=payload.sender_ip_address, ) - self.send(payload=return_payload, dest_ip_address=return_payload.target_ip_address) self.remote_connection[connection_uuid] = TerminalClientConnection( parent_node=self.software_manager.node, - _dest_ip_address=payload.sender_ip_address, + dest_ip_address=payload.sender_ip_address, connection_uuid=connection_uuid, ) + self.send(payload=return_payload, dest_ip_address=return_payload.target_ip_address) return True else: # UserSessionManager has returned None @@ -218,7 +271,11 @@ class Terminal(Service): return False def receive(self, payload: SSHPacket, **kwargs) -> bool: - """Receive Payload and process for a response.""" + """Receive Payload and process for a response. + + :param payload: The message contents received. + :return: True if successfull, else False. + """ self.sys_log.debug(f"Received payload: {payload}") if not isinstance(payload, SSHPacket): @@ -234,11 +291,9 @@ class Terminal(Service): dest_ip_address = kwargs["dest_ip_address"] self.disconnect(dest_ip_address=dest_ip_address) self.sys_log.debug(f"Disconnecting {connection_id}") - # We need to close on the other machine as well elif payload.transport_message == SSHTransportMessage.SSH_MSG_USERAUTH_REQUEST: - """Login Request Received.""" - self._process_remote_login(payload=payload) + return self._process_remote_login(payload=payload) elif payload.transport_message == SSHTransportMessage.SSH_MSG_USERAUTH_SUCCESS: self.sys_log.info(f"Login Successful, connection ID is {payload.connection_uuid}") @@ -246,6 +301,9 @@ class Terminal(Service): self.is_connected = True return True + elif payload.transport_message == SSHTransportMessage.SSH_MSG_SERVICE_REQUEST: + return self.execute(command=payload.payload) + else: self.sys_log.warning("Encounter unexpected message type, rejecting connection") return False @@ -254,6 +312,15 @@ class Terminal(Service): # %% Outbound + def execute(self, command: List[Any]) -> bool: + """Execute a passed ssh command via the request manager.""" + # TODO: Expand as necessary, as new functionalilty is needed. + if command[0] == "install": + self.parent.software_manager.software.install(command[1]) + return True + else: + return False + def _disconnect(self, dest_ip_address: IPv4Address) -> bool: """Disconnect from the remote.""" if not self.is_connected: @@ -266,7 +333,7 @@ class Terminal(Service): software_manager: SoftwareManager = self.software_manager software_manager.send_payload_to_session_manager( - payload={"type": "disconnect", "connection_id": self.remote_connection._connection_uuid}, + payload={"type": "disconnect", "connection_id": self.connection_uuid}, dest_ip_address=dest_ip_address, dest_port=self.port, ) diff --git a/tests/unit_tests/_primaite/_simulator/_system/_services/test_terminal.py b/tests/unit_tests/_primaite/_simulator/_system/_services/test_terminal.py index 65346b45..17af5699 100644 --- a/tests/unit_tests/_primaite/_simulator/_system/_services/test_terminal.py +++ b/tests/unit_tests/_primaite/_simulator/_system/_services/test_terminal.py @@ -3,12 +3,20 @@ from typing import Tuple import pytest +from primaite.game.agent.interface import ProxyAgent +from primaite.game.game import PrimaiteGame from primaite.simulator.network.container import Network from primaite.simulator.network.hardware.nodes.host.computer import Computer +from primaite.simulator.network.hardware.nodes.host.server import Server +from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router from primaite.simulator.network.hardware.nodes.network.switch import Switch from primaite.simulator.network.protocols.ssh import SSHConnectionMessage, SSHPacket, SSHTransportMessage +from primaite.simulator.network.transmission.network_layer import IPProtocol +from primaite.simulator.network.transmission.transport_layer import Port +from primaite.simulator.system.services.dns.dns_server import DNSServer from primaite.simulator.system.services.service import ServiceOperatingState from primaite.simulator.system.services.terminal.terminal import Terminal +from primaite.simulator.system.services.web_server.web_server import WebServer from primaite.simulator.system.software import SoftwareHealthState @@ -117,7 +125,7 @@ def test_terminal_ignores_when_off(basic_network): computer_b: Computer = network.get_node_by_hostname("node_b") - terminal_a.login(ip_address="192.168.0.11") # login to computer_b + terminal_a.login(username="admin", password="Admin123!", ip_address="192.168.0.11") # login to computer_b assert terminal_a.is_connected is True @@ -127,6 +135,121 @@ def test_terminal_ignores_when_off(basic_network): payload="Test_Payload", transport_message=SSHTransportMessage.SSH_MSG_SERVICE_REQUEST, connection_message=SSHConnectionMessage.SSH_MSG_CHANNEL_DATA, + sender_ip_address=computer_a.network_interface[1].ip_address, + target_ip_address="192.168.0.11", ) assert not terminal_a.send(payload=payload, dest_ip_address="192.168.0.11") + + +def test_terminal_acknowledges_acl_rules(basic_network): + """Test that Terminal messages""" + + network: Network = basic_network + computer_a: Computer = network.get_node_by_hostname("node_a") + terminal_a: Terminal = computer_a.software_manager.software.get("Terminal") + + terminal_a.login(username="admin", password="Admin123!", ip_address="192.168.0.11") + + router = Router(hostname="router", num_ports=3, start_up_duration=0) + router.power_on() + router.configure_port(port=1, ip_address="10.0.1.1", subnet_mask="255.255.255.0") + router.configure_port(port=2, ip_address="10.0.2.1", subnet_mask="255.255.255.0") + + router.acl.add_rule(action=ACLAction.DENY, src_port=Port.SSH, dst_port=Port.SSH, position=22) + + +def test_network_simulation(basic_network): + # 0: Pull out the network + network = basic_network + + # 1: Set up network hardware + # 1.1: Configure the router + router = Router(hostname="router", num_ports=3, start_up_duration=0) + router.power_on() + router.configure_port(port=1, ip_address="10.0.1.1", subnet_mask="255.255.255.0") + router.configure_port(port=2, ip_address="10.0.2.1", subnet_mask="255.255.255.0") + + # 1.2: Create and connect switches + switch_1 = Switch(hostname="switch_1", num_ports=6, start_up_duration=0) + switch_1.power_on() + network.connect(endpoint_a=router.network_interface[1], endpoint_b=switch_1.network_interface[6]) + router.enable_port(1) + switch_2 = Switch(hostname="switch_2", num_ports=6, start_up_duration=0) + switch_2.power_on() + network.connect(endpoint_a=router.network_interface[2], endpoint_b=switch_2.network_interface[6]) + router.enable_port(2) + + # 1.3: Create and connect computer + client_1 = Computer( + hostname="client_1", + ip_address="10.0.1.2", + subnet_mask="255.255.255.0", + default_gateway="10.0.1.1", + start_up_duration=0, + ) + client_1.power_on() + network.connect( + endpoint_a=client_1.network_interface[1], + endpoint_b=switch_1.network_interface[1], + ) + + # 1.4: Create and connect servers + server_1 = Server( + hostname="server_1", + ip_address="10.0.2.2", + subnet_mask="255.255.255.0", + default_gateway="10.0.2.1", + start_up_duration=0, + ) + server_1.power_on() + network.connect(endpoint_a=server_1.network_interface[1], endpoint_b=switch_2.network_interface[1]) + + server_2 = Server( + hostname="server_2", + ip_address="10.0.2.3", + subnet_mask="255.255.255.0", + default_gateway="10.0.2.1", + start_up_duration=0, + ) + server_2.power_on() + network.connect(endpoint_a=server_2.network_interface[1], endpoint_b=switch_2.network_interface[2]) + + # 2: Configure base ACL + router.acl.add_rule(action=ACLAction.DENY, src_port=Port.ARP, dst_port=Port.ARP, position=22) + router.acl.add_rule(action=ACLAction.DENY, protocol=IPProtocol.ICMP, position=23) + router.acl.add_rule(action=ACLAction.DENY, src_port=Port.DNS, dst_port=Port.DNS, position=1) + router.acl.add_rule(action=ACLAction.DENY, src_port=Port.HTTP, dst_port=Port.HTTP, position=3) + + # 3: Install server software + server_1.software_manager.install(DNSServer) + dns_service: DNSServer = server_1.software_manager.software.get("DNSServer") # noqa + dns_service.dns_register("www.example.com", server_2.network_interface[1].ip_address) + server_2.software_manager.install(WebServer) + + # 3.1: Ensure that the dns clients are configured correctly + client_1.software_manager.software.get("DNSClient").dns_server = server_1.network_interface[1].ip_address + server_2.software_manager.software.get("DNSClient").dns_server = server_1.network_interface[1].ip_address + + terminal_1: Terminal = client_1.software_manager.software.get("Terminal") + + assert terminal_1.login(username="admin", password="Admin123!", ip_address="192.168.0.11") is False + + +def test_terminal_receives_requests(game_and_agent_fixture: Tuple[PrimaiteGame, ProxyAgent]): + game, agent = game_and_agent_fixture + + network: Network = game.simulation.network + computer_a: Computer = network.get_node_by_hostname("node_a") + terminal_a: Terminal = computer_a.software_manager.software.get("Terminal") + + computer_b: Computer = network.get_node_by_hostname("node_b") + + assert terminal_a.is_connected is False + + action = ("TERMINAL_LOGIN", {"username": "admin", "password": "Admin123!"}) # TODO: Add Action to ActionManager ? + + agent.store_action(action) + game.step() + + assert terminal_a.is_connected is True