Files
PrimAITE/src/primaite/simulator/system/services/ntp/ntp_server.py
2025-01-03 13:39:58 +00:00

77 lines
2.5 KiB
Python

# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
from datetime import datetime
from typing import Dict, Optional
from pydantic import Field
from primaite import getLogger
from primaite.simulator.network.protocols.ntp import NTPPacket
from primaite.simulator.system.services.service import Service
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
# Module-level logger obtained from primaite.getLogger using this module's __name__.
_LOGGER = getLogger(__name__)
class NTPServer(Service, identifier="NTPServer"):
    """NTP server service that answers NTP packets with the current time."""

    class ConfigSchema(Service.ConfigSchema):
        """Configuration schema for the NTPServer service."""

        type: str = "NTPServer"

    # The default config is produced by a factory. The lambda defers resolving
    # ``NTPServer`` until call time, because the name is not yet bound while the
    # class body is still executing.
    config: "NTPServer.ConfigSchema" = Field(default_factory=lambda: NTPServer.ConfigSchema())

    def __init__(self, **kwargs):
        """
        Create the service and start it straight away.

        The ``name``, ``port`` and ``protocol`` entries of ``kwargs`` are always
        overwritten with the NTP-specific values before being passed to the parent
        constructor.

        :param kwargs: Keyword arguments forwarded to the parent ``Service``.
        """
        kwargs.update(
            name="NTPServer",
            port=PORT_LOOKUP["NTP"],
            protocol=PROTOCOL_LOOKUP["UDP"],
        )
        super().__init__(**kwargs)
        self.start()

    def describe_state(self) -> Dict:
        """
        Return the state of this service.

        No NTP-specific entries are added; the dictionary built by the parent class
        is returned as-is.

        :return: The state dictionary produced by the parent class.
        :rtype: Dict
        """
        return super().describe_state()

    def receive(
        self,
        payload: NTPPacket,
        session_id: Optional[str] = None,
        **kwargs,
    ) -> bool:
        """
        Handle an incoming payload and answer it with the current time.

        If the payload is an ``NTPPacket``, a reply is generated from it using
        ``datetime.now()`` and handed to the session manager, using this service's
        port as both source and destination port. Any other payload is logged to
        the system log and rejected.

        :param payload: The received payload; expected to be an ``NTPPacket``.
        :param session_id: Id of the session (Optional).
        :return: True if the payload was an ``NTPPacket`` and a reply was passed to
            the session manager, otherwise False.
        """
        if isinstance(payload, NTPPacket):
            # Stamp the reply with the local wall-clock time (naive datetime).
            current_time = datetime.now()
            reply = payload.generate_reply(current_time)

            # The outcome reported by the session manager is not inspected here.
            self.software_manager.session_manager.receive_payload_from_software_manager(
                payload=reply,
                src_port=self.port,
                dst_port=self.port,
                ip_protocol=self.protocol,
                session_id=session_id,
            )
            return True

        # Anything that is not an NTPPacket is reported and dropped.
        self.sys_log.warning(f"{self.name}: Payload is not a NTPPacket")
        self.sys_log.debug(f"{self.name}: {payload}")
        return False