#1800 - Moved the Switch code to a dedicated switch.py module.

- Added more switch tests.
- Updated ACL tests to use router acl.
- Updated more docs.
- Moved the Jupyter notebooks to _package_data and fixed up the setup to move all notebooks to ~/primaite/notebooks/example_notebooks.
This commit is contained in:
Chris McCarthy
2023-09-04 12:14:24 +01:00
parent 5111affeeb
commit 05959e5408
20 changed files with 992 additions and 333 deletions

View File

@@ -1,16 +1,17 @@
from typing import Any, Dict, Union, Optional, List
from typing import Any, Dict, List, Optional, Union
import matplotlib.pyplot as plt
import networkx as nx
from networkx import MultiGraph
from prettytable import PrettyTable, MARKDOWN
from prettytable import MARKDOWN, PrettyTable
from primaite import getLogger
from primaite.simulator.core import Action, ActionManager, AllowAllValidator, SimComponent
from primaite.simulator.network.hardware.base import Link, NIC, Node, SwitchPort, Switch
from primaite.simulator.network.hardware.base import Link, NIC, Node, SwitchPort
from primaite.simulator.network.hardware.nodes.computer import Computer
from primaite.simulator.network.hardware.nodes.router import Router
from primaite.simulator.network.hardware.nodes.server import Server
from primaite.simulator.network.hardware.nodes.switch import Switch
_LOGGER = getLogger(__name__)
@@ -30,7 +31,7 @@ class Network(SimComponent):
links: Dict[str, Link] = {}
def __init__(self, **kwargs):
""""
"""
Initialise the network.
Constructs the network and sets up its initial state including
@@ -84,14 +85,14 @@ class Network(SimComponent):
"Router": self.routers,
"Switch": self.switches,
"Server": self.servers,
"Computer": self.computers
"Computer": self.computers,
}
if nodes:
table = PrettyTable(["Node", "Type", "Operating State"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"Nodes"
table.title = "Nodes"
for node_type, nodes in nodes_type_map.items():
for node in nodes:
table.add_row([node.hostname, node_type, node.operating_state.name])
@@ -102,7 +103,7 @@ class Network(SimComponent):
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"IP Addresses"
table.title = "IP Addresses"
for nodes in nodes_type_map.values():
for node in nodes:
for i, port in node.ethernet_port.items():
@@ -114,7 +115,7 @@ class Network(SimComponent):
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"Links"
table.title = "Links"
links = list(self.links.values())
for nodes in nodes_type_map.values():
for node in nodes:
@@ -126,7 +127,7 @@ class Network(SimComponent):
link.endpoint_b.parent.hostname,
link.is_up,
link.bandwidth,
link.current_load_percent
link.current_load_percent,
]
)
links.remove(link)
@@ -207,9 +208,7 @@ class Network(SimComponent):
node.parent = None
_LOGGER.info(f"Removed node {node.uuid} from network {self.uuid}")
def connect(
self, endpoint_a: Union[NIC, SwitchPort], endpoint_b: Union[NIC, SwitchPort], **kwargs
) -> None:
def connect(self, endpoint_a: Union[NIC, SwitchPort], endpoint_b: Union[NIC, SwitchPort], **kwargs) -> None:
"""
Connect two endpoints on the network by creating a link between their NICs/SwitchPorts.

View File

@@ -1,13 +1,12 @@
from __future__ import annotations
import random
import re
import secrets
from enum import Enum
from ipaddress import IPv4Address, IPv4Network
from typing import Dict, List, Optional, Tuple, Union
from typing import Any, Dict, List, Optional, Tuple, Union
from prettytable import PrettyTable, MARKDOWN
from prettytable import MARKDOWN, PrettyTable
from primaite import getLogger
from primaite.exceptions import NetworkError
@@ -289,7 +288,7 @@ class SwitchPort(SimComponent):
"The speed of the SwitchPort in Mbps. Default is 100 Mbps."
mtu: int = 1500
"The Maximum Transmission Unit (MTU) of the SwitchPort in Bytes. Default is 1500 B"
connected_node: Optional[Switch] = None
connected_node: Optional[Node] = None
"The Node to which the SwitchPort is connected."
connected_link: Optional[Link] = None
"The Link to which the SwitchPort is connected."
@@ -715,7 +714,7 @@ class ARPCache:
arp_packet = arp_packet.generate_reply(from_nic.mac_address)
self.send_arp_reply(arp_packet, from_nic)
def __contains__(self, item) -> bool:
def __contains__(self, item: Any) -> bool:
return item in self.arp
@@ -765,7 +764,7 @@ class ICMP:
identifier=frame.icmp.identifier,
sequence=frame.icmp.sequence + 1,
)
payload = secrets.token_urlsafe(int(32/1.3)) # Standard ICMP 32 bytes size
payload = secrets.token_urlsafe(int(32 / 1.3)) # Standard ICMP 32 bytes size
frame = Frame(
ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, icmp=icmp_reply_packet, payload=payload
)
@@ -829,7 +828,7 @@ class ICMP:
# Data Link Layer
ethernet_header = EthernetHeader(src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address)
icmp_packet = ICMPPacket(identifier=identifier, sequence=sequence)
payload = secrets.token_urlsafe(int(32/1.3)) # Standard ICMP 32 bytes size
payload = secrets.token_urlsafe(int(32 / 1.3)) # Standard ICMP 32 bytes size
frame = Frame(ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, icmp=icmp_packet, payload=payload)
nic.send_frame(frame)
return sequence, icmp_packet.identifier
@@ -1049,7 +1048,8 @@ class Node(SimComponent):
f"Ping statistics for {target_ip_address}: "
f"Packets: Sent = {pings}, "
f"Received = {request_replies}, "
f"Lost = {pings-request_replies} ({(pings-request_replies)/pings*100}% loss)")
f"Lost = {pings-request_replies} ({(pings-request_replies)/pings*100}% loss)"
)
return passed
return False
@@ -1084,102 +1084,3 @@ class Node(SimComponent):
pass
elif frame.ip.protocol == IPProtocol.ICMP:
self.icmp.process_icmp(frame=frame, from_nic=from_nic)
class Switch(Node):
"""A class representing a Layer 2 network switch."""
num_ports: int = 24
"The number of ports on the switch."
switch_ports: Dict[int, SwitchPort] = {}
"The SwitchPorts on the switch."
mac_address_table: Dict[str, SwitchPort] = {}
"A MAC address table mapping destination MAC addresses to corresponding SwitchPorts."
def __init__(self, **kwargs):
super().__init__(**kwargs)
if not self.switch_ports:
self.switch_ports = {i: SwitchPort() for i in range(1, self.num_ports + 1)}
for port_num, port in self.switch_ports.items():
port.connected_node = self
port.parent = self
port.port_num = port_num
def show(self, markdown: bool = False):
"""Prints a table of the SwitchPorts on the Switch."""
table = PrettyTable(["Port", "MAC Address", "Speed", "Status"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.hostname} Switch Ports"
for port_num, port in self.switch_ports.items():
table.add_row([port_num, port.mac_address, port.speed, "Enabled" if port.enabled else "Disabled"])
print(table)
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
return {
"uuid": self.uuid,
"num_ports": self.num_ports, # redundant?
"ports": {port_num: port.describe_state() for port_num, port in self.switch_ports.items()},
"mac_address_table": {mac: port for mac, port in self.mac_address_table.items()},
}
def _add_mac_table_entry(self, mac_address: str, switch_port: SwitchPort):
mac_table_port = self.mac_address_table.get(mac_address)
if not mac_table_port:
self.mac_address_table[mac_address] = switch_port
self.sys_log.info(f"Added MAC table entry: Port {switch_port.port_num} -> {mac_address}")
else:
if mac_table_port != switch_port:
self.mac_address_table.pop(mac_address)
self.sys_log.info(f"Removed MAC table entry: Port {mac_table_port.port_num} -> {mac_address}")
self._add_mac_table_entry(mac_address, switch_port)
def forward_frame(self, frame: Frame, incoming_port: SwitchPort):
"""
Forward a frame to the appropriate port based on the destination MAC address.
:param frame: The Frame to be forwarded.
:param incoming_port: The port number from which the frame was received.
"""
src_mac = frame.ethernet.src_mac_addr
dst_mac = frame.ethernet.dst_mac_addr
self._add_mac_table_entry(src_mac, incoming_port)
outgoing_port = self.mac_address_table.get(dst_mac)
if outgoing_port or dst_mac != "ff:ff:ff:ff:ff:ff":
outgoing_port.send_frame(frame)
else:
# If the destination MAC is not in the table, flood to all ports except incoming
for port in self.switch_ports.values():
if port != incoming_port:
port.send_frame(frame)
def disconnect_link_from_port(self, link: Link, port_number: int):
"""
Disconnect a given link from the specified port number on the switch.
:param link: The Link object to be disconnected.
:param port_number: The port number on the switch from where the link should be disconnected.
:raise NetworkError: When an invalid port number is provided or the link does not match the connection.
"""
port = self.switch_ports.get(port_number)
if port is None:
msg = f"Invalid port number {port_number} on the switch"
_LOGGER.error(msg)
raise NetworkError(msg)
if port.connected_link != link:
msg = f"The link does not match the connection at port number {port_number}"
_LOGGER.error(msg)
raise NetworkError(msg)
port.disconnect_link()

View File

@@ -1,6 +1,6 @@
from ipaddress import IPv4Address
from primaite.simulator.network.hardware.base import Node, NIC
from primaite.simulator.network.hardware.base import NIC, Node
class Computer(Node):

View File

@@ -5,7 +5,7 @@ from enum import Enum
from ipaddress import IPv4Address, IPv4Network
from typing import Dict, List, Optional, Tuple, Union
from prettytable import PrettyTable, MARKDOWN
from prettytable import MARKDOWN, PrettyTable
from primaite.simulator.core import SimComponent
from primaite.simulator.network.hardware.base import ARPCache, ICMP, NIC, Node
@@ -71,6 +71,7 @@ class AccessControlList(SimComponent):
:ivar int max_acl_rules: Maximum number of ACL rules that can be added. Default is 25.
:ivar List[Optional[ACLRule]] _acl: A list containing the ACL rules.
"""
sys_log: SysLog
implicit_action: ACLAction
implicit_rule: ACLRule
@@ -105,14 +106,14 @@ class AccessControlList(SimComponent):
return self._acl
def add_rule(
self,
action: ACLAction,
protocol: Optional[IPProtocol] = None,
src_ip: Optional[Union[str, IPv4Address]] = None,
src_port: Optional[Port] = None,
dst_ip: Optional[Union[str, IPv4Address]] = None,
dst_port: Optional[Port] = None,
position: int = 0,
self,
action: ACLAction,
protocol: Optional[IPProtocol] = None,
src_ip: Optional[Union[str, IPv4Address]] = None,
src_port: Optional[Port] = None,
dst_ip: Optional[Union[str, IPv4Address]] = None,
dst_port: Optional[Port] = None,
position: int = 0,
) -> None:
"""
Add a new ACL rule.
@@ -150,12 +151,12 @@ class AccessControlList(SimComponent):
raise ValueError(f"Position {position} is out of bounds.")
def is_permitted(
self,
protocol: IPProtocol,
src_ip: Union[str, IPv4Address],
src_port: Optional[Port],
dst_ip: Union[str, IPv4Address],
dst_port: Optional[Port],
self,
protocol: IPProtocol,
src_ip: Union[str, IPv4Address],
src_port: Optional[Port],
dst_ip: Union[str, IPv4Address],
dst_port: Optional[Port],
) -> Tuple[bool, Optional[Union[str, ACLRule]]]:
"""
Check if a packet with the given properties is permitted through the ACL.
@@ -177,23 +178,23 @@ class AccessControlList(SimComponent):
continue
if (
(rule.src_ip == src_ip or rule.src_ip is None)
and (rule.dst_ip == dst_ip or rule.dst_ip is None)
and (rule.protocol == protocol or rule.protocol is None)
and (rule.src_port == src_port or rule.src_port is None)
and (rule.dst_port == dst_port or rule.dst_port is None)
(rule.src_ip == src_ip or rule.src_ip is None)
and (rule.dst_ip == dst_ip or rule.dst_ip is None)
and (rule.protocol == protocol or rule.protocol is None)
and (rule.src_port == src_port or rule.src_port is None)
and (rule.dst_port == dst_port or rule.dst_port is None)
):
return rule.action == ACLAction.PERMIT, rule
return self.implicit_action == ACLAction.PERMIT, f"Implicit {self.implicit_action.name}"
def get_relevant_rules(
self,
protocol: IPProtocol,
src_ip: Union[str, IPv4Address],
src_port: Port,
dst_ip: Union[str, IPv4Address],
dst_port: Port,
self,
protocol: IPProtocol,
src_ip: Union[str, IPv4Address],
src_port: Port,
dst_ip: Union[str, IPv4Address],
dst_port: Port,
) -> List[ACLRule]:
"""
Get the list of relevant rules for a packet with given properties.
@@ -215,11 +216,11 @@ class AccessControlList(SimComponent):
continue
if (
(rule.src_ip == src_ip or rule.src_ip is None)
or (rule.dst_ip == dst_ip or rule.dst_ip is None)
or (rule.protocol == protocol or rule.protocol is None)
or (rule.src_port == src_port or rule.src_port is None)
or (rule.dst_port == dst_port or rule.dst_port is None)
(rule.src_ip == src_ip or rule.src_ip is None)
or (rule.dst_ip == dst_ip or rule.dst_ip is None)
or (rule.protocol == protocol or rule.protocol is None)
or (rule.src_port == src_port or rule.src_port is None)
or (rule.dst_port == dst_port or rule.dst_port is None)
):
relevant_rules.append(rule)
@@ -326,11 +327,11 @@ class RouteTable(SimComponent):
pass
def add_route(
self,
address: Union[IPv4Address, str],
subnet_mask: Union[IPv4Address, str],
next_hop: Union[IPv4Address, str],
metric: float = 0.0,
self,
address: Union[IPv4Address, str],
subnet_mask: Union[IPv4Address, str],
next_hop: Union[IPv4Address, str],
metric: float = 0.0,
):
"""
Add a route to the routing table.
@@ -397,6 +398,7 @@ class RouterARPCache(ARPCache):
:ivar SysLog sys_log: A system log for logging messages.
:ivar Router router: The router to which this ARP cache belongs.
"""
def __init__(self, sys_log: SysLog, router: Router):
super().__init__(sys_log)
self.router: Router = router
@@ -416,7 +418,8 @@ class RouterARPCache(ARPCache):
if arp_packet.target_ip == nic.ip_address:
# reply to the Router specifically
self.sys_log.info(
f"Received ARP response for {arp_packet.sender_ip} from {arp_packet.sender_mac_addr} via NIC {from_nic}"
f"Received ARP response for {arp_packet.sender_ip} "
f"from {arp_packet.sender_mac_addr} via NIC {from_nic}"
)
self.add_arp_cache_entry(
ip_address=arp_packet.sender_ip,
@@ -462,6 +465,7 @@ class RouterICMP(ICMP):
:param router: The router to which this ICMP handler belongs.
:type router: Router
"""
router: Router
def __init__(self, sys_log: SysLog, arp_cache: ARPCache, router: Router):
@@ -492,16 +496,22 @@ class RouterICMP(ICMP):
# Network Layer
ip_packet = IPPacket(src_ip=nic.ip_address, dst_ip=frame.ip.src_ip, protocol=IPProtocol.ICMP)
# Data Link Layer
ethernet_header = EthernetHeader(src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address)
ethernet_header = EthernetHeader(
src_mac_addr=src_nic.mac_address, dst_mac_addr=target_mac_address
)
icmp_reply_packet = ICMPPacket(
icmp_type=ICMPType.ECHO_REPLY,
icmp_code=0,
identifier=frame.icmp.identifier,
sequence=frame.icmp.sequence + 1,
)
payload = secrets.token_urlsafe(int(32/1.3)) # Standard ICMP 32 bytes size
payload = secrets.token_urlsafe(int(32 / 1.3)) # Standard ICMP 32 bytes size
frame = Frame(
ethernet=ethernet_header, ip=ip_packet, tcp=tcp_header, icmp=icmp_reply_packet, payload=payload
ethernet=ethernet_header,
ip=ip_packet,
tcp=tcp_header,
icmp=icmp_reply_packet,
payload=payload,
)
self.sys_log.info(f"Sending echo reply to {frame.ip.dst_ip}")
@@ -540,6 +550,7 @@ class Router(Node):
:ivar int num_ports: The number of ports in the router.
:ivar dict kwargs: Optional keyword arguments for SysLog, ACL, RouteTable, RouterARPCache, RouterICMP.
"""
num_ports: int
ethernet_ports: Dict[int, NIC] = {}
acl: AccessControlList
@@ -588,12 +599,12 @@ class Router(Node):
def route_frame(self, frame: Frame, from_nic: NIC, re_attempt: bool = False) -> None:
"""
Route a given frame from a source NIC to its destination.
Route a given frame from a source NIC to its destination.
:param frame: The frame to be routed.
:param from_nic: The source network interface.
:param re_attempt: Flag to indicate if the routing is a reattempt.
"""
:param frame: The frame to be routed.
:param from_nic: The source network interface.
:param re_attempt: Flag to indicate if the routing is a reattempt.
"""
# Check if src ip is on network of one of the NICs
nic = self.arp.get_arp_cache_nic(frame.ip.dst_ip)
target_mac = self.arp.get_arp_cache_mac_address(frame.ip.dst_ip)

