#2646 - Adding back the dns client/server python that was deleted

This commit is contained in:
Charlie Crane
2024-06-13 12:13:16 +01:00
parent 81bcf99855
commit 6762fd805e
2 changed files with 286 additions and 0 deletions

View File

@@ -1 +1,162 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from ipaddress import IPv4Address
from typing import Dict, Optional
from primaite import getLogger
from primaite.simulator.network.protocols.dns import DNSPacket, DNSRequest
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.core.software_manager import SoftwareManager
from primaite.simulator.system.services.service import Service
_LOGGER = getLogger(__name__)
class DNSClient(Service):
"""Represents a DNS Client as a Service."""
dns_cache: Dict[str, IPv4Address] = {}
"A dict of known mappings between domain/URLs names and IPv4 addresses."
dns_server: Optional[IPv4Address] = None
"The DNS Server the client sends requests to."
def __init__(self, **kwargs):
kwargs["name"] = "DNSClient"
kwargs["port"] = Port.DNS
# DNS uses UDP by default
# it switches to TCP when the bytes exceed 512 (or 4096) bytes
# TCP for now
kwargs["protocol"] = IPProtocol.TCP
super().__init__(**kwargs)
self.start()
def describe_state(self) -> Dict:
"""
Describes the current state of the software.
The specifics of the software's state, including its health, criticality,
and any other pertinent information, should be implemented in subclasses.
:return: A dictionary containing key-value pairs representing the current state of the software.
:rtype: Dict
"""
state = super().describe_state()
return state
def add_domain_to_cache(self, domain_name: str, ip_address: IPv4Address) -> bool:
"""
Adds a domain name to the DNS Client cache.
:param: domain_name: The domain name to save to cache
:param: ip_address: The IP Address to attach the domain name to
"""
if not self._can_perform_action():
return False
self.dns_cache[domain_name] = ip_address
return True
def check_domain_exists(
self,
target_domain: str,
session_id: Optional[str] = None,
is_reattempt: bool = False,
) -> bool:
"""Function to check if domain name exists.
:param: target_domain: The domain requested for an IP address.
:param: session_id: The Session ID the payload is to originate from. Optional.
:param: is_reattempt: Checks if the request has been reattempted. Default is False.
"""
if not self._can_perform_action():
return False
# check if DNS server is configured
if self.dns_server is None:
self.sys_log.warning(f"{self.name}: DNS Server is not configured")
return False
# check if the target domain is in the client's DNS cache
payload = DNSPacket(dns_request=DNSRequest(domain_name_request=target_domain))
# check if the domain is already in the DNS cache
if target_domain in self.dns_cache:
self.sys_log.info(
f"{self.name}: Domain lookup for {target_domain} successful,"
f"resolves to {self.dns_cache[target_domain]}"
)
return True
else:
# return False if already reattempted
if is_reattempt:
self.sys_log.warning(f"{self.name}: Domain lookup for {target_domain} failed")
return False
else:
# send a request to check if domain name exists in the DNS Server
software_manager: SoftwareManager = self.software_manager
software_manager.send_payload_to_session_manager(
payload=payload, dest_ip_address=self.dns_server, dest_port=Port.DNS
)
# recursively re-call the function passing is_reattempt=True
return self.check_domain_exists(
target_domain=target_domain,
session_id=session_id,
is_reattempt=True,
)
def send(
self,
payload: DNSPacket,
session_id: Optional[str] = None,
dest_ip_address: Optional[IPv4Address] = None,
dest_port: Optional[Port] = None,
**kwargs,
) -> bool:
"""
Sends a payload to the SessionManager.
:param payload: The payload to be sent.
:param dest_ip_address: The ip address of the payload destination.
:param dest_port: The port of the payload destination.
:param session_id: The Session ID the payload is to originate from. Optional.
:return: True if successful, False otherwise.
"""
self.sys_log.info(f"{self.name}: Sending DNS request to resolve {payload.dns_request.domain_name_request}")
return super().send(
payload=payload, dest_ip_address=dest_ip_address, dest_port=dest_port, session_id=session_id, **kwargs
)
def receive(
self,
payload: DNSPacket,
session_id: Optional[str] = None,
**kwargs,
) -> bool:
"""
Receives a payload from the SessionManager.
:param payload: The payload to be sent.
:param session_id: The Session ID the payload is to originate from. Optional.
:return: True if successful, False otherwise.
"""
# The payload should be a DNS packet
if not isinstance(payload, DNSPacket):
self.sys_log.warning(f"{self.name}: Payload is not a DNSPacket")
self.sys_log.debug(f"{self.name}: {payload}")
return False
if payload.dns_reply is not None:
# add the IP address to the client cache
if payload.dns_reply.domain_name_ip_address:
self.sys_log.info(
f"{self.name}: Resolved domain name {payload.dns_request.domain_name_request} "
f"to {payload.dns_reply.domain_name_ip_address}"
)
self.dns_cache[payload.dns_request.domain_name_request] = payload.dns_reply.domain_name_ip_address
return True
self.sys_log.warning(f"Failed to resolve domain name {payload.dns_request.domain_name_request}")
return False

