#2768 - Fixed issue causing main port to not be included in list of open ports. documented the configuration of listen_on_ports. added test that tests listen_on_ports configuration from yaml.

This commit is contained in:
Chris McCarthy
2024-08-08 21:20:20 +01:00
parent a5652ae4b2
commit a3a9ca9963
6 changed files with 139 additions and 14 deletions

View File

@@ -1,7 +1,7 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
"""PrimAITE game - Encapsulates the simulation and agents."""
from ipaddress import IPv4Address
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Union
import numpy as np
from pydantic import BaseModel, ConfigDict
@@ -44,8 +44,10 @@ from primaite.simulator.system.services.ftp.ftp_client import FTPClient
from primaite.simulator.system.services.ftp.ftp_server import FTPServer
from primaite.simulator.system.services.ntp.ntp_client import NTPClient
from primaite.simulator.system.services.ntp.ntp_server import NTPServer
from primaite.simulator.system.services.service import Service
from primaite.simulator.system.services.terminal.terminal import Terminal
from primaite.simulator.system.services.web_server.web_server import WebServer
from primaite.simulator.system.software import Software
_LOGGER = getLogger(__name__)
@@ -328,6 +330,21 @@ class PrimaiteGame:
user_manager: UserManager = new_node.software_manager.software["UserManager"] # noqa
for user_cfg in node_cfg["users"]:
user_manager.add_user(**user_cfg, bypass_can_perform_action=True)
def _set_software_listen_on_ports(software: Union[Software, Service], software_cfg: Dict):
"""Set listener ports on software."""
listen_on_ports = []
for port_id in set(software_cfg.get("options", {}).get("listen_on_ports", [])):
print("yes", port_id)
port = None
if isinstance(port_id, int):
port = Port(port_id)
elif isinstance(port_id, str):
port = Port[port_id]
if port:
listen_on_ports.append(port)
software.listen_on_ports = set(listen_on_ports)
if "services" in node_cfg:
for service_cfg in node_cfg["services"]:
new_service = None
@@ -341,6 +358,7 @@ class PrimaiteGame:
if "fix_duration" in service_cfg.get("options", {}):
new_service.fixing_duration = service_cfg["options"]["fix_duration"]
_set_software_listen_on_ports(new_service, service_cfg)
# start the service
new_service.start()
else:
@@ -390,6 +408,8 @@ class PrimaiteGame:
_LOGGER.error(msg)
raise ValueError(msg)
_set_software_listen_on_ports(new_application, application_cfg)
# run the application
new_application.run()

View File

@@ -237,19 +237,24 @@ class SoftwareManager:
self.software.get("NMAP").receive(payload=payload, session_id=session_id)
return
main_receiver = self.port_protocol_mapping.get((port, protocol), None)
listening_receivers = [software for software in self.software.values() if port in software.listen_on_ports]
receivers = [main_receiver] + listening_receivers if main_receiver else listening_receivers
if receivers:
for receiver in receivers:
receiver.receive(
payload=deepcopy(payload),
session_id=session_id,
from_network_interface=from_network_interface,
frame=frame,
)
else:
if main_receiver:
main_receiver.receive(
payload=payload, session_id=session_id, from_network_interface=from_network_interface, frame=frame
)
listening_receivers = [
software
for software in self.software.values()
if port in software.listen_on_ports and software != main_receiver
]
for receiver in listening_receivers:
receiver.receive(
payload=deepcopy(payload),
session_id=session_id,
from_network_interface=from_network_interface,
frame=frame,
)
if not main_receiver and not listening_receivers:
self.sys_log.warning(f"No service or application found for port {port} and protocol {protocol}")
pass
def show(self, markdown: bool = False):
"""