View File

@@ -1,6 +1,3 @@
from ipaddress import IPv4Address
from primaite.simulator.network.hardware.base import Node, NIC
from primaite.simulator.network.hardware.nodes.computer import Computer

View File

@@ -0,0 +1,121 @@
from typing import Dict
from prettytable import MARKDOWN, PrettyTable
from primaite import getLogger
from primaite.exceptions import NetworkError
from primaite.links.link import Link
from primaite.simulator.network.hardware.base import Node, SwitchPort
from primaite.simulator.network.transmission.data_link_layer import Frame
_LOGGER = getLogger(__name__)
class Switch(Node):
"""
A class representing a Layer 2 network switch.
:ivar num_ports: The number of ports on the switch. Default is 24.
"""
num_ports: int = 24
"The number of ports on the switch."
switch_ports: Dict[int, SwitchPort] = {}
"The SwitchPorts on the switch."
mac_address_table: Dict[str, SwitchPort] = {}
"A MAC address table mapping destination MAC addresses to corresponding SwitchPorts."
def __init__(self, **kwargs):
super().__init__(**kwargs)
if not self.switch_ports:
self.switch_ports = {i: SwitchPort() for i in range(1, self.num_ports + 1)}
for port_num, port in self.switch_ports.items():
port.connected_node = self
port.parent = self
port.port_num = port_num
def show(self, markdown: bool = False):
"""
Prints a table of the SwitchPorts on the Switch.
:param markdown: If True, outputs the table in markdown format. Default is False.
"""
table = PrettyTable(["Port", "MAC Address", "Speed", "Status"])
if markdown:
table.set_style(MARKDOWN)
table.align = "l"
table.title = f"{self.hostname} Switch Ports"
for port_num, port in self.switch_ports.items():
table.add_row([port_num, port.mac_address, port.speed, "Enabled" if port.enabled else "Disabled"])
print(table)
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
:return: Current state of this object and child objects.
"""
return {
"uuid": self.uuid,
"num_ports": self.num_ports, # redundant?
"ports": {port_num: port.describe_state() for port_num, port in self.switch_ports.items()},
"mac_address_table": {mac: port for mac, port in self.mac_address_table.items()},
}
def _add_mac_table_entry(self, mac_address: str, switch_port: SwitchPort):
"""
Private method to add an entry to the MAC address table.
:param mac_address: MAC address to be added.
:param switch_port: Corresponding SwitchPort object.
"""
mac_table_port = self.mac_address_table.get(mac_address)
if not mac_table_port:
self.mac_address_table[mac_address] = switch_port
self.sys_log.info(f"Added MAC table entry: Port {switch_port.port_num} -> {mac_address}")
else:
if mac_table_port != switch_port:
self.mac_address_table.pop(mac_address)
self.sys_log.info(f"Removed MAC table entry: Port {mac_table_port.port_num} -> {mac_address}")
self._add_mac_table_entry(mac_address, switch_port)
def forward_frame(self, frame: Frame, incoming_port: SwitchPort):
"""
Forward a frame to the appropriate port based on the destination MAC address.
:param frame: The Frame to be forwarded.
:param incoming_port: The port number from which the frame was received.
"""
src_mac = frame.ethernet.src_mac_addr
dst_mac = frame.ethernet.dst_mac_addr
self._add_mac_table_entry(src_mac, incoming_port)
outgoing_port = self.mac_address_table.get(dst_mac)
if outgoing_port or dst_mac != "ff:ff:ff:ff:ff:ff":
outgoing_port.send_frame(frame)
else:
# If the destination MAC is not in the table, flood to all ports except incoming
for port in self.switch_ports.values():
if port != incoming_port:
port.send_frame(frame)
def disconnect_link_from_port(self, link: Link, port_number: int):
"""
Disconnect a given link from the specified port number on the switch.
:param link: The Link object to be disconnected.
:param port_number: The port number on the switch from where the link should be disconnected.
:raise NetworkError: When an invalid port number is provided or the link does not match the connection.
"""
port = self.switch_ports.get(port_number)
if port is None:
msg = f"Invalid port number {port_number} on the switch"
_LOGGER.error(msg)
raise NetworkError(msg)
if port.connected_link != link:
msg = f"The link does not match the connection at port number {port_number}"
_LOGGER.error(msg)
raise NetworkError(msg)
port.disconnect_link()

View File

@@ -1,8 +1,9 @@
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.base import Switch, NIC
from primaite.simulator.network.hardware.base import NIC
from primaite.simulator.network.hardware.nodes.computer import Computer
from primaite.simulator.network.hardware.nodes.router import Router, ACLAction
from primaite.simulator.network.hardware.nodes.router import ACLAction, Router
from primaite.simulator.network.hardware.nodes.server import Server
from primaite.simulator.network.hardware.nodes.switch import Switch
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
@@ -42,36 +43,21 @@ def client_server_routed() -> Network:
# Client 1
client_1 = Computer(
hostname="client_1",
ip_address="192.168.2.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.2.1"
hostname="client_1", ip_address="192.168.2.2", subnet_mask="255.255.255.0", default_gateway="192.168.2.1"
)
client_1.power_on()
network.connect(endpoint_b=client_1.ethernet_port[1], endpoint_a=switch_2.switch_ports[1])
# Server 1
server_1 = Server(
hostname="server_1",
ip_address="192.168.1.2",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1"
hostname="server_1", ip_address="192.168.1.2", subnet_mask="255.255.255.0", default_gateway="192.168.1.1"
)
server_1.power_on()
network.connect(endpoint_b=server_1.ethernet_port[1], endpoint_a=switch_1.switch_ports[1])
router_1.acl.add_rule(
action=ACLAction.PERMIT,
src_port=Port.ARP,
dst_port=Port.ARP,
position=22
)
router_1.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.ARP, dst_port=Port.ARP, position=22)
router_1.acl.add_rule(
action=ACLAction.PERMIT,
protocol=IPProtocol.ICMP,
position=23
)
router_1.acl.add_rule(action=ACLAction.PERMIT, protocol=IPProtocol.ICMP, position=23)
return network
@@ -135,20 +121,14 @@ def arcd_uc2_network() -> Network:
# Client 1
client_1 = Computer(
hostname="client_1",
ip_address="192.168.10.21",
subnet_mask="255.255.255.0",
default_gateway="192.168.10.1"
hostname="client_1", ip_address="192.168.10.21", subnet_mask="255.255.255.0", default_gateway="192.168.10.1"
)
client_1.power_on()
network.connect(endpoint_b=client_1.ethernet_port[1], endpoint_a=switch_2.switch_ports[1])
# Client 2
client_2 = Computer(
hostname="client_2",
ip_address="192.168.10.22",
subnet_mask="255.255.255.0",
default_gateway="192.168.10.1"
hostname="client_2", ip_address="192.168.10.22", subnet_mask="255.255.255.0", default_gateway="192.168.10.1"
)
client_2.power_on()
network.connect(endpoint_b=client_2.ethernet_port[1], endpoint_a=switch_2.switch_ports[2])
@@ -158,17 +138,14 @@ def arcd_uc2_network() -> Network:
hostname="domain_controller",
ip_address="192.168.1.10",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1"
default_gateway="192.168.1.1",
)
domain_controller.power_on()
network.connect(endpoint_b=domain_controller.ethernet_port[1], endpoint_a=switch_1.switch_ports[1])
# Web Server
web_server = Server(
hostname="web_server",
ip_address="192.168.1.12",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1"
hostname="web_server", ip_address="192.168.1.12", subnet_mask="255.255.255.0", default_gateway="192.168.1.1"
)
web_server.power_on()
network.connect(endpoint_b=web_server.ethernet_port[1], endpoint_a=switch_1.switch_ports[2])
@@ -178,17 +155,14 @@ def arcd_uc2_network() -> Network:
hostname="database_server",
ip_address="192.168.1.14",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1"
default_gateway="192.168.1.1",
)
database_server.power_on()
network.connect(endpoint_b=database_server.ethernet_port[1], endpoint_a=switch_1.switch_ports[3])
# Backup Server
backup_server = Server(
hostname="backup_server",
ip_address="192.168.1.16",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1"
hostname="backup_server", ip_address="192.168.1.16", subnet_mask="255.255.255.0", default_gateway="192.168.1.1"
)
backup_server.power_on()
network.connect(endpoint_b=backup_server.ethernet_port[1], endpoint_a=switch_1.switch_ports[4])
@@ -198,24 +172,15 @@ def arcd_uc2_network() -> Network:
hostname="security_suite",
ip_address="192.168.1.110",
subnet_mask="255.255.255.0",
default_gateway="192.168.1.1"
default_gateway="192.168.1.1",
)
security_suite.power_on()
network.connect(endpoint_b=security_suite.ethernet_port[1], endpoint_a=switch_1.switch_ports[7])
security_suite.connect_nic(NIC(ip_address="192.168.10.110", subnet_mask="255.255.255.0"))
network.connect(endpoint_b=security_suite.ethernet_port[2], endpoint_a=switch_2.switch_ports[7])
router_1.acl.add_rule(
action=ACLAction.PERMIT,
src_port=Port.ARP,
dst_port=Port.ARP,
position=22
)
router_1.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.ARP, dst_port=Port.ARP, position=22)
router_1.acl.add_rule(
action=ACLAction.PERMIT,
protocol=IPProtocol.ICMP,
position=23
)
router_1.acl.add_rule(action=ACLAction.PERMIT, protocol=IPProtocol.ICMP, position=23)
return network