View File

@@ -1 +1,126 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from ipaddress import IPv4Address
from typing import Any, Dict, Optional
from prettytable import MARKDOWN, PrettyTable
from primaite import getLogger
from primaite.simulator.network.protocols.dns import DNSPacket
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.services.service import Service
_LOGGER = getLogger(__name__)
class DNSServer(Service):
"""Represents a DNS Server as a Service."""
dns_table: Dict[str, IPv4Address] = {}
"A dict of mappings between domain names and IPv4 addresses."
def __init__(self, **kwargs):
kwargs["name"] = "DNSServer"
kwargs["port"] = Port.DNS
# DNS uses UDP by default
# it switches to TCP when the bytes exceed 512 (or 4096) bytes
# TCP for now
kwargs["protocol"] = IPProtocol.TCP
super().__init__(**kwargs)
self.start()
def describe_state(self) -> Dict:
"""
Describes the current state of the software.
The specifics of the software's state, including its health, criticality,
and any other pertinent information, should be implemented in subclasses.
:return: A dictionary containing key-value pairs representing the current state of the software.
:rtype: Dict
"""
state = super().describe_state()
return state
def dns_lookup(self, target_domain: str) -> Optional[IPv4Address]:
"""
Attempts to find the IP address for a domain name.
:param target_domain: The single domain name requested by a DNS client.
:return ip_address: The IP address of that domain name or None.
"""
if not self._can_perform_action():
return
return self.dns_table.get(target_domain)
def dns_register(self, domain_name: str, domain_ip_address: IPv4Address):
"""
Register a domain name and its IP address.
:param: domain_name: The domain name to register
:type: domain_name: str
:param: domain_ip_address: The IP address that the domain should route to
:type: domain_ip_address: IPv4Address
"""
if not self._can_perform_action():
return
self.dns_table[domain_name] = domain_ip_address
def receive(
self,
payload: Any,
session_id: Optional[str] = None,
**kwargs,
) -> bool:
"""
Receives a payload from the SessionManager.
The specifics of how the payload is processed and whether a response payload
is generated should be implemented in subclasses.
:param: payload: The payload to send.
:param: session_id: The id of the session. Optional.
:return: True if DNS request returns a valid IP, otherwise, False
"""
if not super().receive(payload=payload, session_id=session_id, **kwargs):
return False
# The payload should be a DNS packet
if not isinstance(payload, DNSPacket):
self.sys_log.warning(f"{payload} is not a DNSPacket")
self.sys_log.debug(f"{payload} is not a DNSPacket")
return False
# cast payload into a DNS packet
payload: DNSPacket = payload
if payload.dns_request is not None:
self.sys_log.info(
f"{self.name}: Received domain lookup request for {payload.dns_request.domain_name_request} "
f"from session {session_id}"
)
# generate a reply with the correct DNS IP address
payload = payload.generate_reply(self.dns_lookup(payload.dns_request.domain_name_request))
self.sys_log.info(
f"{self.name}: Responding to domain lookup request for {payload.dns_request.domain_name_request} "
f"with ip address: {payload.dns_reply.domain_name_ip_address}"
)
# send reply
self.send(payload, session_id)
return payload.dns_reply.domain_name_ip_address is not None
return False
def show(self, markdown: bool = False):
"""Prints a table of DNS Lookup table."""
table = PrettyTable(["Domain Name", "IP Address"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.sys_log.hostname} DNS Lookup table"
for dns in self.dns_table.items():
table.add_row([dns[0], dns[1]])
print(table)