#1752: Added send+receive functionality for DNS client and server + tests + added simulation_output to gitignore

This commit is contained in:
Czar Echavez
2023-09-12 08:46:07 +01:00
parent ee730d4ab0
commit 1a81285b76
7 changed files with 129 additions and 48 deletions

2
.gitignore vendored
View File

@@ -144,9 +144,11 @@ cython_debug/
# IDE
.idea/
docs/source/primaite-dependencies.rst
.vscode/
# outputs
src/primaite/outputs/
simulation_output/
# benchmark session outputs
benchmark/output

View File

@@ -10,6 +10,8 @@ from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.services.database_service import DatabaseService
from primaite.simulator.system.services.dns_client import DNSClient
from primaite.simulator.system.services.dns_server import DNSServer
from primaite.simulator.system.services.red_services.data_manipulation_bot import DataManipulationBot
@@ -129,6 +131,7 @@ def arcd_uc2_network() -> Network:
hostname="client_1", ip_address="192.168.10.21", subnet_mask="255.255.255.0", default_gateway="192.168.10.1"
)
client_1.power_on()
client_1.software_manager.install(DNSClient)
network.connect(endpoint_b=client_1.ethernet_port[1], endpoint_a=switch_2.switch_ports[1])
client_1.software_manager.install(DataManipulationBot)
db_manipulation_bot: DataManipulationBot = client_1.software_manager.software["DataManipulationBot"]
@@ -139,6 +142,7 @@ def arcd_uc2_network() -> Network:
hostname="client_2", ip_address="192.168.10.22", subnet_mask="255.255.255.0", default_gateway="192.168.10.1"
)
client_2.power_on()
client_2.software_manager.install(DNSClient)
network.connect(endpoint_b=client_2.ethernet_port[1], endpoint_a=switch_2.switch_ports[2])
# Domain Controller
@@ -149,6 +153,8 @@ def arcd_uc2_network() -> Network:
default_gateway="192.168.1.1",
)
domain_controller.power_on()
domain_controller.software_manager.install(DNSServer)
network.connect(endpoint_b=domain_controller.ethernet_port[1], endpoint_a=switch_1.switch_ports[1])
# Database Server
@@ -200,12 +206,17 @@ def arcd_uc2_network() -> Network:
)
web_server.power_on()
web_server.software_manager.install(DatabaseClient)
database_client: DatabaseClient = web_server.software_manager.software["DatabaseClient"]
database_client.configure(server_ip_address=IPv4Address("192.168.1.14"))
network.connect(endpoint_b=web_server.ethernet_port[1], endpoint_a=switch_1.switch_ports[2])
database_client.run()
database_client.connect()
# register the web_server to a domain
dns_server_service: DNSServer = domain_controller.software_manager.software["DNSServer"] # noqa
dns_server_service.dns_register("arcd.com", web_server.ip_address)
# Backup Server
backup_server = Server(
hostname="backup_server", ip_address="192.168.1.16", subnet_mask="255.255.255.0", default_gateway="192.168.1.1"
@@ -229,6 +240,10 @@ def arcd_uc2_network() -> Network:
router_1.acl.add_rule(action=ACLAction.PERMIT, protocol=IPProtocol.ICMP, position=23)
# Allow PostgreSQL requests
router_1.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.POSTGRES_SERVER, dst_port=Port.POSTGRES_SERVER)
# Allow DNS requests
router_1.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.DNS, dst_port=Port.DNS)
return network

View File

