#1752: added more functionality to DNS client and server + tests

This commit is contained in:
Czar Echavez
2023-09-07 15:45:37 +01:00
parent 2cb0c238c9
commit 47dd23311b
7 changed files with 321 additions and 123 deletions

View File

@@ -1,19 +1,26 @@
from abc import abstractmethod
from ipaddress import IPv4Address
from typing import Any, Dict, List
from pydantic import BaseModel
from typing import Any, Dict, Optional
from primaite.simulator.network.protocols.dns import DNSPacket, DNSRequest
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.services.service import Service
class DNSClient(BaseModel):
class DNSClient(Service):
"""Represents a DNS Client as a Service."""
dns_cache: Dict[str:IPv4Address] = {}
dns_cache: Dict[str, IPv4Address] = {}
"A dict of known mappings between domain/URLs names and IPv4 addresses."
@abstractmethod
def __init__(self, **kwargs):
kwargs["name"] = "DNSClient"
kwargs["port"] = Port.DNS
# DNS uses UDP by default
# it switches to TCP when the bytes exceed 512 (or 4096) bytes
kwargs["protocol"] = IPProtocol.UDP
super().__init__(**kwargs)
def describe_state(self) -> Dict:
"""
Describes the current state of the software.
@@ -26,57 +33,109 @@ class DNSClient(BaseModel):
"""
return {"Operating State": self.operating_state}
def apply_action(self, action: List[str]) -> None:
"""
Applies a list of actions to the Service.
:param action: A list of actions to apply.
"""
pass
def reset_component_for_episode(self):
def reset_component_for_episode(self, episode: int):
"""
Resets the Service component for a new episode.
This method ensures the Service is ready for a new episode, including resetting any
stateful properties or statistics, and clearing any message queues.
"""
super().reset_component_for_episode(episode=episode)
self.dns_cache = {}
def check_domain_in_cache(self, target_domain: str, session_id: str):
def add_domain_to_cache(self, domain_name: str, ip_address: IPv4Address):
"""
Adds a domain name to the DNS Client cache.
:param: domain_name: The domain name to save to cache
:param: ip_address: The IP Address to attach the domain name to
"""
self.dns_cache[domain_name] = ip_address
def check_domain_in_cache(
self,
target_domain: str,
dest_ip_address: Optional[IPv4Address] = None,
dest_port: Optional[Port] = None,
session_id: Optional[str] = None,
is_reattempt: bool = False,
) -> bool:
"""Function to check if domain name is in DNS client cache.
:param target_domain: The domain requested for an IP address.
:param session_id: The ID of the session in order to send the response to the DNS server or application.
:param: target_domain: The domain requested for an IP address.
:param: dest_ip_address: The ip address of the payload destination.
:param: dest_port: The port of the payload destination.
:param: session_id: The Session ID the payload is to originate from. Optional.
:param: is_reattempt: Checks if the request has been reattempted. Default is False.
"""
if target_domain in self.dns_cache:
ip_address = self.dns_cache[target_domain]
self.send(ip_address, session_id)
else:
self.send(target_domain, session_id)
# check if the target domain is in the client's DNS cache
payload = DNSPacket(dns_request=DNSRequest(domain_name_request=target_domain))
def send(self, payload: Any, session_id: str, **kwargs) -> bool:
# check if the domain is already in the DNS cache
if target_domain in self.dns_cache:
return True
else:
# return False if already reattempted
if is_reattempt:
return False
else:
# send a request to check if domain name exists in the DNS Server
self.send(payload=payload, dest_ip_address=dest_ip_address, dest_port=dest_port, session_id=session_id)
# call function again
return self.check_domain_in_cache(
target_domain=target_domain,
dest_ip_address=dest_ip_address,
dest_port=dest_port,
session_id=session_id,
is_reattempt=True,
)
def send(
self,
payload: Any,
dest_ip_address: Optional[IPv4Address] = None,
dest_port: Optional[Port] = None,
session_id: Optional[str] = None,
**kwargs,
) -> bool:
"""
Sends a payload to the SessionManager.
The specifics of how the payload is processed and whether a response payload
is generated should be implemented in subclasses.
:param payload: The payload to send.
:param payload: The payload to be sent.
:param dest_ip_address: The ip address of the payload destination.
:param dest_port: The port of the payload destination.
:param session_id: The Session ID the payload is to originate from. Optional.
:return: True if successful, False otherwise.
"""
DNSPacket(dns_request=DNSRequest(domain_name_request=payload), dns_reply=None)
# create DNS request packet
self.software_manager.send_payload_to_session_manager(
payload=payload, dest_ip_address=dest_ip_address, dest_port=dest_port, session_id=session_id
)
def receive(self, payload: Any, session_id: str, **kwargs) -> bool:
def receive(
self,
payload: Any,
dest_ip_address: Optional[IPv4Address] = None,
dest_port: Optional[Port] = None,
session_id: Optional[str] = None,
**kwargs,
) -> bool:
"""
Receives a payload from the SessionManager.
The specifics of how the payload is processed and whether a response payload
is generated should be implemented in subclasses.
:param payload: The payload to receive. (receive a DNS packet with dns request and dns reply in, send to web
browser)
:param payload: The payload to be sent.
:param dest_ip_address: The ip address of the payload destination.
:param dest_port: The port of the payload destination.
:param session_id: The Session ID the payload is to originate from. Optional.
:return: True if successful, False otherwise.
"""
super().send()
# check the DNS packet (dns request, dns reply) here and see if it actually worked
pass

