diff --git a/.gitignore b/.gitignore index ff86b65f..66d528a8 100644 --- a/.gitignore +++ b/.gitignore @@ -144,9 +144,11 @@ cython_debug/ # IDE .idea/ docs/source/primaite-dependencies.rst +.vscode/ # outputs src/primaite/outputs/ +simulation_output/ # benchmark session outputs benchmark/output diff --git a/src/primaite/simulator/network/networks.py b/src/primaite/simulator/network/networks.py index ce1ef338..79af75e4 100644 --- a/src/primaite/simulator/network/networks.py +++ b/src/primaite/simulator/network/networks.py @@ -10,6 +10,8 @@ from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.network.transmission.transport_layer import Port from primaite.simulator.system.applications.database_client import DatabaseClient from primaite.simulator.system.services.database_service import DatabaseService +from primaite.simulator.system.services.dns_client import DNSClient +from primaite.simulator.system.services.dns_server import DNSServer from primaite.simulator.system.services.red_services.data_manipulation_bot import DataManipulationBot @@ -129,6 +131,7 @@ def arcd_uc2_network() -> Network: hostname="client_1", ip_address="192.168.10.21", subnet_mask="255.255.255.0", default_gateway="192.168.10.1" ) client_1.power_on() + client_1.software_manager.install(DNSClient) network.connect(endpoint_b=client_1.ethernet_port[1], endpoint_a=switch_2.switch_ports[1]) client_1.software_manager.install(DataManipulationBot) db_manipulation_bot: DataManipulationBot = client_1.software_manager.software["DataManipulationBot"] @@ -139,6 +142,7 @@ def arcd_uc2_network() -> Network: hostname="client_2", ip_address="192.168.10.22", subnet_mask="255.255.255.0", default_gateway="192.168.10.1" ) client_2.power_on() + client_2.software_manager.install(DNSClient) network.connect(endpoint_b=client_2.ethernet_port[1], endpoint_a=switch_2.switch_ports[2]) # Domain Controller @@ -149,6 +153,8 @@ def arcd_uc2_network() -> Network: default_gateway="192.168.1.1", ) domain_controller.power_on() + domain_controller.software_manager.install(DNSServer) + network.connect(endpoint_b=domain_controller.ethernet_port[1], endpoint_a=switch_1.switch_ports[1]) # Database Server @@ -200,12 +206,17 @@ def arcd_uc2_network() -> Network: ) web_server.power_on() web_server.software_manager.install(DatabaseClient) + database_client: DatabaseClient = web_server.software_manager.software["DatabaseClient"] database_client.configure(server_ip_address=IPv4Address("192.168.1.14")) network.connect(endpoint_b=web_server.ethernet_port[1], endpoint_a=switch_1.switch_ports[2]) database_client.run() database_client.connect() + # register the web_server to a domain + dns_server_service: DNSServer = domain_controller.software_manager.software["DNSServer"] # noqa + dns_server_service.dns_register("arcd.com", web_server.ip_address) + # Backup Server backup_server = Server( hostname="backup_server", ip_address="192.168.1.16", subnet_mask="255.255.255.0", default_gateway="192.168.1.1" @@ -229,6 +240,10 @@ def arcd_uc2_network() -> Network: router_1.acl.add_rule(action=ACLAction.PERMIT, protocol=IPProtocol.ICMP, position=23) + # Allow PostgreSQL requests router_1.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.POSTGRES_SERVER, dst_port=Port.POSTGRES_SERVER) + # Allow DNS requests + router_1.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.DNS, dst_port=Port.DNS) + return network diff --git a/src/primaite/simulator/network/protocols/dns.py b/src/primaite/simulator/network/protocols/dns.py index 0afa6405..e5602c91 100644 --- a/src/primaite/simulator/network/protocols/dns.py +++ b/src/primaite/simulator/network/protocols/dns.py @@ -56,7 +56,7 @@ class DNSPacket(BaseModel): :param domain_ip_address: The IP address that was being sought after from the original target domain name. :return: A new instance of DNSPacket. """ - return DNSPacket( - dns_request=DNSRequest(domain_name_request=self.dns_request.domain_name_request), - dns_reply=DNSReply(domain_name_ip_address=domain_ip_address), - ) + if domain_ip_address is not None: + self.dns_reply = DNSReply(domain_name_ip_address=IPv4Address(domain_ip_address)) + + return self diff --git a/src/primaite/simulator/system/services/dns_client.py b/src/primaite/simulator/system/services/dns_client.py index 3929065d..db01c05c 100644 --- a/src/primaite/simulator/system/services/dns_client.py +++ b/src/primaite/simulator/system/services/dns_client.py @@ -1,11 +1,15 @@ from ipaddress import IPv4Address from typing import Any, Dict, Optional +from primaite import getLogger from primaite.simulator.network.protocols.dns import DNSPacket, DNSRequest from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.network.transmission.transport_layer import Port +from primaite.simulator.system.core.software_manager import SoftwareManager from primaite.simulator.system.services.service import Service +_LOGGER = getLogger(__name__) + class DNSClient(Service): """Represents a DNS Client as a Service.""" @@ -52,15 +56,15 @@ class DNSClient(Service): """ self.dns_cache[domain_name] = ip_address - def check_domain_in_cache( + def check_domain_exists( self, target_domain: str, dest_ip_address: Optional[IPv4Address] = None, - dest_port: Optional[Port] = None, + dest_port: Optional[Port] = Port.DNS, session_id: Optional[str] = None, is_reattempt: bool = False, ) -> bool: - """Function to check if domain name is in DNS client cache. + """Function to check if domain name exists. :param: target_domain: The domain requested for an IP address. :param: dest_ip_address: The ip address of the payload destination. @@ -80,21 +84,26 @@ class DNSClient(Service): return False else: # send a request to check if domain name exists in the DNS Server - self.send(payload=payload, dest_ip_address=dest_ip_address, dest_port=dest_port, session_id=session_id) - # call function again - return self.check_domain_in_cache( - target_domain=target_domain, + self.software_manager.send_payload_to_session_manager( + payload=payload, dest_ip_address=dest_ip_address, dest_port=dest_port, - session_id=session_id, - is_reattempt=True, ) + # check if the domain has been added to cache + if self.dns_cache.get(target_domain) is None: + # call function again + return self.check_domain_exists( + target_domain=target_domain, + dest_ip_address=dest_ip_address, + dest_port=dest_port, + session_id=session_id, + is_reattempt=True, + ) + def send( self, payload: Any, - dest_ip_address: Optional[IPv4Address] = None, - dest_port: Optional[Port] = None, session_id: Optional[str] = None, **kwargs, ) -> bool: @@ -112,15 +121,12 @@ class DNSClient(Service): :return: True if successful, False otherwise. """ # create DNS request packet - self.software_manager.send_payload_to_session_manager( - payload=payload, dest_ip_address=dest_ip_address, dest_port=dest_port, session_id=session_id - ) + software_manager: SoftwareManager = self.software_manager + software_manager.send_payload_to_session_manager(payload=payload, session_id=session_id) def receive( self, payload: Any, - dest_ip_address: Optional[IPv4Address] = None, - dest_port: Optional[Port] = None, session_id: Optional[str] = None, **kwargs, ) -> bool: @@ -131,11 +137,18 @@ class DNSClient(Service): is generated should be implemented in subclasses. :param payload: The payload to be sent. - :param dest_ip_address: The ip address of the payload destination. - :param dest_port: The port of the payload destination. :param session_id: The Session ID the payload is to originate from. Optional. :return: True if successful, False otherwise. """ - super().send() - # check the DNS packet (dns request, dns reply) here and see if it actually worked - pass + # The payload should be a DNS packet + if not isinstance(payload, DNSPacket): + _LOGGER.debug(f"{payload} is not a DNSPacket") + return False + # cast payload into a DNS packet + payload: DNSPacket = payload + if payload.dns_reply is not None: + # add the IP address to the client cache + self.dns_cache[payload.dns_request.domain_name_request] = payload.dns_reply.domain_name_ip_address + return True + + return False diff --git a/src/primaite/simulator/system/services/dns_server.py b/src/primaite/simulator/system/services/dns_server.py index 3dcd89f9..b879d515 100644 --- a/src/primaite/simulator/system/services/dns_server.py +++ b/src/primaite/simulator/system/services/dns_server.py @@ -75,8 +75,6 @@ class DNSServer(Service): def send( self, payload: Any, - dest_ip_address: Optional[IPv4Address] = None, - dest_port: Optional[Port] = None, session_id: Optional[str] = None, **kwargs, ) -> bool: @@ -87,14 +85,13 @@ class DNSServer(Service): is generated should be implemented in subclasses. :param: payload: The payload to send. - :param: dest_ip_address: The ip address of the machine that the payload will be sent to - :param: dest_port: The port of the machine that the payload will be sent to :param: session_id: The id of the session :return: True if successful, False otherwise. """ try: self.software_manager.send_payload_to_session_manager(payload=payload, session_id=session_id) + return True except Exception as e: _LOGGER.error(e) return False @@ -102,8 +99,6 @@ class DNSServer(Service): def receive( self, payload: Any, - dest_ip_address: Optional[IPv4Address] = None, - dest_port: Optional[Port] = None, session_id: Optional[str] = None, **kwargs, ) -> bool: @@ -114,11 +109,9 @@ class DNSServer(Service): is generated should be implemented in subclasses. :param: payload: The payload to send. - :param: dest_ip_address: The ip address of the machine that the payload will be sent to - :param: dest_port: The port of the machine that the payload will be sent to :param: session_id: The id of the session - :return: True if successful, False otherwise. + :return: True if DNS request returns a valid IP, otherwise, False """ # The payload should be a DNS packet if not isinstance(payload, DNSPacket): @@ -128,10 +121,10 @@ class DNSServer(Service): payload: DNSPacket = payload if payload.dns_request is not None: # generate a reply with the correct DNS IP address - payload.generate_reply(self.dns_lookup(payload.dns_request.domain_name_request)) + payload = payload.generate_reply(self.dns_lookup(payload.dns_request.domain_name_request)) # send reply self.send(payload, session_id) - return True + return payload.dns_reply is not None return False diff --git a/tests/integration_tests/system/test_dns_client_server.py b/tests/integration_tests/system/test_dns_client_server.py new file mode 100644 index 00000000..77fa6017 --- /dev/null +++ b/tests/integration_tests/system/test_dns_client_server.py @@ -0,0 +1,24 @@ +from ipaddress import IPv4Address + +from primaite.simulator.network.hardware.nodes.computer import Computer +from primaite.simulator.network.hardware.nodes.server import Server +from primaite.simulator.system.services.dns_client import DNSClient +from primaite.simulator.system.services.dns_server import DNSServer + + +def test_dns_client_server(uc2_network): + client_1: Computer = uc2_network.get_node_by_hostname("client_1") + domain_controller: Server = uc2_network.get_node_by_hostname("domain_controller") + + dns_client: DNSClient = client_1.software_manager.software["DNSClient"] + dns_server: DNSServer = domain_controller.software_manager.software["DNSServer"] + + # register a domain to web server + dns_server.dns_register("real-domain.com", IPv4Address("192.168.1.12")) + + dns_server.show() + + dns_client.check_domain_exists(target_domain="real-domain.com", dest_ip_address=IPv4Address("192.168.1.14")) + + # should register the domain in the client cache + assert dns_client.dns_cache.get("real-domain.com") is not None diff --git a/tests/unit_tests/_primaite/_simulator/_system/_services/test_dns.py b/tests/unit_tests/_primaite/_simulator/_system/_services/test_dns.py index fdb3426d..943d3265 100644 --- a/tests/unit_tests/_primaite/_simulator/_system/_services/test_dns.py +++ b/tests/unit_tests/_primaite/_simulator/_system/_services/test_dns.py @@ -1,10 +1,9 @@ -import sys from ipaddress import IPv4Address import pytest from primaite.simulator.network.hardware.base import Node -from primaite.simulator.network.networks import arcd_uc2_network +from primaite.simulator.network.protocols.dns import DNSPacket, DNSReply, DNSRequest from primaite.simulator.network.transmission.network_layer import IPProtocol from primaite.simulator.network.transmission.transport_layer import Port from primaite.simulator.system.services.dns_client import DNSClient @@ -14,22 +13,22 @@ from primaite.simulator.system.services.dns_server import DNSServer @pytest.fixture(scope="function") def dns_server() -> Node: node = Node(hostname="dns_server") - node.software_manager.add_service(service_class=DNSServer) - node.software_manager.services["DNSServer"].start() + node.software_manager.install(software_class=DNSServer) + node.software_manager.software["DNSServer"].start() return node @pytest.fixture(scope="function") def dns_client() -> Node: node = Node(hostname="dns_client") - node.software_manager.add_service(service_class=DNSClient) - node.software_manager.services["DNSClient"].start() + node.software_manager.install(software_class=DNSClient) + node.software_manager.software["DNSClient"].start() return node def test_create_dns_server(dns_server): assert dns_server is not None - dns_server_service: DNSServer = dns_server.software_manager.services["DNSServer"] + dns_server_service: DNSServer = dns_server.software_manager.software["DNSServer"] assert dns_server_service.name is "DNSServer" assert dns_server_service.port is Port.DNS assert dns_server_service.protocol is IPProtocol.UDP @@ -37,7 +36,7 @@ def test_create_dns_server(dns_server): def test_create_dns_client(dns_client): assert dns_client is not None - dns_client_service: DNSClient = dns_client.software_manager.services["DNSClient"] + dns_client_service: DNSClient = dns_client.software_manager.software["DNSClient"] assert dns_client_service.name is "DNSClient" assert dns_client_service.port is Port.DNS assert dns_client_service.protocol is IPProtocol.UDP @@ -45,7 +44,7 @@ def test_create_dns_client(dns_client): def test_dns_server_domain_name_registration(dns_server): """Test to check if the domain name registration works.""" - dns_server_service: DNSServer = dns_server.software_manager.services["DNSServer"] + dns_server_service: DNSServer = dns_server.software_manager.software["DNSServer"] # register the web server in the domain controller dns_server_service.dns_register(domain_name="real-domain.com", domain_ip_address=IPv4Address("192.168.1.12")) @@ -57,10 +56,45 @@ def test_dns_server_domain_name_registration(dns_server): def test_dns_client_check_domain_in_cache(dns_client): """Test to make sure that the check_domain_in_cache returns the correct values.""" - dns_client_service: DNSClient = dns_client.software_manager.services["DNSClient"] + dns_client_service: DNSClient = dns_client.software_manager.software["DNSClient"] # add a domain to the dns client cache dns_client_service.add_domain_to_cache("real-domain.com", IPv4Address("192.168.1.12")) - assert dns_client_service.check_domain_in_cache("fake-domain.com") is False - assert dns_client_service.check_domain_in_cache("real-domain.com") is True + assert dns_client_service.check_domain_exists("fake-domain.com") is False + assert dns_client_service.check_domain_exists("real-domain.com") is True + + +def test_dns_server_receive(dns_server): + """Test to make sure that the DNS Server correctly responds to a DNS Client request.""" + dns_server_service: DNSServer = dns_server.software_manager.software["DNSServer"] + + # register the web server in the domain controller + dns_server_service.dns_register(domain_name="real-domain.com", domain_ip_address=IPv4Address("192.168.1.12")) + + assert ( + dns_server_service.receive(payload=DNSPacket(dns_request=DNSRequest(domain_name_request="fake-domain.com"))) + is False + ) + + assert ( + dns_server_service.receive(payload=DNSPacket(dns_request=DNSRequest(domain_name_request="real-domain.com"))) + is True + ) + + dns_server_service.show() + + +def test_dns_client_receive(dns_client): + """Test to make sure the DNS Client knows how to deal with request responses.""" + dns_client_service: DNSClient = dns_client.software_manager.software["DNSClient"] + + dns_client_service.receive( + payload=DNSPacket( + dns_request=DNSRequest(domain_name_request="real-domain.com"), + dns_reply=DNSReply(domain_name_ip_address=IPv4Address("192.168.1.12")), + ) + ) + + # domain name should be saved to cache + assert dns_client_service.dns_cache["real-domain.com"] == IPv4Address("192.168.1.12")