Merge remote-tracking branch 'devops/dev' into feature/1801-Database

# Conflicts:
#	src/primaite/simulator/network/container.py
#	src/primaite/simulator/network/hardware/base.py
This commit is contained in:
Chris McCarthy
2023-09-04 19:45:29 +01:00
42 changed files with 2976 additions and 394 deletions

View File

@@ -1,16 +1,15 @@
from primaite.simulator.network.hardware.base import Link, NIC, Node, Switch
from primaite.simulator.network.hardware.base import Link, NIC, Node
def test_node_to_node_ping():
"""Tests two Nodes are able to ping each other."""
# TODO Add actual checks. Manual check performed for now.
node_a = Node(hostname="node_a")
nic_a = NIC(ip_address="192.168.0.10", subnet_mask="255.255.255.0", gateway="192.168.0.1")
nic_a = NIC(ip_address="192.168.0.10", subnet_mask="255.255.255.0")
node_a.connect_nic(nic_a)
node_a.power_on()
node_b = Node(hostname="node_b")
nic_b = NIC(ip_address="192.168.0.11", subnet_mask="255.255.255.0", gateway="192.168.0.1")
nic_b = NIC(ip_address="192.168.0.11", subnet_mask="255.255.255.0")
node_b.connect_nic(nic_b)
node_b.power_on()
@@ -21,21 +20,20 @@ def test_node_to_node_ping():
def test_multi_nic():
"""Tests that Nodes with multiple NICs can ping each other and the data go across the correct links."""
# TODO Add actual checks. Manual check performed for now.
node_a = Node(hostname="node_a")
nic_a = NIC(ip_address="192.168.0.10", subnet_mask="255.255.255.0", gateway="192.168.0.1")
nic_a = NIC(ip_address="192.168.0.10", subnet_mask="255.255.255.0")
node_a.connect_nic(nic_a)
node_a.power_on()
node_b = Node(hostname="node_b")
nic_b1 = NIC(ip_address="192.168.0.11", subnet_mask="255.255.255.0", gateway="192.168.0.1")
nic_b2 = NIC(ip_address="10.0.0.12", subnet_mask="255.0.0.0", gateway="10.0.0.1")
nic_b1 = NIC(ip_address="192.168.0.11", subnet_mask="255.255.255.0")
nic_b2 = NIC(ip_address="10.0.0.12", subnet_mask="255.0.0.0")
node_b.connect_nic(nic_b1)
node_b.connect_nic(nic_b2)
node_b.power_on()
node_c = Node(hostname="node_c")
nic_c = NIC(ip_address="10.0.0.13", subnet_mask="255.0.0.0", gateway="10.0.0.1")
nic_c = NIC(ip_address="10.0.0.13", subnet_mask="255.0.0.0")
node_c.connect_nic(nic_c)
node_c.power_on()
@@ -45,42 +43,4 @@ def test_multi_nic():
node_a.ping("192.168.0.11")
node_c.ping("10.0.0.12")
def test_switched_network():
"""Tests a larges network of Nodes and Switches with one node pinging another."""
# TODO Add actual checks. Manual check performed for now.
pc_a = Node(hostname="pc_a")
nic_a = NIC(ip_address="192.168.0.10", subnet_mask="255.255.255.0", gateway="192.168.0.1")
pc_a.connect_nic(nic_a)
pc_a.power_on()
pc_b = Node(hostname="pc_b")
nic_b = NIC(ip_address="192.168.0.11", subnet_mask="255.255.255.0", gateway="192.168.0.1")
pc_b.connect_nic(nic_b)
pc_b.power_on()
pc_c = Node(hostname="pc_c")
nic_c = NIC(ip_address="192.168.0.12", subnet_mask="255.255.255.0", gateway="192.168.0.1")
pc_c.connect_nic(nic_c)
pc_c.power_on()
pc_d = Node(hostname="pc_d")
nic_d = NIC(ip_address="192.168.0.13", subnet_mask="255.255.255.0", gateway="192.168.0.1")
pc_d.connect_nic(nic_d)
pc_d.power_on()
switch_1 = Switch(hostname="switch_1", num_ports=6)
switch_1.power_on()
switch_2 = Switch(hostname="switch_2", num_ports=6)
switch_2.power_on()
link_nic_a_switch_1 = Link(endpoint_a=nic_a, endpoint_b=switch_1.switch_ports[1])
link_nic_b_switch_1 = Link(endpoint_a=nic_b, endpoint_b=switch_1.switch_ports[2])
link_nic_c_switch_2 = Link(endpoint_a=nic_c, endpoint_b=switch_2.switch_ports[1])
link_nic_d_switch_2 = Link(endpoint_a=nic_d, endpoint_b=switch_2.switch_ports[2])
link_switch_1_switch_2 = Link(endpoint_a=switch_1.switch_ports[6], endpoint_b=switch_2.switch_ports[6])
pc_a.ping("192.168.0.13")
assert node_c.ping("10.0.0.12")

