Files
PrimAITE/src/primaite/simulator/system/services/ntp/ntp_server.py

80 lines
2.7 KiB
Python
Raw Normal View History

2023-11-15 10:57:56 +00:00
from datetime import datetime
2023-11-13 17:39:27 +00:00
from typing import Any, Dict, Optional
from primaite import getLogger
from primaite.simulator.network.protocols.ntp import NTPPacket
2023-11-13 17:39:27 +00:00
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.services.service import Service
_LOGGER = getLogger(__name__)
2023-11-13 17:39:27 +00:00
class NTPServer(Service):
2023-11-15 10:57:56 +00:00
"""Represents a NTP server as a service."""
2023-11-13 17:39:27 +00:00
def __init__(self, **kwargs):
kwargs["name"] = "NTPServer"
kwargs["port"] = Port.NTP
kwargs["protocol"] = IPProtocol.UDP
super().__init__(**kwargs)
self.start()
def describe_state(self) -> Dict:
"""
Describes the current state of the software.
The specifics of the software's state, including its health, criticality,
and any other pertinent information, should be implemented in subclasses.
:return: A dictionary containing key-value pairs representing the current
state of the software.
:rtype: Dict
"""
state = super().describe_state()
return state
def reset_component_for_episode(self, episode: int):
"""
Resets the Service component for a new episode.
This method ensures the Service is ready for a new episode, including
resetting any stateful properties or statistics, and clearing any message
queues.
"""
pass
def receive(
2023-11-15 10:57:56 +00:00
self,
payload: Any,
session_id: Optional[str] = None,
**kwargs,
2023-11-13 17:39:27 +00:00
) -> bool:
"""Receives a request from NTPClient.
2023-11-13 17:39:27 +00:00
Check that request has a valid IP address.
:param payload: The payload to send.
:param session_id: Id of the session (Optional).
:return: True if valid NTP request else False.
"""
2023-11-15 10:57:56 +00:00
if not (isinstance(payload, NTPPacket) and payload.ntp_request.ntp_client):
_LOGGER.debug(f"{payload} is not a NTPPacket")
return False
payload: NTPPacket = payload
if payload.ntp_request.ntp_client:
self.sys_log.info(
2023-11-15 10:57:56 +00:00
f"{self.name}: Received request for {payload.ntp_request.ntp_client} " f"from session {session_id}"
)
# generate a reply with the current time
time = datetime.now()
payload = payload.generate_reply(time)
self.sys_log.info(
f"{self.name}: Responding to NTP request for {payload.ntp_request.ntp_client} "
f"with current time: {time}"
)
# send reply
self.send(payload, session_id)
return True