@@ -56,7 +56,7 @@ class DNSPacket(BaseModel):
:param domain_ip_address: The IP address that was being sought after from the original target domain name.
:return: A new instance of DNSPacket.
"""
return DNSPacket(
dns_request=DNSRequest(domain_name_request=self.dns_request.domain_name_request),
dns_reply=DNSReply(domain_name_ip_address=domain_ip_address),
)
if domain_ip_address is not None:
self.dns_reply = DNSReply(domain_name_ip_address=IPv4Address(domain_ip_address))
return self

View File

@@ -1,11 +1,15 @@
from ipaddress import IPv4Address
from typing import Any, Dict, Optional
from primaite import getLogger
from primaite.simulator.network.protocols.dns import DNSPacket, DNSRequest
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.core.software_manager import SoftwareManager
from primaite.simulator.system.services.service import Service
_LOGGER = getLogger(__name__)
class DNSClient(Service):
"""Represents a DNS Client as a Service."""
@@ -52,15 +56,15 @@ class DNSClient(Service):
"""
self.dns_cache[domain_name] = ip_address
def check_domain_in_cache(
def check_domain_exists(
self,
target_domain: str,
dest_ip_address: Optional[IPv4Address] = None,
dest_port: Optional[Port] = None,
dest_port: Optional[Port] = Port.DNS,
session_id: Optional[str] = None,
is_reattempt: bool = False,
) -> bool:
"""Function to check if domain name is in DNS client cache.
"""Function to check if domain name exists.
:param: target_domain: The domain requested for an IP address.
:param: dest_ip_address: The ip address of the payload destination.
@@ -80,21 +84,26 @@ class DNSClient(Service):
return False
else:
# send a request to check if domain name exists in the DNS Server
self.send(payload=payload, dest_ip_address=dest_ip_address, dest_port=dest_port, session_id=session_id)
# call function again
return self.check_domain_in_cache(
target_domain=target_domain,
self.software_manager.send_payload_to_session_manager(
payload=payload,
dest_ip_address=dest_ip_address,
dest_port=dest_port,
session_id=session_id,
is_reattempt=True,
)
# check if the domain has been added to cache
if self.dns_cache.get(target_domain) is None:
# call function again
return self.check_domain_exists(
target_domain=target_domain,
dest_ip_address=dest_ip_address,
dest_port=dest_port,
session_id=session_id,
is_reattempt=True,
)
def send(
self,
payload: Any,
dest_ip_address: Optional[IPv4Address] = None,
dest_port: Optional[Port] = None,
session_id: Optional[str] = None,
**kwargs,
) -> bool:
@@ -112,15 +121,12 @@ class DNSClient(Service):
:return: True if successful, False otherwise.
"""
# create DNS request packet
self.software_manager.send_payload_to_session_manager(
payload=payload, dest_ip_address=dest_ip_address, dest_port=dest_port, session_id=session_id
)
software_manager: SoftwareManager = self.software_manager
software_manager.send_payload_to_session_manager(payload=payload, session_id=session_id)
def receive(
self,
payload: Any,
dest_ip_address: Optional[IPv4Address] = None,
dest_port: Optional[Port] = None,
session_id: Optional[str] = None,
**kwargs,
) -> bool:
@@ -131,11 +137,18 @@ class DNSClient(Service):
is generated should be implemented in subclasses.
:param payload: The payload to be sent.
:param dest_ip_address: The ip address of the payload destination.
:param dest_port: The port of the payload destination.
:param session_id: The Session ID the payload is to originate from. Optional.
:return: True if successful, False otherwise.
"""
super().send()
# check the DNS packet (dns request, dns reply) here and see if it actually worked
pass
# The payload should be a DNS packet
if not isinstance(payload, DNSPacket):
_LOGGER.debug(f"{payload} is not a DNSPacket")
return False
# cast payload into a DNS packet
payload: DNSPacket = payload
if payload.dns_reply is not None:
# add the IP address to the client cache
self.dns_cache[payload.dns_request.domain_name_request] = payload.dns_reply.domain_name_ip_address
return True
return False

View File

@@ -75,8 +75,6 @@ class DNSServer(Service):
def send(
self,
payload: Any,
dest_ip_address: Optional[IPv4Address] = None,
dest_port: Optional[Port] = None,
session_id: Optional[str] = None,
**kwargs,
) -> bool:
@@ -87,14 +85,13 @@ class DNSServer(Service):
is generated should be implemented in subclasses.
:param: payload: The payload to send.
:param: dest_ip_address: The ip address of the machine that the payload will be sent to
:param: dest_port: The port of the machine that the payload will be sent to
:param: session_id: The id of the session
:return: True if successful, False otherwise.
"""
try:
self.software_manager.send_payload_to_session_manager(payload=payload, session_id=session_id)
return True
except Exception as e:
_LOGGER.error(e)
return False
@@ -102,8 +99,6 @@ class DNSServer(Service):
def receive(
self,
payload: Any,
dest_ip_address: Optional[IPv4Address] = None,
dest_port: Optional[Port] = None,
session_id: Optional[str] = None,
**kwargs,
) -> bool:
@@ -114,11 +109,9 @@ class DNSServer(Service):
is generated should be implemented in subclasses.
:param: payload: The payload to send.
:param: dest_ip_address: The ip address of the machine that the payload will be sent to
:param: dest_port: The port of the machine that the payload will be sent to
:param: session_id: The id of the session
:return: True if successful, False otherwise.
:return: True if DNS request returns a valid IP, otherwise, False
"""
# The payload should be a DNS packet
if not isinstance(payload, DNSPacket):
@@ -128,10 +121,10 @@ class DNSServer(Service):
payload: DNSPacket = payload
if payload.dns_request is not None:
# generate a reply with the correct DNS IP address
payload.generate_reply(self.dns_lookup(payload.dns_request.domain_name_request))
payload = payload.generate_reply(self.dns_lookup(payload.dns_request.domain_name_request))
# send reply
self.send(payload, session_id)
return True
return payload.dns_reply is not None
return False

View File

@@ -0,0 +1,24 @@
from ipaddress import IPv4Address
from primaite.simulator.network.hardware.nodes.computer import Computer
from primaite.simulator.network.hardware.nodes.server import Server
from primaite.simulator.system.services.dns_client import DNSClient
from primaite.simulator.system.services.dns_server import DNSServer
def test_dns_client_server(uc2_network):
client_1: Computer = uc2_network.get_node_by_hostname("client_1")
domain_controller: Server = uc2_network.get_node_by_hostname("domain_controller")
dns_client: DNSClient = client_1.software_manager.software["DNSClient"]
dns_server: DNSServer = domain_controller.software_manager.software["DNSServer"]
# register a domain to web server
dns_server.dns_register("real-domain.com", IPv4Address("192.168.1.12"))
dns_server.show()
dns_client.check_domain_exists(target_domain="real-domain.com", dest_ip_address=IPv4Address("192.168.1.14"))
# should register the domain in the client cache
assert dns_client.dns_cache.get("real-domain.com") is not None

View File

@@ -1,10 +1,9 @@
import sys
from ipaddress import IPv4Address
import pytest
from primaite.simulator.network.hardware.base import Node
from primaite.simulator.network.networks import arcd_uc2_network
from primaite.simulator.network.protocols.dns import DNSPacket, DNSReply, DNSRequest
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.services.dns_client import DNSClient
@@ -14,22 +13,22 @@ from primaite.simulator.system.services.dns_server import DNSServer
@pytest.fixture(scope="function")
def dns_server() -> Node:
node = Node(hostname="dns_server")
node.software_manager.add_service(service_class=DNSServer)
node.software_manager.services["DNSServer"].start()
node.software_manager.install(software_class=DNSServer)
node.software_manager.software["DNSServer"].start()
return node
@pytest.fixture(scope="function")
def dns_client() -> Node:
node = Node(hostname="dns_client")
node.software_manager.add_service(service_class=DNSClient)
node.software_manager.services["DNSClient"].start()
node.software_manager.install(software_class=DNSClient)
node.software_manager.software["DNSClient"].start()
return node
def test_create_dns_server(dns_server):
assert dns_server is not None
dns_server_service: DNSServer = dns_server.software_manager.services["DNSServer"]
dns_server_service: DNSServer = dns_server.software_manager.software["DNSServer"]
assert dns_server_service.name is "DNSServer"
assert dns_server_service.port is Port.DNS
assert dns_server_service.protocol is IPProtocol.UDP
@@ -37,7 +36,7 @@ def test_create_dns_server(dns_server):
def test_create_dns_client(dns_client):
assert dns_client is not None
dns_client_service: DNSClient = dns_client.software_manager.services["DNSClient"]
dns_client_service: DNSClient = dns_client.software_manager.software["DNSClient"]
assert dns_client_service.name is "DNSClient"
assert dns_client_service.port is Port.DNS
assert dns_client_service.protocol is IPProtocol.UDP
@@ -45,7 +44,7 @@ def test_create_dns_client(dns_client):
def test_dns_server_domain_name_registration(dns_server):
"""Test to check if the domain name registration works."""
dns_server_service: DNSServer = dns_server.software_manager.services["DNSServer"]
dns_server_service: DNSServer = dns_server.software_manager.software["DNSServer"]
# register the web server in the domain controller
dns_server_service.dns_register(domain_name="real-domain.com", domain_ip_address=IPv4Address("192.168.1.12"))
@@ -57,10 +56,45 @@ def test_dns_server_domain_name_registration(dns_server):
def test_dns_client_check_domain_in_cache(dns_client):
"""Test to make sure that the check_domain_in_cache returns the correct values."""
dns_client_service: DNSClient = dns_client.software_manager.services["DNSClient"]
dns_client_service: DNSClient = dns_client.software_manager.software["DNSClient"]
# add a domain to the dns client cache
dns_client_service.add_domain_to_cache("real-domain.com", IPv4Address("192.168.1.12"))
assert dns_client_service.check_domain_in_cache("fake-domain.com") is False
assert dns_client_service.check_domain_in_cache("real-domain.com") is True
assert dns_client_service.check_domain_exists("fake-domain.com") is False
assert dns_client_service.check_domain_exists("real-domain.com") is True
def test_dns_server_receive(dns_server):
"""Test to make sure that the DNS Server correctly responds to a DNS Client request."""
dns_server_service: DNSServer = dns_server.software_manager.software["DNSServer"]
# register the web server in the domain controller
dns_server_service.dns_register(domain_name="real-domain.com", domain_ip_address=IPv4Address("192.168.1.12"))
assert (
dns_server_service.receive(payload=DNSPacket(dns_request=DNSRequest(domain_name_request="fake-domain.com")))
is False
)
assert (
dns_server_service.receive(payload=DNSPacket(dns_request=DNSRequest(domain_name_request="real-domain.com")))
is True
)
dns_server_service.show()
def test_dns_client_receive(dns_client):
"""Test to make sure the DNS Client knows how to deal with request responses."""
dns_client_service: DNSClient = dns_client.software_manager.software["DNSClient"]
dns_client_service.receive(
payload=DNSPacket(
dns_request=DNSRequest(domain_name_request="real-domain.com"),
dns_reply=DNSReply(domain_name_ip_address=IPv4Address("192.168.1.12")),
)
)
# domain name should be saved to cache
assert dns_client_service.dns_cache["real-domain.com"] == IPv4Address("192.168.1.12")