View File

@@ -1,19 +1,31 @@
from abc import abstractmethod
from ipaddress import IPv4Address
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Optional
from pydantic import BaseModel
from prettytable import MARKDOWN, PrettyTable
from primaite.simulator.network.protocols.dns import DNSPacket, DNSReply, DNSRequest
from primaite import getLogger
from primaite.simulator.network.protocols.dns import DNSPacket
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.services.service import Service
_LOGGER = getLogger(__name__)
class DNSServer(BaseModel):
class DNSServer(Service):
"""Represents a DNS Server as a Service."""
dns_table: dict[str:IPv4Address] = {}
dns_table: Dict[str, IPv4Address] = {}
"A dict of mappings between domain names and IPv4 addresses."
@abstractmethod
def __init__(self, **kwargs):
kwargs["name"] = "DNSServer"
kwargs["port"] = Port.DNS
# DNS uses UDP by default
# it switches to TCP when the bytes exceed 512 (or 4096) bytes
kwargs["protocol"] = IPProtocol.UDP
super().__init__(**kwargs)
def describe_state(self) -> Dict:
"""
Describes the current state of the software.
@@ -26,15 +38,7 @@ class DNSServer(BaseModel):
"""
return {"Operating State": self.operating_state}
def apply_action(self, action: List[str]) -> None:
"""
Applies a list of actions to the Service.
:param action: A list of actions to apply. (unsure)
"""
pass
def dns_lookup(self, target_domain: str) -> Optional[IPv4Address]:
def dns_lookup(self, target_domain: Any) -> Optional[IPv4Address]:
"""
Attempts to find the IP address for a domain name.
@@ -42,11 +46,23 @@ class DNSServer(BaseModel):
:return ip_address: The IP address of that domain name or None.
"""
if target_domain in self.dns_table:
self.dns_table[target_domain]
return self.dns_table[target_domain]
else:
return None
def reset_component_for_episode(self):
def dns_register(self, domain_name: str, domain_ip_address: IPv4Address):
"""
Register a domain name and its IP address.
:param: domain_name: The domain name to register
:type: domain_name: str
:param: domain_ip_address: The IP address that the domain should route to
:type: domain_ip_address: IPv4Address
"""
self.dns_table[domain_name] = domain_ip_address
def reset_component_for_episode(self, episode: int):
"""
Resets the Service component for a new episode.
@@ -54,36 +70,78 @@ class DNSServer(BaseModel):
stateful properties or statistics, and clearing any message queues.
"""
self.dns_table = {}
super().reset_component_for_episode(episode=episode)
def send(self, payload: Any, session_id: str, **kwargs) -> bool:
def send(
self,
payload: Any,
dest_ip_address: Optional[IPv4Address] = None,
dest_port: Optional[Port] = None,
session_id: Optional[str] = None,
**kwargs,
) -> bool:
"""
Sends a payload to the SessionManager.
The specifics of how the payload is processed and whether a response payload
is generated should be implemented in subclasses.
:param payload: The payload to send.
:param: payload: The payload to send.
:param: dest_ip_address: The ip address of the machine that the payload will be sent to
:param: dest_port: The port of the machine that the payload will be sent to
:param: session_id: The id of the session
:return: True if successful, False otherwise.
"""
# DNS packet will be sent from DNS Server to the DNS client
DNSPacket(
dns_request=DNSRequest(domain_name_request=self.dns_table),
dns_reply=DNSReply(domain_name_ip_address=payload),
)
try:
self.software_manager.send_payload_to_session_manager(payload=payload, session_id=session_id)
except Exception as e:
_LOGGER.error(e)
return False
def receive(self, payload: Any, session_id: str, **kwargs) -> bool:
def receive(
self,
payload: Any,
dest_ip_address: Optional[IPv4Address] = None,
dest_port: Optional[Port] = None,
session_id: Optional[str] = None,
**kwargs,
) -> bool:
"""
Receives a payload from the SessionManager.
The specifics of how the payload is processed and whether a response payload
is generated should be implemented in subclasses.
:param payload: The payload to receive. (take the domain name and do dns lookup)
:param: payload: The payload to send.
:param: dest_ip_address: The ip address of the machine that the payload will be sent to
:param: dest_port: The port of the machine that the payload will be sent to
:param: session_id: The id of the session
:return: True if successful, False otherwise.
"""
ip_address = self.dns_lookup(payload)
if ip_address is not None:
self.send(ip_address, session_id)
# The payload should be a DNS packet
if not isinstance(payload, DNSPacket):
_LOGGER.debug(f"{payload} is not a DNSPacket")
return False
# cast payload into a DNS packet
payload: DNSPacket = payload
if payload.dns_request is not None:
# generate a reply with the correct DNS IP address
payload.generate_reply(self.dns_lookup(payload.dns_request.domain_name_request))
# send reply
self.send(payload, session_id)
return True
return False
def show(self, markdown: bool = False):
"""Prints a table of DNS Lookup table."""
table = PrettyTable(["Domain Name", "IP Address"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.sys_log.hostname} DNS Lookup table"
for dns in self.dns_table.items():
table.add_row([dns[0], dns[1]])
print(table)

View File

@@ -1,8 +1,10 @@
from enum import Enum
from ipaddress import IPv4Address
from typing import Any, Dict, Optional
from primaite import getLogger
from primaite.simulator.core import Action, ActionManager
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.software import IOSoftware
_LOGGER = getLogger(__name__)
@@ -72,29 +74,54 @@ class Service(IOSoftware):
"""
pass
def send(self, payload: Any, session_id: str, **kwargs) -> bool:
def send(
self,
payload: Any,
dest_ip_address: Optional[IPv4Address] = None,
dest_port: Optional[Port] = None,
session_id: Optional[str] = None,
**kwargs,
) -> bool:
"""
Sends a payload to the SessionManager.
The specifics of how the payload is processed and whether a response payload
is generated should be implemented in subclasses.
:param payload: The payload to send.
:param: payload: The payload to send.
:param: dest_ip_address: The ip address of the machine that the payload will be sent to
:param: dest_port: The port of the machine that the payload will be sent to
:param: session_id: The id of the session
:return: True if successful, False otherwise.
"""
pass
self.software_manager.send_payload_to_session_manager(
payload=payload, dest_ip_address=dest_ip_address, dest_port=self.port, session_id=session_id
)
def receive(self, payload: Any, session_id: str, **kwargs) -> bool:
def receive(
self,
payload: Any,
dest_ip_address: Optional[IPv4Address] = None,
dest_port: Optional[Port] = None,
session_id: Optional[str] = None,
**kwargs,
) -> bool:
"""
Receives a payload from the SessionManager.
The specifics of how the payload is processed and whether a response payload
is generated should be implemented in subclasses.
:param payload: The payload to receive.
:param: payload: The payload to send.
:param: dest_ip_address: The ip address of the machine that the payload will be sent to
:param: dest_port: The port of the machine that the payload will be sent to
:param: session_id: The id of the session
:return: True if successful, False otherwise.
"""
pass
pass
def stop(self) -> None:
"""Stop the service."""