Merged PR 560: #2900 - Terminal last response
## Summary This PR introduces the `last_response` attribute within Terminal, similar to that seen within the C2 suite. The aim of this is to resolve the bug seen when using Terminal, in that when sending remote commands to execute, the RequestResponse does not indicate whether the command was successfully actioned, just that it was sent. ## Test process New test to check that `last_response` contains the correct ReqestResponse following remote command execution ## Checklist - [X] PR is linked to a **work item** - [X] **acceptance criteria** of linked ticket are met - [X] performed **self-review** of the code - [X] written **tests** for any new functionality added with this PR - [ ] updated the **documentation** if this PR changes or adds functionality - [ ] written/updated **design docs** if this PR implements new functionality - [X] updated the **change log** - [X] ran **pre-commit** checks for code style - [X] attended to any **TO-DOs** left in the code #2900 - Changes to terminal to include a last_response attribute, for use in obtaining RequestResponse from remote command executions Related work items: #2900
This commit is contained in:
@@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
or `generate_seed_value` is set to `true`.
|
||||
- ARP .show() method will now include the port number associated with each entry.
|
||||
- Added `services_requires_scan` and `applications_requires_scan` to agent observation space config to allow the agents to be able to see actual health states of services and applications without requiring scans (Default `True`, set to `False` to allow agents to see actual health state without scanning).
|
||||
- Updated the `Terminal` class to provide response information when sending remote command execution.
|
||||
|
||||
## [3.3.0] - 2024-09-04
|
||||
### Added
|
||||
|
||||
@@ -167,6 +167,22 @@
|
||||
"computer_b.file_system.show()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"Information about the latest response when executing a remote command can be seen by calling the `last_response` attribute within `Terminal`"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"print(terminal_a.last_response)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
@@ -488,7 +504,7 @@
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.10.12"
|
||||
"version": "3.10.11"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
|
||||
@@ -1711,7 +1711,7 @@ class Node(SimComponent):
|
||||
"""
|
||||
application_name = request[0]
|
||||
if self.software_manager.software.get(application_name):
|
||||
self.sys_log.warning(f"Can't install {application_name}. It's already installed.")
|
||||
self.sys_log.info(f"Can't install {application_name}. It's already installed.")
|
||||
return RequestResponse(status="success", data={"reason": "already installed"})
|
||||
application_class = Application._application_registry[application_name]
|
||||
self.software_manager.install(application_class)
|
||||
|
||||
@@ -135,12 +135,20 @@ class Terminal(Service):
|
||||
_client_connection_requests: Dict[str, Optional[Union[str, TerminalClientConnection]]] = {}
|
||||
"""Dictionary of connect requests made to remote nodes."""
|
||||
|
||||
_last_response: Optional[RequestResponse] = None
|
||||
"""Last response received from RequestManager, for returning remote RequestResponse."""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
kwargs["name"] = "Terminal"
|
||||
kwargs["port"] = Port.SSH
|
||||
kwargs["protocol"] = IPProtocol.TCP
|
||||
super().__init__(**kwargs)
|
||||
|
||||
@property
|
||||
def last_response(self) -> Optional[RequestResponse]:
|
||||
"""Public version of _last_response attribute."""
|
||||
return self._last_response
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Produce a dictionary describing the current state of this object.
|
||||
@@ -202,15 +210,11 @@ class Terminal(Service):
|
||||
command: str = request[1]["command"]
|
||||
remote_connection = self._get_connection_from_ip(ip_address=ip_address)
|
||||
if remote_connection:
|
||||
outcome = remote_connection.execute(command)
|
||||
if outcome:
|
||||
return RequestResponse(
|
||||
status="success",
|
||||
data={},
|
||||
)
|
||||
remote_connection.execute(command)
|
||||
return self.last_response if not None else RequestResponse(status="failure", data={})
|
||||
return RequestResponse(
|
||||
status="failure",
|
||||
data={},
|
||||
data={"reason": "Failed to execute command."},
|
||||
)
|
||||
|
||||
rm.add_request(
|
||||
@@ -243,7 +247,8 @@ class Terminal(Service):
|
||||
|
||||
def execute(self, command: List[Any]) -> Optional[RequestResponse]:
|
||||
"""Execute a passed ssh command via the request manager."""
|
||||
return self.parent.apply_request(command)
|
||||
self._last_response = self.parent.apply_request(command)
|
||||
return self._last_response
|
||||
|
||||
def _get_connection_from_ip(self, ip_address: IPv4Address) -> Optional[RemoteTerminalConnection]:
|
||||
"""Find Remote Terminal Connection from a given IP."""
|
||||
@@ -423,10 +428,11 @@ class Terminal(Service):
|
||||
"""
|
||||
source_ip = kwargs["frame"].ip.src_ip_address
|
||||
self.sys_log.info(f"{self.name}: Received payload: {payload}. Source: {source_ip}")
|
||||
self._last_response = None # Clear last response
|
||||
|
||||
if isinstance(payload, SSHPacket):
|
||||
if payload.transport_message == SSHTransportMessage.SSH_MSG_USERAUTH_REQUEST:
|
||||
# validate & add connection
|
||||
# TODO: uncomment this as part of 2781
|
||||
username = payload.user_account.username
|
||||
password = payload.user_account.password
|
||||
connection_id = self.parent.user_session_manager.remote_login(
|
||||
@@ -472,6 +478,9 @@ class Terminal(Service):
|
||||
session_id=session_id,
|
||||
source_ip=source_ip,
|
||||
)
|
||||
self._last_response: RequestResponse = RequestResponse(
|
||||
status="success", data={"reason": "Login Successful"}
|
||||
)
|
||||
|
||||
elif payload.transport_message == SSHTransportMessage.SSH_MSG_SERVICE_REQUEST:
|
||||
# Requesting a command to be executed
|
||||
@@ -483,12 +492,32 @@ class Terminal(Service):
|
||||
payload.connection_uuid
|
||||
)
|
||||
remote_session.last_active_step = self.software_manager.node.user_session_manager.current_timestep
|
||||
self.execute(command)
|
||||
self._last_response: RequestResponse = self.execute(command)
|
||||
|
||||
if self._last_response.status == "success":
|
||||
transport_message = SSHTransportMessage.SSH_MSG_SERVICE_SUCCESS
|
||||
else:
|
||||
transport_message = SSHTransportMessage.SSH_MSG_SERVICE_FAILED
|
||||
|
||||
payload: SSHPacket = SSHPacket(
|
||||
payload=self._last_response,
|
||||
transport_message=transport_message,
|
||||
connection_message=SSHConnectionMessage.SSH_MSG_CHANNEL_DATA,
|
||||
)
|
||||
self.software_manager.send_payload_to_session_manager(
|
||||
payload=payload, dest_port=self.port, session_id=session_id
|
||||
)
|
||||
return True
|
||||
else:
|
||||
self.sys_log.error(
|
||||
f"{self.name}: Connection UUID:{payload.connection_uuid} is not valid. Rejecting Command."
|
||||
)
|
||||
elif (
|
||||
payload.transport_message == SSHTransportMessage.SSH_MSG_SERVICE_SUCCESS
|
||||
or SSHTransportMessage.SSH_MSG_SERVICE_FAILED
|
||||
):
|
||||
# Likely receiving command ack from remote.
|
||||
self._last_response = payload.payload
|
||||
|
||||
if isinstance(payload, dict) and payload.get("type"):
|
||||
if payload["type"] == "disconnect":
|
||||
|
||||
@@ -71,7 +71,7 @@ def test_ftp_should_not_process_commands_if_service_not_running(ftp_client):
|
||||
assert ftp_client_service._process_ftp_command(payload=payload).status_code is FTPStatusCode.ERROR
|
||||
|
||||
|
||||
def test_ftp_tries_to_senf_file__that_does_not_exist(ftp_client):
|
||||
def test_ftp_tries_to_send_file__that_does_not_exist(ftp_client):
|
||||
"""Method send_file should return false if no file to send."""
|
||||
assert ftp_client.file_system.get_file(folder_name="root", file_name="test.txt") is None
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ import pytest
|
||||
|
||||
from primaite.game.agent.interface import ProxyAgent
|
||||
from primaite.game.game import PrimaiteGame
|
||||
from primaite.interface.request import RequestResponse
|
||||
from primaite.simulator.network.container import Network
|
||||
from primaite.simulator.network.hardware.nodes.host.computer import Computer
|
||||
from primaite.simulator.network.hardware.nodes.host.server import Server
|
||||
@@ -403,3 +404,59 @@ def test_terminal_connection_timeout(basic_network):
|
||||
assert len(computer_b.user_session_manager.remote_sessions) == 0
|
||||
|
||||
assert not remote_connection.is_active
|
||||
|
||||
|
||||
def test_terminal_last_response_updates(basic_network):
|
||||
"""Test that the _last_response within Terminal correctly updates."""
|
||||
network: Network = basic_network
|
||||
computer_a: Computer = network.get_node_by_hostname("node_a")
|
||||
terminal_a: Terminal = computer_a.software_manager.software.get("Terminal")
|
||||
computer_b: Computer = network.get_node_by_hostname("node_b")
|
||||
|
||||
assert terminal_a.last_response is None
|
||||
|
||||
remote_connection = terminal_a.login(username="admin", password="admin", ip_address="192.168.0.11")
|
||||
|
||||
# Last response should be a successful logon
|
||||
assert terminal_a.last_response == RequestResponse(status="success", data={"reason": "Login Successful"})
|
||||
|
||||
remote_connection.execute(command=["software_manager", "application", "install", "RansomwareScript"])
|
||||
|
||||
# Last response should now update following successful install
|
||||
assert terminal_a.last_response == RequestResponse(status="success", data={})
|
||||
|
||||
remote_connection.execute(command=["software_manager", "application", "install", "RansomwareScript"])
|
||||
|
||||
# Last response should now update to success, but with supplied reason.
|
||||
assert terminal_a.last_response == RequestResponse(status="success", data={"reason": "already installed"})
|
||||
|
||||
remote_connection.execute(command=["file_system", "create", "file", "folder123", "doggo.pdf", False])
|
||||
|
||||
# Check file was created.
|
||||
assert computer_b.file_system.access_file(folder_name="folder123", file_name="doggo.pdf")
|
||||
|
||||
# Last response should be confirmation of file creation.
|
||||
assert terminal_a.last_response == RequestResponse(
|
||||
status="success",
|
||||
data={"file_name": "doggo.pdf", "folder_name": "folder123", "file_type": "PDF", "file_size": 102400},
|
||||
)
|
||||
|
||||
remote_connection.execute(
|
||||
command=[
|
||||
"service",
|
||||
"FTPClient",
|
||||
"send",
|
||||
{
|
||||
"dest_ip_address": "192.168.0.2",
|
||||
"src_folder": "folder123",
|
||||
"src_file_name": "cat.pdf",
|
||||
"dest_folder": "root",
|
||||
"dest_file_name": "cat.pdf",
|
||||
},
|
||||
]
|
||||
)
|
||||
|
||||
assert terminal_a.last_response == RequestResponse(
|
||||
status="failure",
|
||||
data={"reason": "Unable to locate given file on local file system. Perhaps given options are invalid?"},
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user