Underscore 'parent' refs to make pydantic happy.

Rename attributes like connected_link and connected_node to start with
an underscore.

This will prevent circular dependency and stack recursion depth error.
This commit is contained in:
Marek Wolan
2023-09-19 11:28:13 +01:00
parent a719389e05
commit 610517d817
3 changed files with 55 additions and 55 deletions

View File

@@ -89,9 +89,9 @@ class NIC(SimComponent):
"The Maximum Transmission Unit (MTU) of the NIC in Bytes. Default is 1500 B"
wake_on_lan: bool = False
"Indicates if the NIC supports Wake-on-LAN functionality."
connected_node: Optional[Node] = None
_connected_node: Optional[Node] = None
"The Node to which the NIC is connected."
connected_link: Optional[Link] = None
_connected_link: Optional[Link] = None
"The Link to which the NIC is connected."
enabled: bool = False
"Indicates whether the NIC is enabled."
@@ -166,21 +166,21 @@ class NIC(SimComponent):
"""Attempt to enable the NIC."""
if self.enabled:
return
if not self.connected_node:
if not self._connected_node:
_LOGGER.error(f"NIC {self} cannot be enabled as it is not connected to a Node")
return
if self.connected_node.operating_state != NodeOperatingState.ON:
self.connected_node.sys_log.error(f"NIC {self} cannot be enabled as the endpoint is not turned on")
if self._connected_node.operating_state != NodeOperatingState.ON:
self._connected_node.sys_log.error(f"NIC {self} cannot be enabled as the endpoint is not turned on")
return
if not self.connected_link:
if not self._connected_link:
_LOGGER.error(f"NIC {self} cannot be enabled as it is not connected to a Link")
return
self.enabled = True
self.connected_node.sys_log.info(f"NIC {self} enabled")
self.pcap = PacketCapture(hostname=self.connected_node.hostname, ip_address=self.ip_address)
if self.connected_link:
self.connected_link.endpoint_up()
self._connected_node.sys_log.info(f"NIC {self} enabled")
self.pcap = PacketCapture(hostname=self._connected_node.hostname, ip_address=self.ip_address)
if self._connected_link:
self._connected_link.endpoint_up()
def disable(self):
"""Disable the NIC."""
@@ -188,12 +188,12 @@ class NIC(SimComponent):
return
self.enabled = False
if self.connected_node:
self.connected_node.sys_log.info(f"NIC {self} disabled")
if self._connected_node:
self._connected_node.sys_log.info(f"NIC {self} disabled")
else:
_LOGGER.debug(f"NIC {self} disabled")
if self.connected_link:
self.connected_link.endpoint_down()
if self._connected_link:
self._connected_link.endpoint_down()
def connect_link(self, link: Link):
"""
@@ -202,26 +202,26 @@ class NIC(SimComponent):
:param link: The link to which the NIC is connected.
:type link: :class:`~primaite.simulator.network.transmission.physical_layer.Link`
"""
if self.connected_link:
if self._connected_link:
_LOGGER.error(f"Cannot connect Link to NIC ({self.mac_address}) as it already has a connection")
return
if self.connected_link == link:
if self._connected_link == link:
_LOGGER.error(f"Cannot connect Link to NIC ({self.mac_address}) as it is already connected")
return
# TODO: Inform the Node that a link has been connected
self.connected_link = link
self._connected_link = link
self.enable()
_LOGGER.debug(f"NIC {self} connected to Link {link}")
def disconnect_link(self):
"""Disconnect the NIC from the connected Link."""
if self.connected_link.endpoint_a == self:
self.connected_link.endpoint_a = None
if self.connected_link.endpoint_b == self:
self.connected_link.endpoint_b = None
self.connected_link = None
if self._connected_link.endpoint_a == self:
self._connected_link.endpoint_a = None
if self._connected_link.endpoint_b == self:
self._connected_link.endpoint_b = None
self._connected_link = None
def add_dns_server(self, ip_address: IPv4Address):
"""
@@ -251,7 +251,7 @@ class NIC(SimComponent):
if self.enabled:
frame.set_sent_timestamp()
self.pcap.capture(frame)
self.connected_link.transmit_frame(sender_nic=self, frame=frame)
self._connected_link.transmit_frame(sender_nic=self, frame=frame)
return True
# Cannot send Frame as the NIC is not enabled
return False
@@ -270,7 +270,7 @@ class NIC(SimComponent):
self.pcap.capture(frame)
# If this destination or is broadcast
if frame.ethernet.dst_mac_addr == self.mac_address or frame.ethernet.dst_mac_addr == "ff:ff:ff:ff:ff:ff":
self.connected_node.receive_frame(frame=frame, from_nic=self)
self._connected_node.receive_frame(frame=frame, from_nic=self)
return True
return False
@@ -295,9 +295,9 @@ class SwitchPort(SimComponent):
"The speed of the SwitchPort in Mbps. Default is 100 Mbps."
mtu: int = 1500
"The Maximum Transmission Unit (MTU) of the SwitchPort in Bytes. Default is 1500 B"
connected_node: Optional[Node] = None
_connected_node: Optional[Node] = None
"The Node to which the SwitchPort is connected."
connected_link: Optional[Link] = None
_connected_link: Optional[Link] = None
"The Link to which the SwitchPort is connected."
enabled: bool = False
"Indicates whether the SwitchPort is enabled."
@@ -334,31 +334,31 @@ class SwitchPort(SimComponent):
if self.enabled:
return
if not self.connected_node:
if not self._connected_node:
_LOGGER.error(f"SwitchPort {self} cannot be enabled as it is not connected to a Node")
return
if self.connected_node.operating_state != NodeOperatingState.ON:
self.connected_node.sys_log.info(f"SwitchPort {self} cannot be enabled as the endpoint is not turned on")
if self._connected_node.operating_state != NodeOperatingState.ON:
self._connected_node.sys_log.info(f"SwitchPort {self} cannot be enabled as the endpoint is not turned on")
return
self.enabled = True
self.connected_node.sys_log.info(f"SwitchPort {self} enabled")
self.pcap = PacketCapture(hostname=self.connected_node.hostname, switch_port_number=self.port_num)
if self.connected_link:
self.connected_link.endpoint_up()
self._connected_node.sys_log.info(f"SwitchPort {self} enabled")
self.pcap = PacketCapture(hostname=self._connected_node.hostname, switch_port_number=self.port_num)
if self._connected_link:
self._connected_link.endpoint_up()
def disable(self):
"""Disable the SwitchPort."""
if not self.enabled:
return
self.enabled = False
if self.connected_node:
self.connected_node.sys_log.info(f"SwitchPort {self} disabled")
if self._connected_node:
self._connected_node.sys_log.info(f"SwitchPort {self} disabled")
else:
_LOGGER.debug(f"SwitchPort {self} disabled")
if self.connected_link:
self.connected_link.endpoint_down()
if self._connected_link:
self._connected_link.endpoint_down()
def connect_link(self, link: Link):
"""
@@ -366,26 +366,26 @@ class SwitchPort(SimComponent):
:param link: The link to which the SwitchPort is connected.
"""
if self.connected_link:
if self._connected_link:
_LOGGER.error(f"Cannot connect link to SwitchPort {self.mac_address} as it already has a connection")
return
if self.connected_link == link:
if self._connected_link == link:
_LOGGER.error(f"Cannot connect Link to SwitchPort {self.mac_address} as it is already connected")
return
# TODO: Inform the Switch that a link has been connected
self.connected_link = link
self._connected_link = link
_LOGGER.debug(f"SwitchPort {self} connected to Link {link}")
self.enable()
def disconnect_link(self):
"""Disconnect the SwitchPort from the connected Link."""
if self.connected_link.endpoint_a == self:
self.connected_link.endpoint_a = None
if self.connected_link.endpoint_b == self:
self.connected_link.endpoint_b = None
self.connected_link = None
if self._connected_link.endpoint_a == self:
self._connected_link.endpoint_a = None
if self._connected_link.endpoint_b == self:
self._connected_link.endpoint_b = None
self._connected_link = None
def send_frame(self, frame: Frame) -> bool:
"""
@@ -395,7 +395,7 @@ class SwitchPort(SimComponent):
"""
if self.enabled:
self.pcap.capture(frame)
self.connected_link.transmit_frame(sender_nic=self, frame=frame)
self._connected_link.transmit_frame(sender_nic=self, frame=frame)
return True
# Cannot send Frame as the SwitchPort is not enabled
return False
@@ -411,7 +411,7 @@ class SwitchPort(SimComponent):
if self.enabled:
frame.decrement_ttl()
self.pcap.capture(frame)
connected_node: Node = self.connected_node
connected_node: Node = self._connected_node
connected_node.forward_frame(frame=frame, incoming_port=self)
return True
return False
@@ -1037,7 +1037,7 @@ class Node(SimComponent):
self.operating_state = NodeOperatingState.ON
self.sys_log.info("Turned on")
for nic in self.nics.values():
if nic.connected_link:
if nic._connected_link:
nic.enable()
def power_off(self):
@@ -1058,7 +1058,7 @@ class Node(SimComponent):
if nic.uuid not in self.nics:
self.nics[nic.uuid] = nic
self.ethernet_port[len(self.nics)] = nic
nic.connected_node = self
nic._connected_node = self
nic.parent = self
self.sys_log.info(f"Connected NIC {nic}")
if self.operating_state == NodeOperatingState.ON:
@@ -1225,7 +1225,7 @@ class Switch(Node):
if not self.switch_ports:
self.switch_ports = {i: SwitchPort() for i in range(1, self.num_ports + 1)}
for port_num, port in self.switch_ports.items():
port.connected_node = self
port._connected_node = self
port.parent = self
port.port_num = port_num
@@ -1298,7 +1298,7 @@ class Switch(Node):
_LOGGER.error(msg)
raise NetworkError(msg)
if port.connected_link != link:
if port._connected_link != link:
msg = f"The link does not match the connection at port number {port_number}"
_LOGGER.error(msg)
raise NetworkError(msg)

View File

@@ -30,7 +30,7 @@ class Switch(Node):
if not self.switch_ports:
self.switch_ports = {i: SwitchPort() for i in range(1, self.num_ports + 1)}
for port_num, port in self.switch_ports.items():
port.connected_node = self
port._connected_node = self
port.parent = self
port.port_num = port_num
@@ -113,7 +113,7 @@ class Switch(Node):
_LOGGER.error(msg)
raise NetworkError(msg)
if port.connected_link != link:
if port._connected_link != link:
msg = f"The link does not match the connection at port number {port_number}"
_LOGGER.error(msg)
raise NetworkError(msg)

View File

@@ -70,8 +70,8 @@ def test_connecting_node_to_itself():
net.connect(node.nics[nic1.uuid], node.nics[nic2.uuid], bandwidth=30)
assert node in net
assert nic1.connected_link is None
assert nic2.connected_link is None
assert nic1._connected_link is None
assert nic2._connected_link is None
assert len(net.links) == 0