#2042: Test tidying changes.

This commit is contained in:
Nick Todd
2023-11-23 15:55:58 +00:00
parent 8584fa8f51
commit 87dde6ee0b
2 changed files with 26 additions and 9 deletions

View File

@@ -15,6 +15,7 @@ class NTPClient(Service):
"""Represents a NTP client as a service."""
ip_addr: Optional[IPv4Address] = None
"The IP address of the NTP client"
ntp_server: Optional[IPv4Address] = None
"The NTP server the client sends requests to."
time: Optional[datetime] = None
@@ -26,6 +27,17 @@ class NTPClient(Service):
super().__init__(**kwargs)
self.start()
def configure(self, ntp_server_ip_address: IPv4Address, ntp_client_ip_address: IPv4Address) -> None:
"""
Set the IP address for the NTP server.
:param ntp_server_ip_address: IPv4 address of NTP server.
:param ntp_client_ip_Address: IPv4 address of NTP client.
"""
self.ip_addr = ntp_client_ip_address
self.ntp_server = ntp_server_ip_address
self.sys_log.info(f"{self.name}: ip_addr: {self.ip_addr}, ntp_server: {self.ntp_server}")
def describe_state(self) -> Dict:
"""
Describes the current state of the software.
@@ -100,9 +112,9 @@ class NTPClient(Service):
self.time = payload.ntp_reply.ntp_datetime
return True
def request_time(self, ip_address: IPv4Address = ip_addr) -> None:
def request_time(self) -> None:
"""Send request to ntp_server."""
ntp_request = NTPRequest(ntp_client=ip_address)
ntp_request = NTPRequest(ntp_client=self.ip_addr)
ntp_server_packet = NTPPacket(ntp_request=ntp_request)
self.send(payload=ntp_server_packet)

View File

@@ -36,6 +36,7 @@ def create_ntp_network() -> Network:
)
ntp_client.power_on()
ntp_client.software_manager.install(NTPClient)
network.connect(endpoint_b=ntp_server.ethernet_port[1], endpoint_a=ntp_client.ethernet_port[1])
return network
@@ -54,21 +55,25 @@ def test_ntp_client_server():
assert ntp_server.operating_state == ServiceOperatingState.RUNNING
assert ntp_client.operating_state == ServiceOperatingState.RUNNING
ntp_client.configure(
ntp_server_ip_address=IPv4Address("192.168.1.2"), ntp_client_ip_address=IPv4Address("192.168.1.3")
)
assert ntp_client.time is None
ntp_request = NTPRequest(ntp_client="192.168.1.3")
ntp_packet = NTPPacket(ntp_request=ntp_request)
# ntp_request = NTPRequest(ntp_client="192.168.1.3")
# ntp_packet = NTPPacket(ntp_request=ntp_request)
# ntp_client.send(payload=ntp_packet)
ntp_client.request_time("192.168.1.3")
ntp_client.request_time()
assert ntp_server.receive(payload=ntp_packet) is True
assert ntp_client.receive(payload=ntp_packet) is True
# assert ntp_server.receive(payload=ntp_packet) is True
# assert ntp_client.receive(payload=ntp_packet) is True
assert ntp_client.time is not None
first_time = ntp_client.time
sleep(0.1)
ntp_client.apply_timestep(1) # Check time advances
ntp_server.receive(payload=ntp_packet)
ntp_client.receive(payload=ntp_packet)
# ntp_server.receive(payload=ntp_packet)
# ntp_client.receive(payload=ntp_packet)
second_time = ntp_client.time
assert first_time != second_time