Merged PR 503: Enable Multi-Port Listening for Services and Applications

## Summary
- Added a `listen_on_ports` set in the `IOSoftware` class to enable software listening on ports in addition to the main port they're assigned.
- Also added something I missed in the `CHANGELOG.md` from user login ticket 🙃

## Test process
- Tested listening on ports with a dummy listener software class and counted frames snooped on.
- Also tested that the actual software that the posts being snooped in on still works as expected.

## Checklist
- [X] PR is linked to a **work item**
- [X] **acceptance criteria** of linked ticket are met
- [X] performed **self-review** of the code
- [X] written **tests** for any new functionality added with this PR
- [X] updated the **documentation** if this PR changes or adds functionality
- [X] written/updated **design docs** if this PR implements new functionality
- [X] updated the **change log**
- [X] ran **pre-commit** checks for code style
- [ ] attended to any **TO-DOs** left in the code

Related work items: #2768
This commit is contained in:
Christopher McCarthy
2024-08-09 10:25:33 +00:00
8 changed files with 213 additions and 9 deletions

View File

@@ -0,0 +1,39 @@
io_settings:
save_step_metadata: false
save_pcap_logs: true
save_sys_logs: true
sys_log_level: WARNING
agent_log_level: INFO
save_agent_logs: true
write_agent_log_to_terminal: True
game:
max_episode_length: 256
ports:
- ARP
protocols:
- ICMP
- UDP
simulation:
network:
nodes:
- hostname: client
type: computer
ip_address: 192.168.10.11
subnet_mask: 255.255.255.0
default_gateway: 192.168.10.1
services:
- type: DatabaseService
options:
backup_server_ip: 10.10.1.12
listen_on_ports:
- 631
applications:
- type: WebBrowser
options:
target_url: http://sometech.ai
listen_on_ports:
- SMB

View File

@@ -0,0 +1,84 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from typing import Any, Dict, List, Set
import yaml
from pydantic import Field
from primaite.game.game import PrimaiteGame
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.services.database.database_service import DatabaseService
from primaite.simulator.system.services.service import Service
from tests import TEST_ASSETS_ROOT
class _DatabaseListener(Service):
name: str = "DatabaseListener"
protocol: IPProtocol = IPProtocol.TCP
port: Port = Port.NONE
listen_on_ports: Set[Port] = {Port.POSTGRES_SERVER}
payloads_received: List[Any] = Field(default_factory=list)
def receive(self, payload: Any, session_id: str, **kwargs) -> bool:
self.payloads_received.append(payload)
self.sys_log.info(f"{self.name}: received payload {payload}")
return True
def describe_state(self) -> Dict:
return super().describe_state()
def test_http_listener(client_server):
computer, server = client_server
server.software_manager.install(DatabaseService)
server_db = server.software_manager.software["DatabaseService"]
server_db.start()
server.software_manager.install(_DatabaseListener)
server_db_listener: _DatabaseListener = server.software_manager.software["DatabaseListener"]
server_db_listener.start()
computer.software_manager.install(DatabaseClient)
computer_db_client: DatabaseClient = computer.software_manager.software["DatabaseClient"]
computer_db_client.run()
computer_db_client.server_ip_address = server.network_interface[1].ip_address
assert len(server_db_listener.payloads_received) == 0
computer.session_manager.receive_payload_from_software_manager(
payload="masquerade as Database traffic",
dst_ip_address=server.network_interface[1].ip_address,
dst_port=Port.POSTGRES_SERVER,
ip_protocol=IPProtocol.TCP,
)
assert len(server_db_listener.payloads_received) == 1
db_connection = computer_db_client.get_new_connection()
assert db_connection
assert len(server_db_listener.payloads_received) == 2
assert db_connection.query("SELECT")
assert len(server_db_listener.payloads_received) == 3
def test_set_listen_on_ports_from_config():
config_path = TEST_ASSETS_ROOT / "configs" / "basic_node_with_software_listening_ports.yaml"
with open(config_path, "r") as f:
config_dict = yaml.safe_load(f)
network = PrimaiteGame.from_config(cfg=config_dict).simulation.network
client: Computer = network.get_node_by_hostname("client")
assert Port.SMB in client.software_manager.get_open_ports()
assert Port.IPP in client.software_manager.get_open_ports()
web_browser = client.software_manager.software["WebBrowser"]
assert not web_browser.listen_on_ports.difference({Port.SMB, Port.IPP})