#1706 - Finished up the Node and Switch MVP. Added full extensive documentation on what's happening at each step.

This commit is contained in:
Chris McCarthy
2023-08-08 20:22:18 +01:00
parent 4e4c2b501a
commit 9fbc3c91f7
10 changed files with 702 additions and 91 deletions

View File

@@ -6,6 +6,8 @@ from enum import Enum
from ipaddress import IPv4Address, IPv4Network
from typing import Any, Dict, List, Optional, Tuple, Union
from prettytable import PrettyTable
from primaite import getLogger
from primaite.exceptions import NetworkError
from primaite.simulator.core import SimComponent
@@ -136,22 +138,23 @@ class NIC(SimComponent):
if self.connected_node:
if self.connected_node.operating_state == NodeOperatingState.ON:
self.enabled = True
_LOGGER.info(f"NIC {self} enabled")
self.connected_node.sys_log.info(f"NIC {self} enabled")
self.pcap = PacketCapture(hostname=self.connected_node.hostname, ip_address=self.ip_address)
if self.connected_link:
self.connected_link.endpoint_up()
else:
_LOGGER.info(f"NIC {self} cannot be enabled as the endpoint is not turned on")
self.connected_node.sys_log.error(f"NIC {self} cannot be enabled as the endpoint is not turned on")
else:
msg = f"NIC {self} cannot be enabled as it is not connected to a Node"
_LOGGER.error(msg)
raise NetworkError(msg)
_LOGGER.error(f"NIC {self} cannot be enabled as it is not connected to a Node")
def disable(self):
"""Disable the NIC."""
if self.enabled:
self.enabled = False
_LOGGER.info(f"NIC {self} disabled")
if self.connected_node:
self.connected_node.sys_log.info(f"NIC {self} disabled")
else:
_LOGGER.info(f"NIC {self} disabled")
if self.connected_link:
self.connected_link.endpoint_down()
@@ -161,7 +164,6 @@ class NIC(SimComponent):
:param link: The link to which the NIC is connected.
:type link: :class:`~primaite.simulator.network.transmission.physical_layer.Link`
:raise NetworkError: When an attempt to connect a Link is made while the NIC has a connected Link.
"""
if not self.connected_link:
if self.connected_link != link:
@@ -169,11 +171,9 @@ class NIC(SimComponent):
self.connected_link = link
_LOGGER.info(f"NIC {self} connected to Link {link}")
else:
_LOGGER.warning(f"Cannot connect link to NIC ({self.mac_address}) as it is already connected")
_LOGGER.error(f"Cannot connect Link to NIC ({self.mac_address}) as it is already connected")
else:
msg = f"Cannot connect link to NIC ({self.mac_address}) as it already has a connection"
_LOGGER.error(msg)
raise NetworkError(msg)
_LOGGER.error(f"Cannot connect Link to NIC ({self.mac_address}) as it already has a connection")
def disconnect_link(self):
"""Disconnect the NIC from the connected Link."""
@@ -293,12 +293,14 @@ class SwitchPort(SimComponent):
if self.connected_node:
if self.connected_node.operating_state == NodeOperatingState.ON:
self.enabled = True
_LOGGER.info(f"SwitchPort {self} enabled")
self.connected_node.sys_log.info(f"SwitchPort {self} enabled")
self.pcap = PacketCapture(hostname=self.connected_node.hostname)
if self.connected_link:
self.connected_link.endpoint_up()
else:
_LOGGER.info(f"SwitchPort {self} cannot be enabled as the endpoint is not turned on")
self.connected_node.sys_log.info(
f"SwitchPort {self} cannot be enabled as the endpoint is not turned on"
)
else:
msg = f"SwitchPort {self} cannot be enabled as it is not connected to a Node"
_LOGGER.error(msg)
@@ -308,7 +310,10 @@ class SwitchPort(SimComponent):
"""Disable the SwitchPort."""
if self.enabled:
self.enabled = False
_LOGGER.info(f"SwitchPort {self} disabled")
if self.connected_node:
self.connected_node.sys_log.info(f"SwitchPort {self} disabled")
else:
_LOGGER.info(f"SwitchPort {self} disabled")
if self.connected_link:
self.connected_link.endpoint_down()
@@ -317,7 +322,6 @@ class SwitchPort(SimComponent):
Connect the SwitchPort to a link.
:param link: The link to which the SwitchPort is connected.
:raise NetworkError: When an attempt to connect a Link is made while the SwitchPort has a connected Link.
"""
if not self.connected_link:
if self.connected_link != link:
@@ -326,11 +330,9 @@ class SwitchPort(SimComponent):
_LOGGER.info(f"SwitchPort {self} connected to Link {link}")
self.enable()
else:
_LOGGER.warning(f"Cannot connect link to SwitchPort ({self.mac_address}) as it is already connected")
_LOGGER.error(f"Cannot connect Link to SwitchPort {self.mac_address} as it is already connected")
else:
msg = f"Cannot connect link to SwitchPort ({self.mac_address}) as it already has a connection"
_LOGGER.error(msg)
raise NetworkError(msg)
_LOGGER.error(f"Cannot connect link to SwitchPort {self.mac_address} as it already has a connection")
def disconnect_link(self):
"""Disconnect the SwitchPort from the connected Link."""
@@ -815,16 +817,34 @@ class Node(SimComponent):
super().__init__(**kwargs)
self.arp.nics = self.nics
def turn_on(self):
"""Turn on the Node, enabling its NICs if it is in the OFF state."""
def show(self):
"""Prints a table of the NICs on the Node.."""
from prettytable import PrettyTable
table = PrettyTable(["MAC Address", "Address", "Default Gateway", "Speed", "Status"])
for nic in self.nics.values():
table.add_row(
[
nic.mac_address,
f"{nic.ip_address}/{nic.ip_network.prefixlen}",
nic.gateway,
nic.speed,
"Enabled" if nic.enabled else "Disabled",
]
)
print(table)
def power_on(self):
"""Power on the Node, enabling its NICs if it is in the OFF state."""
if self.operating_state == NodeOperatingState.OFF:
self.operating_state = NodeOperatingState.ON
self.sys_log.info("Turned on")
for nic in self.nics.values():
nic.enable()
def turn_off(self):
"""Turn off the Node, disabling its NICs if it is in the ON state."""
def power_off(self):
"""Power off the Node, disabling its NICs if it is in the ON state."""
if self.operating_state == NodeOperatingState.ON:
for nic in self.nics.values():
nic.disable()
@@ -934,6 +954,14 @@ class Switch(Node):
dst_mac_table: Dict[str, SwitchPort] = {}
"A MAC address table mapping destination MAC addresses to corresponding SwitchPorts."
def show(self):
"""Prints a table of the SwitchPorts on the Switch."""
table = PrettyTable(["Port", "MAC Address", "Speed", "Status"])
for port_num, port in self.switch_ports.items():
table.add_row([port_num, port.mac_address, port.speed, "Enabled" if port.enabled else "Disabled"])
print(table)
def describe_state(self) -> Dict:
"""TODO."""
pass