View File

@@ -4,18 +4,17 @@ from primaite.simulator.network.hardware.base import Link, NIC, Node
def test_link_up():
"""Tests Nodes, NICs, and Links can all be connected and be in an enabled/up state."""
node_a = Node(hostname="node_a")
nic_a = NIC(ip_address="192.168.0.10", subnet_mask="255.255.255.0", gateway="192.168.0.1")
nic_a = NIC(ip_address="192.168.0.10", subnet_mask="255.255.255.0")
node_a.connect_nic(nic_a)
node_a.power_on()
assert nic_a.enabled
node_b = Node(hostname="node_b")
nic_b = NIC(ip_address="192.168.0.11", subnet_mask="255.255.255.0", gateway="192.168.0.1")
nic_b = NIC(ip_address="192.168.0.11", subnet_mask="255.255.255.0")
node_b.connect_nic(nic_b)
node_b.power_on()
assert nic_b.enabled
link = Link(endpoint_a=nic_a, endpoint_b=nic_b)
assert nic_a.enabled
assert nic_b.enabled
assert link.is_up

View File

@@ -6,9 +6,5 @@ from primaite.simulator.network.hardware.base import Link, NIC
def test_link_fails_with_same_nic():
"""Tests Link creation fails with endpoint_a and endpoint_b are the same NIC."""
with pytest.raises(ValueError):
nic_a = NIC(
ip_address="192.168.1.2",
subnet_mask="255.255.255.0",
gateway="192.168.0.1",
)
nic_a = NIC(ip_address="192.168.1.2", subnet_mask="255.255.255.0")
Link(endpoint_a=nic_a, endpoint_b=nic_a)

View File

@@ -0,0 +1,55 @@
from typing import Tuple
import pytest
from primaite.simulator.network.hardware.base import Link, NIC, Node
from primaite.simulator.network.hardware.nodes.router import ACLAction, Router
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
@pytest.fixture(scope="function")
def pc_a_pc_b_router_1() -> Tuple[Node, Node, Router]:
pc_a = Node(hostname="pc_a", default_gateway="192.168.0.1")
nic_a = NIC(ip_address="192.168.0.10", subnet_mask="255.255.255.0")
pc_a.connect_nic(nic_a)
pc_a.power_on()
pc_b = Node(hostname="pc_b", default_gateway="192.168.1.1")
nic_b = NIC(ip_address="192.168.1.10", subnet_mask="255.255.255.0")
pc_b.connect_nic(nic_b)
pc_b.power_on()
router_1 = Router(hostname="router_1")
router_1.power_on()
router_1.configure_port(1, "192.168.0.1", "255.255.255.0")
router_1.configure_port(2, "192.168.1.1", "255.255.255.0")
Link(endpoint_a=nic_a, endpoint_b=router_1.ethernet_ports[1])
Link(endpoint_a=nic_b, endpoint_b=router_1.ethernet_ports[2])
router_1.enable_port(1)
router_1.enable_port(2)
router_1.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.ARP, dst_port=Port.ARP, position=22)
router_1.acl.add_rule(action=ACLAction.PERMIT, protocol=IPProtocol.ICMP, position=23)
return pc_a, pc_b, router_1
def test_ping_default_gateway(pc_a_pc_b_router_1):
pc_a, pc_b, router_1 = pc_a_pc_b_router_1
assert pc_a.ping(pc_a.default_gateway)
def test_ping_other_router_port(pc_a_pc_b_router_1):
pc_a, pc_b, router_1 = pc_a_pc_b_router_1
assert pc_a.ping(pc_b.default_gateway)
def test_host_on_other_subnet(pc_a_pc_b_router_1):
pc_a, pc_b, router_1 = pc_a_pc_b_router_1
assert pc_a.ping("192.168.1.10")

View File

@@ -0,0 +1,25 @@
from primaite.simulator.network.hardware.base import Link
from primaite.simulator.network.hardware.nodes.computer import Computer
from primaite.simulator.network.hardware.nodes.server import Server
from primaite.simulator.network.hardware.nodes.switch import Switch
def test_switched_network():
"""Tests a node can ping another node via the switch."""
client_1 = Computer(
hostname="client_1", ip_address="192.168.1.10", subnet_mask="255.255.255.0", default_gateway="192.168.1.0"
)
client_1.power_on()
server_1 = Server(
hostname=" server_1", ip_address="192.168.1.11", subnet_mask="255.255.255.0", default_gateway="192.168.1.11"
)
server_1.power_on()
switch_1 = Switch(hostname="switch_1", num_ports=6)
switch_1.power_on()
Link(endpoint_a=client_1.ethernet_port[1], endpoint_b=switch_1.switch_ports[1])
Link(endpoint_a=server_1.ethernet_port[1], endpoint_b=switch_1.switch_ports[2])
assert client_1.ping("192.168.1.11")

