diff --git a/src/primaite/simulator/system/services/dns/dns_client.py b/src/primaite/simulator/system/services/dns/dns_client.py index be6c00e7..d7ba0cd4 100644 --- a/src/primaite/simulator/system/services/dns/dns_client.py +++ b/src/primaite/simulator/system/services/dns/dns_client.py @@ -1 +1,162 @@ # © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK +from ipaddress import IPv4Address +from typing import Dict, Optional + +from primaite import getLogger +from primaite.simulator.network.protocols.dns import DNSPacket, DNSRequest +from primaite.simulator.network.transmission.network_layer import IPProtocol +from primaite.simulator.network.transmission.transport_layer import Port +from primaite.simulator.system.core.software_manager import SoftwareManager +from primaite.simulator.system.services.service import Service + +_LOGGER = getLogger(__name__) + + +class DNSClient(Service): + """Represents a DNS Client as a Service.""" + + dns_cache: Dict[str, IPv4Address] = {} + "A dict of known mappings between domain/URLs names and IPv4 addresses." + dns_server: Optional[IPv4Address] = None + "The DNS Server the client sends requests to." + + def __init__(self, **kwargs): + kwargs["name"] = "DNSClient" + kwargs["port"] = Port.DNS + # DNS uses UDP by default + # it switches to TCP when the bytes exceed 512 (or 4096) bytes + # TCP for now + kwargs["protocol"] = IPProtocol.TCP + super().__init__(**kwargs) + self.start() + + def describe_state(self) -> Dict: + """ + Describes the current state of the software. + + The specifics of the software's state, including its health, criticality, + and any other pertinent information, should be implemented in subclasses. + + :return: A dictionary containing key-value pairs representing the current state of the software. + :rtype: Dict + """ + state = super().describe_state() + return state + + def add_domain_to_cache(self, domain_name: str, ip_address: IPv4Address) -> bool: + """ + Adds a domain name to the DNS Client cache. + + :param: domain_name: The domain name to save to cache + :param: ip_address: The IP Address to attach the domain name to + """ + if not self._can_perform_action(): + return False + + self.dns_cache[domain_name] = ip_address + return True + + def check_domain_exists( + self, + target_domain: str, + session_id: Optional[str] = None, + is_reattempt: bool = False, + ) -> bool: + """Function to check if domain name exists. + + :param: target_domain: The domain requested for an IP address. + :param: session_id: The Session ID the payload is to originate from. Optional. + :param: is_reattempt: Checks if the request has been reattempted. Default is False. + """ + if not self._can_perform_action(): + return False + + # check if DNS server is configured + if self.dns_server is None: + self.sys_log.warning(f"{self.name}: DNS Server is not configured") + return False + + # check if the target domain is in the client's DNS cache + payload = DNSPacket(dns_request=DNSRequest(domain_name_request=target_domain)) + + # check if the domain is already in the DNS cache + if target_domain in self.dns_cache: + self.sys_log.info( + f"{self.name}: Domain lookup for {target_domain} successful," + f"resolves to {self.dns_cache[target_domain]}" + ) + return True + else: + # return False if already reattempted + if is_reattempt: + self.sys_log.warning(f"{self.name}: Domain lookup for {target_domain} failed") + return False + else: + # send a request to check if domain name exists in the DNS Server + software_manager: SoftwareManager = self.software_manager + software_manager.send_payload_to_session_manager( + payload=payload, dest_ip_address=self.dns_server, dest_port=Port.DNS + ) + + # recursively re-call the function passing is_reattempt=True + return self.check_domain_exists( + target_domain=target_domain, + session_id=session_id, + is_reattempt=True, + ) + + def send( + self, + payload: DNSPacket, + session_id: Optional[str] = None, + dest_ip_address: Optional[IPv4Address] = None, + dest_port: Optional[Port] = None, + **kwargs, + ) -> bool: + """ + Sends a payload to the SessionManager. + + :param payload: The payload to be sent. + :param dest_ip_address: The ip address of the payload destination. + :param dest_port: The port of the payload destination. + :param session_id: The Session ID the payload is to originate from. Optional. + + :return: True if successful, False otherwise. + """ + self.sys_log.info(f"{self.name}: Sending DNS request to resolve {payload.dns_request.domain_name_request}") + + return super().send( + payload=payload, dest_ip_address=dest_ip_address, dest_port=dest_port, session_id=session_id, **kwargs + ) + + def receive( + self, + payload: DNSPacket, + session_id: Optional[str] = None, + **kwargs, + ) -> bool: + """ + Receives a payload from the SessionManager. + + :param payload: The payload to be sent. + :param session_id: The Session ID the payload is to originate from. Optional. + :return: True if successful, False otherwise. + """ + # The payload should be a DNS packet + if not isinstance(payload, DNSPacket): + self.sys_log.warning(f"{self.name}: Payload is not a DNSPacket") + self.sys_log.debug(f"{self.name}: {payload}") + return False + + if payload.dns_reply is not None: + # add the IP address to the client cache + if payload.dns_reply.domain_name_ip_address: + self.sys_log.info( + f"{self.name}: Resolved domain name {payload.dns_request.domain_name_request} " + f"to {payload.dns_reply.domain_name_ip_address}" + ) + self.dns_cache[payload.dns_request.domain_name_request] = payload.dns_reply.domain_name_ip_address + return True + + self.sys_log.warning(f"Failed to resolve domain name {payload.dns_request.domain_name_request}") + return False diff --git a/src/primaite/simulator/system/services/dns/dns_server.py b/src/primaite/simulator/system/services/dns/dns_server.py index be6c00e7..8a4bbaed 100644 --- a/src/primaite/simulator/system/services/dns/dns_server.py +++ b/src/primaite/simulator/system/services/dns/dns_server.py @@ -1 +1,126 @@ # © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK +from ipaddress import IPv4Address +from typing import Any, Dict, Optional + +from prettytable import MARKDOWN, PrettyTable + +from primaite import getLogger +from primaite.simulator.network.protocols.dns import DNSPacket +from primaite.simulator.network.transmission.network_layer import IPProtocol +from primaite.simulator.network.transmission.transport_layer import Port +from primaite.simulator.system.services.service import Service + +_LOGGER = getLogger(__name__) + + +class DNSServer(Service): + """Represents a DNS Server as a Service.""" + + dns_table: Dict[str, IPv4Address] = {} + "A dict of mappings between domain names and IPv4 addresses." + + def __init__(self, **kwargs): + kwargs["name"] = "DNSServer" + kwargs["port"] = Port.DNS + # DNS uses UDP by default + # it switches to TCP when the bytes exceed 512 (or 4096) bytes + # TCP for now + kwargs["protocol"] = IPProtocol.TCP + super().__init__(**kwargs) + self.start() + + def describe_state(self) -> Dict: + """ + Describes the current state of the software. + + The specifics of the software's state, including its health, criticality, + and any other pertinent information, should be implemented in subclasses. + + :return: A dictionary containing key-value pairs representing the current state of the software. + :rtype: Dict + """ + state = super().describe_state() + return state + + def dns_lookup(self, target_domain: str) -> Optional[IPv4Address]: + """ + Attempts to find the IP address for a domain name. + + :param target_domain: The single domain name requested by a DNS client. + :return ip_address: The IP address of that domain name or None. + """ + if not self._can_perform_action(): + return + + return self.dns_table.get(target_domain) + + def dns_register(self, domain_name: str, domain_ip_address: IPv4Address): + """ + Register a domain name and its IP address. + + :param: domain_name: The domain name to register + :type: domain_name: str + + :param: domain_ip_address: The IP address that the domain should route to + :type: domain_ip_address: IPv4Address + """ + if not self._can_perform_action(): + return + + self.dns_table[domain_name] = domain_ip_address + + def receive( + self, + payload: Any, + session_id: Optional[str] = None, + **kwargs, + ) -> bool: + """ + Receives a payload from the SessionManager. + + The specifics of how the payload is processed and whether a response payload + is generated should be implemented in subclasses. + + :param: payload: The payload to send. + :param: session_id: The id of the session. Optional. + + :return: True if DNS request returns a valid IP, otherwise, False + """ + if not super().receive(payload=payload, session_id=session_id, **kwargs): + return False + + # The payload should be a DNS packet + if not isinstance(payload, DNSPacket): + self.sys_log.warning(f"{payload} is not a DNSPacket") + self.sys_log.debug(f"{payload} is not a DNSPacket") + return False + + # cast payload into a DNS packet + payload: DNSPacket = payload + if payload.dns_request is not None: + self.sys_log.info( + f"{self.name}: Received domain lookup request for {payload.dns_request.domain_name_request} " + f"from session {session_id}" + ) + # generate a reply with the correct DNS IP address + payload = payload.generate_reply(self.dns_lookup(payload.dns_request.domain_name_request)) + self.sys_log.info( + f"{self.name}: Responding to domain lookup request for {payload.dns_request.domain_name_request} " + f"with ip address: {payload.dns_reply.domain_name_ip_address}" + ) + # send reply + self.send(payload, session_id) + return payload.dns_reply.domain_name_ip_address is not None + + return False + + def show(self, markdown: bool = False): + """Prints a table of DNS Lookup table.""" + table = PrettyTable(["Domain Name", "IP Address"]) + if markdown: + table.set_style(MARKDOWN) + table.align = "l" + table.title = f"{self.sys_log.hostname} DNS Lookup table" + for dns in self.dns_table.items(): + table.add_row([dns[0], dns[1]]) + print(table)