#1752 - Moved dns_server ip address from the NIC to the Node. Updated the arcd_uc2_network so that clients and servers have a dns server. Added sys_log entries for DNSServer and DNSClient. MAde the DNSServer always rend a reply, but for the resolved IP address to be empty if it cannot be resolved.

This commit is contained in:
Chris McCarthy
2023-09-18 10:25:26 +01:00
parent b1e46b4f9e
commit 939de40f1e
8 changed files with 72 additions and 44 deletions

View File

@@ -3,10 +3,10 @@
© Crown-owned copyright 2023, Defence Science and Technology Laboratory UK
DNS Client Server
======================
=================
DNS Server
----------------
----------
Also known as a DNS Resolver, the ``DNSServer`` provides a DNS Server simulation by extending the base Service class.
Key capabilities
@@ -29,7 +29,7 @@ Implementation
- Extends Service class for integration with ``SoftwareManager``.
DNS Client
---------------
----------
The DNSClient provides a client interface for connecting to the ``DNSServer``.
@@ -45,7 +45,7 @@ Usage
- Install on a Node via the ``SoftwareManager`` to start the database service.
- Service runs on TCP port 53 by default. (TODO: TCP for now, should be UDP in future)
- Execute domain name checks with ``check_domain_exists``, providing a ``DNSServer`` ``IPv4Address``.
- Execute domain name checks with ``check_domain_exists``.
- ``DNSClient`` will automatically add the IP Address of the domain into its cache
Implementation

View File

@@ -5,7 +5,7 @@ import secrets
from enum import Enum
from ipaddress import IPv4Address, IPv4Network
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
from typing import Any, Dict, Literal, Optional, Tuple, Union
from prettytable import MARKDOWN, PrettyTable
@@ -89,8 +89,6 @@ class NIC(SimComponent):
"The Maximum Transmission Unit (MTU) of the NIC in Bytes. Default is 1500 B"
wake_on_lan: bool = False
"Indicates if the NIC supports Wake-on-LAN functionality."
dns_servers: List[IPv4Address] = []
"List of IP addresses of DNS servers used for name resolution."
connected_node: Optional[Node] = None
"The Node to which the NIC is connected."
connected_link: Optional[Link] = None
@@ -882,6 +880,8 @@ class Node(SimComponent):
"The NICs on the node."
ethernet_port: Dict[int, NIC] = {}
"The NICs on the node by port id."
dns_server: Optional[IPv4Address] = None
"List of IP addresses of DNS servers used for name resolution."
accounts: Dict[str, Account] = {}
"All accounts on the node."
@@ -931,6 +931,7 @@ class Node(SimComponent):
sys_log=kwargs.get("sys_log"),
session_manager=kwargs.get("session_manager"),
file_system=kwargs.get("file_system"),
dns_server=kwargs.get("dns_server"),
)
super().__init__(**kwargs)
self.arp.nics = self.nics

View File