View File

@@ -0,0 +1,111 @@
from ipaddress import IPv4Address
from primaite.simulator.network.hardware.nodes.router import ACLAction, Router
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
def test_add_rule():
router = Router("Router")
acl = router.acl
acl.add_rule(
action=ACLAction.PERMIT,
protocol=IPProtocol.TCP,
src_ip_address=IPv4Address("192.168.1.1"),
src_port=Port(8080),
dst_ip_address=IPv4Address("192.168.1.2"),
dst_port=Port(80),
position=1,
)
assert acl.acl[1].action == ACLAction.PERMIT
assert acl.acl[1].protocol == IPProtocol.TCP
assert acl.acl[1].src_ip_address == IPv4Address("192.168.1.1")
assert acl.acl[1].src_port == Port(8080)
assert acl.acl[1].dst_ip_address == IPv4Address("192.168.1.2")
assert acl.acl[1].dst_port == Port(80)
def test_remove_rule():
router = Router("Router")
acl = router.acl
acl.add_rule(
action=ACLAction.PERMIT,
protocol=IPProtocol.TCP,
src_ip_address=IPv4Address("192.168.1.1"),
src_port=Port(8080),
dst_ip_address=IPv4Address("192.168.1.2"),
dst_port=Port(80),
position=1,
)
acl.remove_rule(1)
assert not acl.acl[1]
def test_rules():
router = Router("Router")
acl = router.acl
acl.add_rule(
action=ACLAction.PERMIT,
protocol=IPProtocol.TCP,
src_ip_address=IPv4Address("192.168.1.1"),
src_port=Port(8080),
dst_ip_address=IPv4Address("192.168.1.2"),
dst_port=Port(80),
position=1,
)
acl.add_rule(
action=ACLAction.DENY,
protocol=IPProtocol.TCP,
src_ip_address=IPv4Address("192.168.1.3"),
src_port=Port(8080),
dst_ip_address=IPv4Address("192.168.1.4"),
dst_port=Port(80),
position=2,
)
is_permitted, rule = acl.is_permitted(
protocol=IPProtocol.TCP,
src_ip_address=IPv4Address("192.168.1.1"),
src_port=Port(8080),
dst_ip_address=IPv4Address("192.168.1.2"),
dst_port=Port(80),
)
assert is_permitted
is_permitted, rule = acl.is_permitted(
protocol=IPProtocol.TCP,
src_ip_address=IPv4Address("192.168.1.3"),
src_port=Port(8080),
dst_ip_address=IPv4Address("192.168.1.4"),
dst_port=Port(80),
)
assert not is_permitted
def test_default_rule():
router = Router("Router")
acl = router.acl
acl.add_rule(
action=ACLAction.PERMIT,
protocol=IPProtocol.TCP,
src_ip_address=IPv4Address("192.168.1.1"),
src_port=Port(8080),
dst_ip_address=IPv4Address("192.168.1.2"),
dst_port=Port(80),
position=1,
)
acl.add_rule(
action=ACLAction.DENY,
protocol=IPProtocol.TCP,
src_ip_address=IPv4Address("192.168.1.3"),
src_port=Port(8080),
dst_ip_address=IPv4Address("192.168.1.4"),
dst_port=Port(80),
position=2,
)
is_permitted, rule = acl.is_permitted(
protocol=IPProtocol.UDP,
src_ip_address=IPv4Address("192.168.1.5"),
src_port=Port(8080),
dst_ip_address=IPv4Address("192.168.1.12"),
dst_port=Port(80),
)
assert not is_permitted

View File

@@ -32,10 +32,8 @@ def test_nic_ip_address_type_conversion():
nic = NIC(
ip_address="192.168.1.2",
subnet_mask="255.255.255.0",
gateway="192.168.0.1",
)
assert isinstance(nic.ip_address, IPv4Address)
assert isinstance(nic.gateway, IPv4Address)
def test_nic_deserialize():
@@ -43,7 +41,6 @@ def test_nic_deserialize():
nic = NIC(
ip_address="192.168.1.2",
subnet_mask="255.255.255.0",
gateway="192.168.0.1",
)
nic_json = nic.model_dump_json()
@@ -51,21 +48,10 @@ def test_nic_deserialize():
assert nic_json == deserialized_nic.model_dump_json()
def test_nic_ip_address_as_gateway_fails():
"""Tests NIC creation fails if ip address is the same as the gateway."""
with pytest.raises(ValueError):
NIC(
ip_address="192.168.0.1",
subnet_mask="255.255.255.0",
gateway="192.168.0.1",
)
def test_nic_ip_address_as_network_address_fails():
"""Tests NIC creation fails if ip address and subnet mask are a network address."""
with pytest.raises(ValueError):
NIC(
ip_address="192.168.0.0",
subnet_mask="255.255.255.0",
gateway="192.168.0.1",
)

