#2041: NTP server initial commits

This commit is contained in:
Nick Todd
2023-11-13 17:39:27 +00:00
parent 815bdfe603
commit 2ee8203397
3 changed files with 90 additions and 0 deletions

View File

@@ -0,0 +1,34 @@
from __future__ import annotations
from ipaddress import IPv4Address
from typing import Optional
from pydantic import BaseModel
from primaite.simulator.network.protocols.packet import DataPacket
class NTPRequest(BaseModel):
"""Represents a NTP Request packet."""
pass
class NTPReply(BaseModel):
"""Represents a NTP Reply packet."""
pass
class NTPPacket(DataPacket):
"""
Represents the NTP layer of a network frame.
:param ntp_request: NTPRequest packet from NTP client.
:param ntp_reply: NTPReply packet from NTP Server.
"""
ntp_request: NTPRequest
"NTP Request packet sent by NTP Client."
ntp_reply: Optional[NTPReply] = None
"NTP Reply packet generated by NTP Server."

View File

@@ -0,0 +1,56 @@
from ipaddress import IPv4Address
from typing import Any, Dict, Optional
from primaite import getLogger
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.services.service import Service
_LOGGER = getLogger(__name__)
class NTPServer(Service):
"""Represents a NTP server as a service"""
def __init__(self, **kwargs):
kwargs["name"] = "NTPServer"
kwargs["port"] = Port.NTP
kwargs["protocol"] = IPProtocol.UDP
super().__init__(**kwargs)
self.start()
def describe_state(self) -> Dict:
"""
Describes the current state of the software.
The specifics of the software's state, including its health, criticality,
and any other pertinent information, should be implemented in subclasses.
:return: A dictionary containing key-value pairs representing the current
state of the software.
:rtype: Dict
"""
state = super().describe_state()
return state
def reset_component_for_episode(self, episode: int):
"""
Resets the Service component for a new episode.
This method ensures the Service is ready for a new episode, including
resetting any stateful properties or statistics, and clearing any message
queues.
"""
pass
def receive(
self,
payload: Any,
session_id: Optional[str] = None,
**kwargs,
) -> bool:
"""Receives a request from NTPClient"""
pass
def send(self):
"""Sends time data to NTPClient"""
pass