@@ -128,7 +128,11 @@ def arcd_uc2_network() -> Network:
# Client 1
client_1 = Computer(
hostname="client_1", ip_address="192.168.10.21", subnet_mask="255.255.255.0", default_gateway="192.168.10.1"
hostname="client_1",
ip_address="192.168.10.21",
subnet_mask="255.255.255.0",
default_gateway="192.168.10.1",
dns_server=IPv4Address("192.168.1.10"),
)
client_1.power_on()
client_1.software_manager.install(DNSClient)
@@ -141,7 +145,11 @@ def arcd_uc2_network() -> Network:
# Client 2
client_2 = Computer(
hostname="client_2", ip_address="192.168.10.22", subnet_mask="255.255.255.0", default_gateway="192.168.10.1"
hostname="client_2",
ip_address="192.168.10.22",
subnet_mask="255.255.255.0",
default_gateway="192.168.10.1",
dns_server=IPv4Address("192.168.1.10"),
)
client_2.power_on()
client_2.software_manager.install(DNSClient)
@@ -167,6 +175,7 @@ def arcd_uc2_network() -> Network:
ip_address="192.168.1.14",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
dns_server=IPv4Address("192.168.1.10"),
)
database_server.power_on()
network.connect(endpoint_b=database_server.ethernet_port[1], endpoint_a=switch_1.switch_ports[3])
@@ -206,7 +215,11 @@ def arcd_uc2_network() -> Network:
# Web Server
web_server = Server(
hostname="web_server", ip_address="192.168.1.12", subnet_mask="255.255.255.0", default_gateway="192.168.1.1"
hostname="web_server",
ip_address="192.168.1.12",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
dns_server=IPv4Address("192.168.1.10"),
)
web_server.power_on()
web_server.software_manager.install(DatabaseClient)
@@ -224,7 +237,11 @@ def arcd_uc2_network() -> Network:
# Backup Server
backup_server = Server(
hostname="backup_server", ip_address="192.168.1.16", subnet_mask="255.255.255.0", default_gateway="192.168.1.1"
hostname="backup_server",
ip_address="192.168.1.16",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
dns_server=IPv4Address("192.168.1.10"),
)
backup_server.power_on()
network.connect(endpoint_b=backup_server.ethernet_port[1], endpoint_a=switch_1.switch_ports[4])
@@ -235,6 +252,7 @@ def arcd_uc2_network() -> Network:
ip_address="192.168.1.110",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1",
dns_server=IPv4Address("192.168.1.10"),
)
security_suite.power_on()
network.connect(endpoint_b=security_suite.ethernet_port[1], endpoint_a=switch_1.switch_ports[7])

View File

@@ -22,7 +22,7 @@ class DNSReply(BaseModel):
:param domain_name_ip_address: IP Address of the Domain Name requested.
"""
domain_name_ip_address: IPv4Address
domain_name_ip_address: Optional[IPv4Address] = None
"IP Address of the Domain Name requested."
@@ -56,7 +56,6 @@ class DNSPacket(BaseModel):
:param domain_ip_address: The IP address that was being sought after from the original target domain name.
:return: A new instance of DNSPacket.
"""
if domain_ip_address is not None:
self.dns_reply = DNSReply(domain_name_ip_address=IPv4Address(domain_ip_address))
self.dns_reply = DNSReply(domain_name_ip_address=domain_ip_address)
return self

View File

@@ -23,7 +23,13 @@ IOSoftwareClass = TypeVar("IOSoftwareClass", bound=IOSoftware)
class SoftwareManager:
"""A class that manages all running Services and Applications on a Node and facilitates their communication."""
def __init__(self, session_manager: "SessionManager", sys_log: SysLog, file_system: FileSystem):
def __init__(
self,
session_manager: "SessionManager",
sys_log: SysLog,
file_system: FileSystem,
dns_server: Optional[IPv4Address],
):
"""
Initialize a new instance of SoftwareManager.
@@ -35,6 +41,7 @@ class SoftwareManager:
self.port_protocol_mapping: Dict[Tuple[Port, IPProtocol], Union[Service, Application]] = {}
self.sys_log: SysLog = sys_log
self.file_system: FileSystem = file_system
self.dns_server: Optional[IPv4Address] = dns_server
def get_open_ports(self) -> List[Port]:
"""
@@ -58,7 +65,9 @@ class SoftwareManager:
if software_class in self._software_class_to_name_map:
self.sys_log.info(f"Cannot install {software_class} as it is already installed")
return
software = software_class(software_manager=self, sys_log=self.sys_log, file_system=self.file_system)
software = software_class(
software_manager=self, sys_log=self.sys_log, file_system=self.file_system, dns_server=self.dns_server
)
if isinstance(software, Application):
software.install()
software.software_manager = self

View File