View File

@@ -10,7 +10,7 @@ def test_frame_minimal_instantiation():
"""Tests that the minimum frame (TCP SYN) using default values."""
frame = Frame(
ethernet=EthernetHeader(src_mac_addr="aa:bb:cc:dd:ee:ff", dst_mac_addr="11:22:33:44:55:66"),
ip=IPPacket(src_ip="192.168.0.10", dst_ip="192.168.0.20"),
ip=IPPacket(src_ip_address="192.168.0.10", dst_ip_address="192.168.0.20"),
tcp=TCPHeader(
src_port=8080,
dst_port=80,
@@ -38,7 +38,7 @@ def test_frame_creation_fails_tcp_without_header():
with pytest.raises(ValueError):
Frame(
ethernet=EthernetHeader(src_mac_addr="aa:bb:cc:dd:ee:ff", dst_mac_addr="11:22:33:44:55:66"),
ip=IPPacket(src_ip="192.168.0.10", dst_ip="192.168.0.20", protocol=IPProtocol.TCP),
ip=IPPacket(src_ip_address="192.168.0.10", dst_ip_address="192.168.0.20", protocol=IPProtocol.TCP),
)
@@ -47,7 +47,7 @@ def test_frame_creation_fails_udp_without_header():
with pytest.raises(ValueError):
Frame(
ethernet=EthernetHeader(src_mac_addr="aa:bb:cc:dd:ee:ff", dst_mac_addr="11:22:33:44:55:66"),
ip=IPPacket(src_ip="192.168.0.10", dst_ip="192.168.0.20", protocol=IPProtocol.UDP),
ip=IPPacket(src_ip_address="192.168.0.10", dst_ip_address="192.168.0.20", protocol=IPProtocol.UDP),
)
@@ -56,7 +56,7 @@ def test_frame_creation_fails_tcp_with_udp_header():
with pytest.raises(ValueError):
Frame(
ethernet=EthernetHeader(src_mac_addr="aa:bb:cc:dd:ee:ff", dst_mac_addr="11:22:33:44:55:66"),
ip=IPPacket(src_ip="192.168.0.10", dst_ip="192.168.0.20", protocol=IPProtocol.TCP),
ip=IPPacket(src_ip_address="192.168.0.10", dst_ip_address="192.168.0.20", protocol=IPProtocol.TCP),
udp=UDPHeader(src_port=8080, dst_port=80),
)
@@ -66,7 +66,7 @@ def test_frame_creation_fails_udp_with_tcp_header():
with pytest.raises(ValueError):
Frame(
ethernet=EthernetHeader(src_mac_addr="aa:bb:cc:dd:ee:ff", dst_mac_addr="11:22:33:44:55:66"),
ip=IPPacket(src_ip="192.168.0.10", dst_ip="192.168.0.20", protocol=IPProtocol.UDP),
ip=IPPacket(src_ip_address="192.168.0.10", dst_ip_address="192.168.0.20", protocol=IPProtocol.UDP),
udp=TCPHeader(src_port=8080, dst_port=80),
)
@@ -75,7 +75,7 @@ def test_icmp_frame_creation():
"""Tests Frame creation for ICMP."""
frame = Frame(
ethernet=EthernetHeader(src_mac_addr="aa:bb:cc:dd:ee:ff", dst_mac_addr="11:22:33:44:55:66"),
ip=IPPacket(src_ip="192.168.0.10", dst_ip="192.168.0.20", protocol=IPProtocol.ICMP),
ip=IPPacket(src_ip_address="192.168.0.10", dst_ip_address="192.168.0.20", protocol=IPProtocol.ICMP),
icmp=ICMPPacket(),
)
assert frame
@@ -86,5 +86,5 @@ def test_icmp_frame_creation_fails_without_icmp_header():
with pytest.raises(ValueError):
Frame(
ethernet=EthernetHeader(src_mac_addr="aa:bb:cc:dd:ee:ff", dst_mac_addr="11:22:33:44:55:66"),
ip=IPPacket(src_ip="192.168.0.10", dst_ip="192.168.0.20", protocol=IPProtocol.ICMP),
ip=IPPacket(src_ip_address="192.168.0.10", dst_ip_address="192.168.0.20", protocol=IPProtocol.ICMP),
)