#2041: Implement NTP protocol for server
This commit is contained in:
@@ -2,22 +2,22 @@ from __future__ import annotations
|
||||
|
||||
from ipaddress import IPv4Address
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from primaite.simulator.network.protocols.packet import DataPacket
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class NTPRequest(BaseModel):
|
||||
"""Represents a NTP Request packet."""
|
||||
|
||||
pass
|
||||
ntp_client: IPv4Address = None
|
||||
|
||||
|
||||
class NTPReply(BaseModel):
|
||||
"""Represents a NTP Reply packet."""
|
||||
|
||||
pass
|
||||
ntp_datetime: datetime
|
||||
"NTP datetime object set by NTP Server."
|
||||
|
||||
|
||||
class NTPPacket(DataPacket):
|
||||
@@ -31,4 +31,12 @@ class NTPPacket(DataPacket):
|
||||
ntp_request: NTPRequest
|
||||
"NTP Request packet sent by NTP Client."
|
||||
ntp_reply: Optional[NTPReply] = None
|
||||
"NTP Reply packet generated by NTP Server."
|
||||
|
||||
def generate_reply(self, time: datetime) -> NTPPacket:
|
||||
""" Generate a NTPPacket containing the time in a NTPReply object
|
||||
|
||||
:param time: datetime object representing the time from the NTP server.
|
||||
:return: A new NTPPacket object.
|
||||
"""
|
||||
self.ntp_reply = NTPReply(time)
|
||||
return self
|
||||
|
||||
@@ -2,12 +2,15 @@ from ipaddress import IPv4Address
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from primaite import getLogger
|
||||
from primaite.simulator.network.protocols.ntp import NTPPacket
|
||||
from primaite.simulator.network.transmission.network_layer import IPProtocol
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
from primaite.simulator.system.services.service import Service
|
||||
from datetime import datetime
|
||||
|
||||
_LOGGER = getLogger(__name__)
|
||||
|
||||
|
||||
class NTPServer(Service):
|
||||
"""Represents a NTP server as a service"""
|
||||
|
||||
@@ -48,9 +51,32 @@ class NTPServer(Service):
|
||||
session_id: Optional[str] = None,
|
||||
**kwargs,
|
||||
) -> bool:
|
||||
"""Receives a request from NTPClient"""
|
||||
pass
|
||||
"""Receives a request from NTPClient.
|
||||
|
||||
def send(self):
|
||||
"""Sends time data to NTPClient"""
|
||||
pass
|
||||
Check that request has a valid IP address.
|
||||
|
||||
:param payload: The payload to send.
|
||||
:param session_id: Id of the session (Optional).
|
||||
|
||||
:return: True if valid NTP request else False.
|
||||
"""
|
||||
if not (isinstance(payload, NTPPacket) and
|
||||
payload.ntp_request.ntp_client):
|
||||
_LOGGER.debug(f"{payload} is not a NTPPacket")
|
||||
return False
|
||||
payload: NTPPacket = payload
|
||||
if payload.ntp_request.ntp_client:
|
||||
self.sys_log.info(
|
||||
f"{self.name}: Received request for {payload.ntp_request.ntp_client} "
|
||||
f"from session {session_id}"
|
||||
)
|
||||
# generate a reply with the current time
|
||||
time = datetime.now()
|
||||
payload = payload.generate_reply(time)
|
||||
self.sys_log.info(
|
||||
f"{self.name}: Responding to NTP request for {payload.ntp_request.ntp_client} "
|
||||
f"with current time: {time}"
|
||||
)
|
||||
# send reply
|
||||
self.send(payload, session_id)
|
||||
return True
|
||||
|
||||
Reference in New Issue
Block a user