@@ -16,6 +16,8 @@ class DNSClient(Service):
dns_cache: Dict[str, IPv4Address] = {}
"A dict of known mappings between domain/URLs names and IPv4 addresses."
dns_server: Optional[IPv4Address] = None
"The DNS Server the client sends requests to."
def __init__(self, **kwargs):
kwargs["name"] = "DNSClient"
@@ -60,16 +62,12 @@ class DNSClient(Service):
def check_domain_exists(
self,
target_domain: str,
dest_ip_address: Optional[IPv4Address] = None,
dest_port: Optional[Port] = Port.DNS,
session_id: Optional[str] = None,
is_reattempt: bool = False,
) -> bool:
"""Function to check if domain name exists.
:param: target_domain: The domain requested for an IP address.
:param: dest_ip_address: The ip address of the DNS Server used for domain lookup.
:param: dest_port: The port on the DNS Server which accepts domain lookup requests. Default is Port.DNS.
:param: session_id: The Session ID the payload is to originate from. Optional.
:param: is_reattempt: Checks if the request has been reattempted. Default is False.
"""
@@ -78,30 +76,28 @@ class DNSClient(Service):
# check if the domain is already in the DNS cache
if target_domain in self.dns_cache:
self.sys_log.info(
f"DNS Client: Domain lookup for {target_domain} successful, resolves to {self.dns_cache[target_domain]}"
)
return True
else:
# return False if already reattempted
if is_reattempt:
self.sys_log.info(f"DNS Client: Domain lookup for {target_domain} failed")
return False
else:
# send a request to check if domain name exists in the DNS Server
software_manager: SoftwareManager = self.software_manager
software_manager.send_payload_to_session_manager(
payload=payload,
dest_ip_address=dest_ip_address,
dest_port=dest_port,
payload=payload, dest_ip_address=self.dns_server, dest_port=Port.DNS
)
# check if the domain has been added to cache
if self.dns_cache.get(target_domain, None) is None:
# call function again
return self.check_domain_exists(
target_domain=target_domain,
dest_ip_address=dest_ip_address,
dest_port=dest_port,
session_id=session_id,
is_reattempt=True,
)
# recursively re-call the function passing is_reattempt=True
return self.check_domain_exists(
target_domain=target_domain,
session_id=session_id,
is_reattempt=True,
)
def send(
self,
@@ -125,6 +121,7 @@ class DNSClient(Service):
# create DNS request packet
software_manager: SoftwareManager = self.software_manager
software_manager.send_payload_to_session_manager(payload=payload, session_id=session_id)
return True
def receive(
self,
@@ -150,7 +147,8 @@ class DNSClient(Service):
payload: DNSPacket = payload
if payload.dns_reply is not None:
# add the IP address to the client cache
self.dns_cache[payload.dns_request.domain_name_request] = payload.dns_reply.domain_name_ip_address
return True
if payload.dns_reply.domain_name_ip_address:
self.dns_cache[payload.dns_request.domain_name_request] = payload.dns_reply.domain_name_ip_address
return True
return False

View File

@@ -47,10 +47,7 @@ class DNSServer(Service):
:param target_domain: The single domain name requested by a DNS client.
:return ip_address: The IP address of that domain name or None.
"""
if target_domain in self.dns_table:
return self.dns_table[target_domain]
else:
return None
return self.dns_table.get(target_domain)
def dns_register(self, domain_name: str, domain_ip_address: IPv4Address):
"""
@@ -97,11 +94,19 @@ class DNSServer(Service):
# cast payload into a DNS packet
payload: DNSPacket = payload
if payload.dns_request is not None:
self.sys_log.info(
f"DNS Server: Received domain lookup request for {payload.dns_request.domain_name_request} "
f"from session {session_id}"
)
# generate a reply with the correct DNS IP address
payload = payload.generate_reply(self.dns_lookup(payload.dns_request.domain_name_request))
self.sys_log.info(
f"DNS Server: Responding to domain lookup request for {payload.dns_request.domain_name_request} "
f"with ip address: {payload.dns_reply.domain_name_ip_address}"
)
# send reply
self.send(payload, session_id)
return payload.dns_reply is not None
return payload.dns_reply.domain_name_ip_address is not None
return False

View File

@@ -20,11 +20,9 @@ def test_dns_client_server(uc2_network):
dns_server.show()
# fake domain should not be added to dns cache
dns_client.check_domain_exists(
target_domain="fake-domain.com", dest_ip_address=IPv4Address(domain_controller.ip_address)
)
assert not dns_client.check_domain_exists(target_domain="fake-domain.com")
assert dns_client.dns_cache.get("fake-domain.com", None) is None
# arcd.com is registered in dns server and should be saved to cache
dns_client.check_domain_exists(target_domain="arcd.com", dest_ip_address=IPv4Address(domain_controller.ip_address))
assert dns_client.check_domain_exists(target_domain="arcd.com")
assert dns_client.dns_cache.get("arcd.com", None) is not None