#2248 - Big refactor of base with all Network Interface subclasses created to allow for proper management of ports on devices as it was starting to get messy with the Router. Some routing tests still need fixing as ARP doesn't seem to be working properly
This commit is contained in:
@@ -659,7 +659,7 @@ simulation:
|
||||
subnet_mask: 255.255.255.0
|
||||
default_gateway: 192.168.1.1
|
||||
dns_server: 192.168.1.10
|
||||
nics:
|
||||
network_interfaces:
|
||||
2: # unfortunately this number is currently meaningless, they're just added in order and take up the next available slot
|
||||
ip_address: 192.168.10.110
|
||||
subnet_mask: 255.255.255.0
|
||||
|
||||
@@ -1070,7 +1070,7 @@ simulation:
|
||||
subnet_mask: 255.255.255.0
|
||||
default_gateway: 192.168.1.1
|
||||
dns_server: 192.168.1.10
|
||||
nics:
|
||||
network_interfaces:
|
||||
2: # unfortunately this number is currently meaningless, they're just added in order and take up the next available slot
|
||||
ip_address: 192.168.10.110
|
||||
subnet_mask: 255.255.255.0
|
||||
|
||||
@@ -555,7 +555,7 @@ class NetworkNICAbstractAction(AbstractAction):
|
||||
"network",
|
||||
"node",
|
||||
node_uuid,
|
||||
"nic",
|
||||
"network_interface",
|
||||
nic_uuid,
|
||||
self.verb,
|
||||
]
|
||||
@@ -672,8 +672,8 @@ class ActionManager:
|
||||
self.ip_address_list = []
|
||||
for node_uuid in self.node_uuids:
|
||||
node_obj = self.game.simulation.network.nodes[node_uuid]
|
||||
nics = node_obj.nics
|
||||
for nic_uuid, nic_obj in nics.items():
|
||||
network_interfaces = node_obj.network_interfaces
|
||||
for nic_uuid, nic_obj in network_interfaces.items():
|
||||
self.ip_address_list.append(nic_obj.ip_address)
|
||||
|
||||
# action_args are settings which are applied to the action space as a whole.
|
||||
@@ -898,10 +898,10 @@ class ActionManager:
|
||||
"""
|
||||
node_uuid = self.get_node_uuid_by_idx(node_idx)
|
||||
node_obj = self.game.simulation.network.nodes[node_uuid]
|
||||
nics = list(node_obj.nics.keys())
|
||||
if len(nics) <= nic_idx:
|
||||
network_interfaces = list(node_obj.network_interfaces.keys())
|
||||
if len(network_interfaces) <= nic_idx:
|
||||
return None
|
||||
return nics[nic_idx]
|
||||
return network_interfaces[nic_idx]
|
||||
|
||||
@classmethod
|
||||
def from_config(cls, game: "PrimaiteGame", cfg: Dict) -> "ActionManager":
|
||||
@@ -936,7 +936,7 @@ class ActionManager:
|
||||
node_ref = entry["node_ref"]
|
||||
nic_num = entry["nic_num"]
|
||||
node_obj = game.simulation.network.get_node_by_hostname(node_ref)
|
||||
ip_address = node_obj.ethernet_port[nic_num].ip_address
|
||||
ip_address = node_obj.network_interface[nic_num].ip_address
|
||||
ip_address_list.append(ip_address)
|
||||
|
||||
obj = cls(
|
||||
|
||||
@@ -406,7 +406,7 @@ class NodeObservation(AbstractObservation):
|
||||
where: Optional[Tuple[str]] = None,
|
||||
services: List[ServiceObservation] = [],
|
||||
folders: List[FolderObservation] = [],
|
||||
nics: List[NicObservation] = [],
|
||||
network_interfaces: List[NicObservation] = [],
|
||||
logon_status: bool = False,
|
||||
num_services_per_node: int = 2,
|
||||
num_folders_per_node: int = 2,
|
||||
@@ -429,9 +429,9 @@ class NodeObservation(AbstractObservation):
|
||||
:type folders: Dict[int,str], optional
|
||||
:param max_folders: Max number of folders in this node's obs space, defaults to 2
|
||||
:type max_folders: int, optional
|
||||
:param nics: Mapping between position in observation space and NIC idx, defaults to {}
|
||||
:type nics: Dict[int,str], optional
|
||||
:param max_nics: Max number of NICS in this node's obs space, defaults to 5
|
||||
:param network_interfaces: Mapping between position in observation space and NIC idx, defaults to {}
|
||||
:type network_interfaces: Dict[int,str], optional
|
||||
:param max_nics: Max number of network interfaces in this node's obs space, defaults to 5
|
||||
:type max_nics: int, optional
|
||||
"""
|
||||
super().__init__()
|
||||
@@ -456,11 +456,11 @@ class NodeObservation(AbstractObservation):
|
||||
msg = f"Too many folders in Node observation for node. Truncating service {truncated_folder.where[-1]}"
|
||||
_LOGGER.warning(msg)
|
||||
|
||||
self.nics: List[NicObservation] = nics
|
||||
while len(self.nics) < num_nics_per_node:
|
||||
self.nics.append(NicObservation())
|
||||
while len(self.nics) > num_nics_per_node:
|
||||
truncated_nic = self.nics.pop()
|
||||
self.network_interfaces: List[NicObservation] = network_interfaces
|
||||
while len(self.network_interfaces) < num_nics_per_node:
|
||||
self.network_interfaces.append(NicObservation())
|
||||
while len(self.network_interfaces) > num_nics_per_node:
|
||||
truncated_nic = self.network_interfaces.pop()
|
||||
msg = f"Too many NICs in Node observation for node. Truncating service {truncated_nic.where[-1]}"
|
||||
_LOGGER.warning(msg)
|
||||
|
||||
@@ -469,7 +469,7 @@ class NodeObservation(AbstractObservation):
|
||||
self.default_observation: Dict = {
|
||||
"SERVICES": {i + 1: s.default_observation for i, s in enumerate(self.services)},
|
||||
"FOLDERS": {i + 1: f.default_observation for i, f in enumerate(self.folders)},
|
||||
"NICS": {i + 1: n.default_observation for i, n in enumerate(self.nics)},
|
||||
"NETWORK_INTERFACES": {i + 1: n.default_observation for i, n in enumerate(self.network_interfaces)},
|
||||
"operating_status": 0,
|
||||
}
|
||||
if self.logon_status:
|
||||
@@ -494,7 +494,7 @@ class NodeObservation(AbstractObservation):
|
||||
obs["SERVICES"] = {i + 1: service.observe(state) for i, service in enumerate(self.services)}
|
||||
obs["FOLDERS"] = {i + 1: folder.observe(state) for i, folder in enumerate(self.folders)}
|
||||
obs["operating_status"] = node_state["operating_state"]
|
||||
obs["NICS"] = {i + 1: nic.observe(state) for i, nic in enumerate(self.nics)}
|
||||
obs["NETWORK_INTERFACES"] = {i + 1: network_interface.observe(state) for i, network_interface in enumerate(self.network_interfaces)}
|
||||
|
||||
if self.logon_status:
|
||||
obs["logon_status"] = 0
|
||||
@@ -508,7 +508,7 @@ class NodeObservation(AbstractObservation):
|
||||
"SERVICES": spaces.Dict({i + 1: service.space for i, service in enumerate(self.services)}),
|
||||
"FOLDERS": spaces.Dict({i + 1: folder.space for i, folder in enumerate(self.folders)}),
|
||||
"operating_status": spaces.Discrete(5),
|
||||
"NICS": spaces.Dict({i + 1: nic.space for i, nic in enumerate(self.nics)}),
|
||||
"NETWORK_INTERFACES": spaces.Dict({i + 1: network_interface.space for i, network_interface in enumerate(self.network_interfaces)}),
|
||||
}
|
||||
if self.logon_status:
|
||||
space_shape["logon_status"] = spaces.Discrete(3)
|
||||
@@ -564,13 +564,13 @@ class NodeObservation(AbstractObservation):
|
||||
]
|
||||
# create some configs for the NIC observation in the format {"nic_num":1}, {"nic_num":2}, {"nic_num":3}, etc.
|
||||
nic_configs = [{"nic_num": i for i in range(num_nics_per_node)}]
|
||||
nics = [NicObservation.from_config(config=c, game=game, parent_where=where) for c in nic_configs]
|
||||
network_interfaces = [NicObservation.from_config(config=c, game=game, parent_where=where) for c in nic_configs]
|
||||
logon_status = config.get("logon_status", False)
|
||||
return cls(
|
||||
where=where,
|
||||
services=services,
|
||||
folders=folders,
|
||||
nics=nics,
|
||||
network_interfaces=network_interfaces,
|
||||
logon_status=logon_status,
|
||||
num_services_per_node=num_services_per_node,
|
||||
num_folders_per_node=num_folders_per_node,
|
||||
@@ -728,7 +728,7 @@ class AclObservation(AbstractObservation):
|
||||
node_ref = ip_map_config["node_ref"]
|
||||
nic_num = ip_map_config["nic_num"]
|
||||
node_obj = game.simulation.network.nodes[game.ref_map_nodes[node_ref]]
|
||||
nic_obj = node_obj.ethernet_port[nic_num]
|
||||
nic_obj = node_obj.network_interface[nic_num]
|
||||
node_ip_to_idx[nic_obj.ip_address] = ip_idx + 2
|
||||
|
||||
router_hostname = config["router_hostname"]
|
||||
|
||||
@@ -11,11 +11,12 @@ from primaite.game.agent.interface import AbstractAgent, AgentSettings, ProxyAge
|
||||
from primaite.game.agent.observations import ObservationManager
|
||||
from primaite.game.agent.rewards import RewardFunction
|
||||
from primaite.session.io import SessionIO, SessionIOSettings
|
||||
from primaite.simulator.network.hardware.base import NIC, NodeOperatingState
|
||||
from primaite.simulator.network.hardware.nodes.computer import Computer
|
||||
from primaite.simulator.network.hardware.nodes.router import Router
|
||||
from primaite.simulator.network.hardware.nodes.server import Server
|
||||
from primaite.simulator.network.hardware.nodes.switch import Switch
|
||||
from primaite.simulator.network.hardware.base import NodeOperatingState
|
||||
from primaite.simulator.network.hardware.nodes.host.computer import Computer
|
||||
from primaite.simulator.network.hardware.nodes.host.host_node import NIC
|
||||
from primaite.simulator.network.hardware.nodes.network.router import Router
|
||||
from primaite.simulator.network.hardware.nodes.host.server import Server
|
||||
from primaite.simulator.network.hardware.nodes.network.switch import Switch
|
||||
from primaite.simulator.sim_container import Simulation
|
||||
from primaite.simulator.system.applications.database_client import DatabaseClient
|
||||
from primaite.simulator.system.applications.red_applications.data_manipulation_bot import DataManipulationBot
|
||||
@@ -305,8 +306,8 @@ class PrimaiteGame:
|
||||
if "options" in application_cfg:
|
||||
opt = application_cfg["options"]
|
||||
new_application.target_url = opt.get("target_url")
|
||||
if "nics" in node_cfg:
|
||||
for nic_num, nic_cfg in node_cfg["nics"].items():
|
||||
if "network_interfaces" in node_cfg:
|
||||
for nic_num, nic_cfg in node_cfg["network_interfaces"].items():
|
||||
new_node.connect_nic(NIC(ip_address=nic_cfg["ip_address"], subnet_mask=nic_cfg["subnet_mask"]))
|
||||
|
||||
net.add_node(new_node)
|
||||
@@ -320,11 +321,11 @@ class PrimaiteGame:
|
||||
if isinstance(node_a, Switch):
|
||||
endpoint_a = node_a.switch_ports[link_cfg["endpoint_a_port"]]
|
||||
else:
|
||||
endpoint_a = node_a.ethernet_port[link_cfg["endpoint_a_port"]]
|
||||
endpoint_a = node_a.network_interface[link_cfg["endpoint_a_port"]]
|
||||
if isinstance(node_b, Switch):
|
||||
endpoint_b = node_b.switch_ports[link_cfg["endpoint_b_port"]]
|
||||
else:
|
||||
endpoint_b = node_b.ethernet_port[link_cfg["endpoint_b_port"]]
|
||||
endpoint_b = node_b.network_interface[link_cfg["endpoint_b_port"]]
|
||||
new_link = net.connect(endpoint_a=endpoint_a, endpoint_b=endpoint_b)
|
||||
game.ref_map_links[link_cfg["ref"]] = new_link.uuid
|
||||
|
||||
|
||||
@@ -126,7 +126,7 @@
|
||||
" - FILES\n",
|
||||
" - <file_id 1-1>\n",
|
||||
" - health_status\n",
|
||||
" - NICS\n",
|
||||
" - NETWORK_INTERFACES\n",
|
||||
" - <nic_id 1-2>\n",
|
||||
" - nic_status\n",
|
||||
" - operating_status\n",
|
||||
@@ -180,7 +180,7 @@
|
||||
"\n",
|
||||
"The ACL rules in the observation space appear in the same order that they do in the actual ACL. Though, only the first 10 rules are shown, there are default rules lower down that cannot be changed by the agent. The extra rules just allow the network to function normally, by allowing pings, ARP traffic, etc.\n",
|
||||
"\n",
|
||||
"Most nodes have only 1 nic, so the observation for those is placed at NIC index 1 in the observation space. Only the security suite has 2 NICs, the second NIC in the observation space is the one that connects the security suite with swtich_2.\n",
|
||||
"Most nodes have only 1 network_interface, so the observation for those is placed at NIC index 1 in the observation space. Only the security suite has 2 NICs, the second NIC in the observation space is the one that connects the security suite with swtich_2.\n",
|
||||
"\n",
|
||||
"The meaning of the services' operating_state is:\n",
|
||||
"|operating_state|label|\n",
|
||||
@@ -462,37 +462,37 @@
|
||||
" 10: {'PROTOCOLS': {'ALL': 1}}},\n",
|
||||
" 'NODES': {1: {'FOLDERS': {1: {'FILES': {1: {'health_status': 0}},\n",
|
||||
" 'health_status': 0}},\n",
|
||||
" 'NICS': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'NETWORK_INTERFACES': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'SERVICES': {1: {'health_status': 0, 'operating_status': 1}},\n",
|
||||
" 'operating_status': 1},\n",
|
||||
" 2: {'FOLDERS': {1: {'FILES': {1: {'health_status': 0}},\n",
|
||||
" 'health_status': 0}},\n",
|
||||
" 'NICS': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'NETWORK_INTERFACES': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'SERVICES': {1: {'health_status': 0, 'operating_status': 1}},\n",
|
||||
" 'operating_status': 1},\n",
|
||||
" 3: {'FOLDERS': {1: {'FILES': {1: {'health_status': 1}},\n",
|
||||
" 'health_status': 1}},\n",
|
||||
" 'NICS': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'NETWORK_INTERFACES': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'SERVICES': {1: {'health_status': 0, 'operating_status': 0}},\n",
|
||||
" 'operating_status': 1},\n",
|
||||
" 4: {'FOLDERS': {1: {'FILES': {1: {'health_status': 0}},\n",
|
||||
" 'health_status': 0}},\n",
|
||||
" 'NICS': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'NETWORK_INTERFACES': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'SERVICES': {1: {'health_status': 0, 'operating_status': 0}},\n",
|
||||
" 'operating_status': 1},\n",
|
||||
" 5: {'FOLDERS': {1: {'FILES': {1: {'health_status': 0}},\n",
|
||||
" 'health_status': 0}},\n",
|
||||
" 'NICS': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'NETWORK_INTERFACES': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'SERVICES': {1: {'health_status': 0, 'operating_status': 0}},\n",
|
||||
" 'operating_status': 1},\n",
|
||||
" 6: {'FOLDERS': {1: {'FILES': {1: {'health_status': 0}},\n",
|
||||
" 'health_status': 0}},\n",
|
||||
" 'NICS': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'NETWORK_INTERFACES': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'SERVICES': {1: {'health_status': 0, 'operating_status': 0}},\n",
|
||||
" 'operating_status': 1},\n",
|
||||
" 7: {'FOLDERS': {1: {'FILES': {1: {'health_status': 0}},\n",
|
||||
" 'health_status': 0}},\n",
|
||||
" 'NICS': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'NETWORK_INTERFACES': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'SERVICES': {1: {'health_status': 0, 'operating_status': 0}},\n",
|
||||
" 'operating_status': 1}}}\n"
|
||||
]
|
||||
@@ -588,31 +588,31 @@
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"{1: {'FOLDERS': {1: {'FILES': {1: {'health_status': 0}}, 'health_status': 0}},\n",
|
||||
" 'NICS': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'NETWORK_INTERFACES': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'SERVICES': {1: {'health_status': 0, 'operating_status': 1}},\n",
|
||||
" 'operating_status': 1},\n",
|
||||
" 2: {'FOLDERS': {1: {'FILES': {1: {'health_status': 0}}, 'health_status': 0}},\n",
|
||||
" 'NICS': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'NETWORK_INTERFACES': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'SERVICES': {1: {'health_status': 0, 'operating_status': 1}},\n",
|
||||
" 'operating_status': 1},\n",
|
||||
" 3: {'FOLDERS': {1: {'FILES': {1: {'health_status': 1}}, 'health_status': 1}},\n",
|
||||
" 'NICS': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'NETWORK_INTERFACES': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'SERVICES': {1: {'health_status': 0, 'operating_status': 0}},\n",
|
||||
" 'operating_status': 1},\n",
|
||||
" 4: {'FOLDERS': {1: {'FILES': {1: {'health_status': 0}}, 'health_status': 0}},\n",
|
||||
" 'NICS': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'NETWORK_INTERFACES': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'SERVICES': {1: {'health_status': 0, 'operating_status': 0}},\n",
|
||||
" 'operating_status': 1},\n",
|
||||
" 5: {'FOLDERS': {1: {'FILES': {1: {'health_status': 0}}, 'health_status': 0}},\n",
|
||||
" 'NICS': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'NETWORK_INTERFACES': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'SERVICES': {1: {'health_status': 0, 'operating_status': 0}},\n",
|
||||
" 'operating_status': 1},\n",
|
||||
" 6: {'FOLDERS': {1: {'FILES': {1: {'health_status': 0}}, 'health_status': 0}},\n",
|
||||
" 'NICS': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'NETWORK_INTERFACES': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'SERVICES': {1: {'health_status': 0, 'operating_status': 0}},\n",
|
||||
" 'operating_status': 1},\n",
|
||||
" 7: {'FOLDERS': {1: {'FILES': {1: {'health_status': 0}}, 'health_status': 0}},\n",
|
||||
" 'NICS': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'NETWORK_INTERFACES': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'SERVICES': {1: {'health_status': 0, 'operating_status': 0}},\n",
|
||||
" 'operating_status': 1}}\n"
|
||||
]
|
||||
@@ -639,31 +639,31 @@
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"{1: {'FOLDERS': {1: {'FILES': {1: {'health_status': 0}}, 'health_status': 0}},\n",
|
||||
" 'NICS': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'NETWORK_INTERFACES': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'SERVICES': {1: {'health_status': 0, 'operating_status': 1}},\n",
|
||||
" 'operating_status': 1},\n",
|
||||
" 2: {'FOLDERS': {1: {'FILES': {1: {'health_status': 0}}, 'health_status': 0}},\n",
|
||||
" 'NICS': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'NETWORK_INTERFACES': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'SERVICES': {1: {'health_status': 3, 'operating_status': 1}},\n",
|
||||
" 'operating_status': 1},\n",
|
||||
" 3: {'FOLDERS': {1: {'FILES': {1: {'health_status': 2}}, 'health_status': 1}},\n",
|
||||
" 'NICS': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'NETWORK_INTERFACES': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'SERVICES': {1: {'health_status': 0, 'operating_status': 0}},\n",
|
||||
" 'operating_status': 1},\n",
|
||||
" 4: {'FOLDERS': {1: {'FILES': {1: {'health_status': 0}}, 'health_status': 0}},\n",
|
||||
" 'NICS': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'NETWORK_INTERFACES': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'SERVICES': {1: {'health_status': 0, 'operating_status': 0}},\n",
|
||||
" 'operating_status': 1},\n",
|
||||
" 5: {'FOLDERS': {1: {'FILES': {1: {'health_status': 0}}, 'health_status': 0}},\n",
|
||||
" 'NICS': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'NETWORK_INTERFACES': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'SERVICES': {1: {'health_status': 0, 'operating_status': 0}},\n",
|
||||
" 'operating_status': 1},\n",
|
||||
" 6: {'FOLDERS': {1: {'FILES': {1: {'health_status': 0}}, 'health_status': 0}},\n",
|
||||
" 'NICS': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'NETWORK_INTERFACES': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'SERVICES': {1: {'health_status': 0, 'operating_status': 0}},\n",
|
||||
" 'operating_status': 1},\n",
|
||||
" 7: {'FOLDERS': {1: {'FILES': {1: {'health_status': 0}}, 'health_status': 0}},\n",
|
||||
" 'NICS': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'NETWORK_INTERFACES': {1: {'nic_status': 1}, 2: {'nic_status': 0}},\n",
|
||||
" 'SERVICES': {1: {'health_status': 0, 'operating_status': 0}},\n",
|
||||
" 'operating_status': 1}}\n"
|
||||
]
|
||||
|
||||
@@ -4,7 +4,7 @@ from abc import ABC, abstractmethod
|
||||
from typing import Callable, ClassVar, Dict, List, Optional, Union
|
||||
from uuid import uuid4
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
|
||||
from primaite import getLogger
|
||||
|
||||
@@ -150,14 +150,12 @@ class SimComponent(BaseModel):
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True, extra="allow")
|
||||
"""Configure pydantic to allow arbitrary types and to let the instance have attributes not present in model."""
|
||||
|
||||
uuid: str
|
||||
uuid: str = Field(default_factory=lambda: str(uuid4()))
|
||||
"""The component UUID."""
|
||||
|
||||
_original_state: Dict = {}
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
if not kwargs.get("uuid"):
|
||||
kwargs["uuid"] = str(uuid4())
|
||||
super().__init__(**kwargs)
|
||||
self._request_manager: RequestManager = self._init_request_manager()
|
||||
self._parent: Optional["SimComponent"] = None
|
||||
|
||||
@@ -7,11 +7,11 @@ from prettytable import MARKDOWN, PrettyTable
|
||||
|
||||
from primaite import getLogger
|
||||
from primaite.simulator.core import RequestManager, RequestType, SimComponent
|
||||
from primaite.simulator.network.hardware.base import Link, NIC, Node, SwitchPort
|
||||
from primaite.simulator.network.hardware.nodes.computer import Computer
|
||||
from primaite.simulator.network.hardware.nodes.router import Router
|
||||
from primaite.simulator.network.hardware.nodes.server import Server
|
||||
from primaite.simulator.network.hardware.nodes.switch import Switch
|
||||
from primaite.simulator.network.hardware.base import Link, Node, WiredNetworkInterface
|
||||
from primaite.simulator.network.hardware.nodes.host.computer import Computer
|
||||
from primaite.simulator.network.hardware.nodes.network.router import Router
|
||||
from primaite.simulator.network.hardware.nodes.host.server import Server
|
||||
from primaite.simulator.network.hardware.nodes.network.switch import Switch
|
||||
from primaite.simulator.system.applications.application import Application
|
||||
from primaite.simulator.system.services.service import Service
|
||||
|
||||
@@ -62,8 +62,8 @@ class Network(SimComponent):
|
||||
for node in self.nodes.values():
|
||||
node.power_on()
|
||||
|
||||
for nic in node.nics.values():
|
||||
nic.enable()
|
||||
for network_interface in node.network_interfaces.values():
|
||||
network_interface.enable()
|
||||
# Reset software
|
||||
for software in node.software_manager.software.values():
|
||||
if isinstance(software, Service):
|
||||
@@ -148,7 +148,7 @@ class Network(SimComponent):
|
||||
table.title = "IP Addresses"
|
||||
for nodes in nodes_type_map.values():
|
||||
for node in nodes:
|
||||
for i, port in node.ethernet_port.items():
|
||||
for i, port in node.network_interface.items():
|
||||
table.add_row([node.hostname, i, port.ip_address, port.subnet_mask, node.default_gateway])
|
||||
print(table)
|
||||
|
||||
@@ -209,8 +209,8 @@ class Network(SimComponent):
|
||||
node_b = link.endpoint_b._connected_node
|
||||
hostname_a = node_a.hostname if node_a else None
|
||||
hostname_b = node_b.hostname if node_b else None
|
||||
port_a = link.endpoint_a._port_num_on_node
|
||||
port_b = link.endpoint_b._port_num_on_node
|
||||
port_a = link.endpoint_a.port_num
|
||||
port_b = link.endpoint_b.port_num
|
||||
state["links"][uuid] = link.describe_state()
|
||||
state["links"][uuid]["hostname_a"] = hostname_a
|
||||
state["links"][uuid]["hostname_b"] = hostname_b
|
||||
@@ -272,7 +272,7 @@ class Network(SimComponent):
|
||||
self._node_request_manager.remove_request(name=node.uuid)
|
||||
|
||||
def connect(
|
||||
self, endpoint_a: Union[NIC, SwitchPort], endpoint_b: Union[NIC, SwitchPort], **kwargs
|
||||
self, endpoint_a: Union[WiredNetworkInterface], endpoint_b: Union[WiredNetworkInterface], **kwargs
|
||||
) -> Optional[Link]:
|
||||
"""
|
||||
Connect two endpoints on the network by creating a link between their NICs/SwitchPorts.
|
||||
@@ -280,9 +280,9 @@ class Network(SimComponent):
|
||||
.. note:: If the nodes owning the endpoints are not already in the network, they are automatically added.
|
||||
|
||||
:param endpoint_a: The first endpoint to connect.
|
||||
:type endpoint_a: Union[NIC, SwitchPort]
|
||||
:type endpoint_a: WiredNetworkInterface
|
||||
:param endpoint_b: The second endpoint to connect.
|
||||
:type endpoint_b: Union[NIC, SwitchPort]
|
||||
:type endpoint_b: WiredNetworkInterface
|
||||
:raises RuntimeError: If any validation or runtime checks fail.
|
||||
"""
|
||||
node_a: Node = endpoint_a.parent
|
||||
|
||||
@@ -2,9 +2,9 @@ from ipaddress import IPv4Address
|
||||
from typing import Optional
|
||||
|
||||
from primaite.simulator.network.container import Network
|
||||
from primaite.simulator.network.hardware.nodes.computer import Computer
|
||||
from primaite.simulator.network.hardware.nodes.router import ACLAction, Router
|
||||
from primaite.simulator.network.hardware.nodes.switch import Switch
|
||||
from primaite.simulator.network.hardware.nodes.host.computer import Computer
|
||||
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
|
||||
from primaite.simulator.network.hardware.nodes.network.switch import Switch
|
||||
from primaite.simulator.network.transmission.network_layer import IPProtocol
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
|
||||
@@ -111,7 +111,7 @@ def create_office_lan(
|
||||
if num_of_switches > 1:
|
||||
network.connect(core_switch.switch_ports[core_switch_port], switch.switch_ports[24])
|
||||
else:
|
||||
network.connect(router.ethernet_ports[1], switch.switch_ports[24])
|
||||
network.connect(router.network_interface[1], switch.switch_ports[24])
|
||||
|
||||
# Add PCs to the LAN and connect them to switches
|
||||
for i in range(1, num_pcs + 1):
|
||||
@@ -127,7 +127,7 @@ def create_office_lan(
|
||||
core_switch_port += 1
|
||||
network.connect(core_switch.switch_ports[core_switch_port], switch.switch_ports[24])
|
||||
else:
|
||||
network.connect(router.ethernet_ports[1], switch.switch_ports[24])
|
||||
network.connect(router.network_interface[1], switch.switch_ports[24])
|
||||
|
||||
# Create and add a PC to the network
|
||||
pc = Computer(
|
||||
@@ -142,7 +142,7 @@ def create_office_lan(
|
||||
|
||||
# Connect the PC to the switch
|
||||
switch_port += 1
|
||||
network.connect(switch.switch_ports[switch_port], pc.ethernet_port[1])
|
||||
network.connect(switch.switch_ports[switch_port], pc.network_interface[1])
|
||||
switch.switch_ports[switch_port].enable()
|
||||
|
||||
return network
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,9 @@
|
||||
from abc import ABC
|
||||
from ipaddress import IPv4Network
|
||||
from typing import Dict
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from primaite.utils.validators import IPV4Address
|
||||
|
||||
|
||||
@@ -0,0 +1,84 @@
|
||||
from typing import Dict
|
||||
|
||||
from primaite.simulator.network.hardware.base import WirelessNetworkInterface
|
||||
from primaite.simulator.network.hardware.network_interface.layer_3_interface import Layer3Interface
|
||||
|
||||
from primaite.simulator.network.transmission.data_link_layer import Frame
|
||||
|
||||
|
||||
class WirelessAccessPoint(WirelessNetworkInterface, Layer3Interface):
|
||||
"""
|
||||
Represents a Wireless Access Point (AP) in a network.
|
||||
|
||||
This class models a Wireless Access Point, a device that allows wireless devices to connect to a wired network
|
||||
using Wi-Fi or other wireless standards. The Wireless Access Point bridges the wireless and wired segments of
|
||||
the network, allowing wireless devices to communicate with other devices on the network.
|
||||
|
||||
As an integral component of wireless networking, a Wireless Access Point provides functionalities for network
|
||||
management, signal broadcasting, security enforcement, and connection handling. It also possesses Layer 3
|
||||
capabilities such as IP addressing and subnetting, allowing for network segmentation and routing.
|
||||
|
||||
Inherits from:
|
||||
- WirelessNetworkInterface: Provides basic properties and methods specific to wireless interfaces.
|
||||
- Layer3Interface: Provides Layer 3 properties like ip_address and subnet_mask, enabling the device to manage
|
||||
network traffic and routing.
|
||||
|
||||
This class can be further specialised or extended to support specific features or standards related to wireless
|
||||
networking, such as different Wi-Fi versions, frequency bands, or advanced security protocols.
|
||||
"""
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Produce a dictionary describing the current state of this object.
|
||||
|
||||
:return: Current state of this object and child objects.
|
||||
:rtype: Dict
|
||||
"""
|
||||
# Get the state from the WirelessNetworkInterface
|
||||
state = WirelessNetworkInterface.describe_state(self)
|
||||
|
||||
# Update the state with information from Layer3Interface
|
||||
state.update(Layer3Interface.describe_state(self))
|
||||
|
||||
# Update the state with NIC-specific information
|
||||
state.update(
|
||||
{
|
||||
"wake_on_lan": self.wake_on_lan,
|
||||
}
|
||||
)
|
||||
|
||||
return state
|
||||
|
||||
def enable(self):
|
||||
"""Enable the interface."""
|
||||
pass
|
||||
|
||||
def disable(self):
|
||||
"""Disable the interface."""
|
||||
pass
|
||||
|
||||
def send_frame(self, frame: Frame) -> bool:
|
||||
"""
|
||||
Attempts to send a network frame through the interface.
|
||||
|
||||
:param frame: The network frame to be sent.
|
||||
:return: A boolean indicating whether the frame was successfully sent.
|
||||
"""
|
||||
pass
|
||||
|
||||
def receive_frame(self, frame: Frame) -> bool:
|
||||
"""
|
||||
Receives a network frame on the interface.
|
||||
|
||||
:param frame: The network frame being received.
|
||||
:return: A boolean indicating whether the frame was successfully received.
|
||||
"""
|
||||
pass
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""
|
||||
String representation of the NIC.
|
||||
|
||||
:return: A string combining the port number, MAC address and IP address of the NIC.
|
||||
"""
|
||||
return f"Port {self.port_num}: {self.mac_address}/{self.ip_address}"
|
||||
@@ -0,0 +1,81 @@
|
||||
from typing import Dict
|
||||
|
||||
from primaite.simulator.network.hardware.base import WirelessNetworkInterface
|
||||
from primaite.simulator.network.hardware.network_interface.layer_3_interface import Layer3Interface
|
||||
|
||||
from primaite.simulator.network.transmission.data_link_layer import Frame
|
||||
|
||||
|
||||
class WirelessNIC(WirelessNetworkInterface, Layer3Interface):
|
||||
"""
|
||||
Represents a Wireless Network Interface Card (Wireless NIC) in a network device.
|
||||
|
||||
This class encapsulates the functionalities and attributes of a wireless NIC, combining the characteristics of a
|
||||
wireless network interface with Layer 3 features. It is capable of connecting to wireless networks, managing
|
||||
wireless-specific properties such as signal strength and security protocols, and also handling IP-related
|
||||
functionalities like IP addressing and subnetting.
|
||||
|
||||
Inherits from:
|
||||
- WirelessNetworkInterface: Provides basic properties and methods specific to wireless interfaces.
|
||||
- Layer3Interface: Provides Layer 3 properties like ip_address and subnet_mask, enabling the device to participate
|
||||
in IP-based networking.
|
||||
|
||||
This class can be extended to include more advanced features or to tailor its behavior for specific types of
|
||||
wireless networks or protocols.
|
||||
"""
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Produce a dictionary describing the current state of this object.
|
||||
|
||||
:return: Current state of this object and child objects.
|
||||
:rtype: Dict
|
||||
"""
|
||||
# Get the state from the WirelessNetworkInterface
|
||||
state = WirelessNetworkInterface.describe_state(self)
|
||||
|
||||
# Update the state with information from Layer3Interface
|
||||
state.update(Layer3Interface.describe_state(self))
|
||||
|
||||
# Update the state with NIC-specific information
|
||||
state.update(
|
||||
{
|
||||
"wake_on_lan": self.wake_on_lan,
|
||||
}
|
||||
)
|
||||
|
||||
return state
|
||||
|
||||
def enable(self):
|
||||
"""Enable the interface."""
|
||||
pass
|
||||
|
||||
def disable(self):
|
||||
"""Disable the interface."""
|
||||
pass
|
||||
|
||||
def send_frame(self, frame: Frame) -> bool:
|
||||
"""
|
||||
Attempts to send a network frame through the interface.
|
||||
|
||||
:param frame: The network frame to be sent.
|
||||
:return: A boolean indicating whether the frame was successfully sent.
|
||||
"""
|
||||
pass
|
||||
|
||||
def receive_frame(self, frame: Frame) -> bool:
|
||||
"""
|
||||
Receives a network frame on the interface.
|
||||
|
||||
:param frame: The network frame being received.
|
||||
:return: A boolean indicating whether the frame was successfully received.
|
||||
"""
|
||||
pass
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""
|
||||
String representation of the NIC.
|
||||
|
||||
:return: A string combining the port number, MAC address and IP address of the NIC.
|
||||
"""
|
||||
return f"Port {self.port_num}: {self.mac_address}/{self.ip_address}"
|
||||
@@ -1,63 +0,0 @@
|
||||
from primaite.simulator.network.hardware.base import NIC, Node
|
||||
from primaite.simulator.system.applications.web_browser import WebBrowser
|
||||
from primaite.simulator.system.services.arp.host_arp import HostARP
|
||||
from primaite.simulator.system.services.dns.dns_client import DNSClient
|
||||
from primaite.simulator.system.services.ftp.ftp_client import FTPClient
|
||||
from primaite.simulator.system.services.icmp.icmp import ICMP
|
||||
from primaite.simulator.system.services.ntp.ntp_client import NTPClient
|
||||
|
||||
|
||||
class Host(Node):
|
||||
"""
|
||||
A basic Host class.
|
||||
|
||||
Example:
|
||||
>>> pc_a = Host(
|
||||
hostname="pc_a",
|
||||
ip_address="192.168.1.10",
|
||||
subnet_mask="255.255.255.0",
|
||||
default_gateway="192.168.1.1"
|
||||
)
|
||||
>>> pc_a.power_on()
|
||||
|
||||
Instances of computer come 'pre-packaged' with the following:
|
||||
|
||||
* Core Functionality:
|
||||
* Packet Capture
|
||||
* Sys Log
|
||||
* Services:
|
||||
* ARP Service
|
||||
* ICMP Service
|
||||
* DNS Client
|
||||
* FTP Client
|
||||
* NTP Client
|
||||
* Applications:
|
||||
* Web Browser
|
||||
"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.connect_nic(NIC(ip_address=kwargs["ip_address"], subnet_mask=kwargs["subnet_mask"]))
|
||||
self._install_system_software()
|
||||
|
||||
def _install_system_software(self):
|
||||
"""Install System Software - software that is usually provided with the OS."""
|
||||
# ARP Service
|
||||
self.software_manager.install(HostARP)
|
||||
|
||||
# ICMP Service
|
||||
self.software_manager.install(ICMP)
|
||||
|
||||
# DNS Client
|
||||
self.software_manager.install(DNSClient)
|
||||
|
||||
# FTP Client
|
||||
self.software_manager.install(FTPClient)
|
||||
|
||||
# NTP Client
|
||||
self.software_manager.install(NTPClient)
|
||||
|
||||
# Web Browser
|
||||
self.software_manager.install(WebBrowser)
|
||||
|
||||
super()._install_system_software()
|
||||
@@ -1,11 +1,7 @@
|
||||
from primaite.simulator.network.hardware.base import NIC, Node
|
||||
from primaite.simulator.network.hardware.nodes.host import Host
|
||||
from primaite.simulator.system.applications.web_browser import WebBrowser
|
||||
from primaite.simulator.system.services.dns.dns_client import DNSClient
|
||||
from primaite.simulator.system.services.ftp.ftp_client import FTPClient
|
||||
from primaite.simulator.network.hardware.nodes.host.host_node import HostNode
|
||||
|
||||
|
||||
class Computer(Host):
|
||||
class Computer(HostNode):
|
||||
"""
|
||||
A basic Computer class.
|
||||
|
||||
354
src/primaite/simulator/network/hardware/nodes/host/host_node.py
Normal file
354
src/primaite/simulator/network/hardware/nodes/host/host_node.py
Normal file
@@ -0,0 +1,354 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Dict
|
||||
from typing import Optional
|
||||
|
||||
from primaite import getLogger
|
||||
from primaite.simulator.network.hardware.base import IPWiredNetworkInterface
|
||||
from primaite.simulator.network.hardware.base import Node
|
||||
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
|
||||
from primaite.simulator.network.transmission.data_link_layer import Frame
|
||||
from primaite.simulator.system.applications.web_browser import WebBrowser
|
||||
from primaite.simulator.system.core.packet_capture import PacketCapture
|
||||
from primaite.simulator.system.services.arp.arp import ARP, ARPPacket
|
||||
from primaite.simulator.system.services.dns.dns_client import DNSClient
|
||||
from primaite.simulator.system.services.ftp.ftp_client import FTPClient
|
||||
from primaite.simulator.system.services.icmp.icmp import ICMP
|
||||
from primaite.simulator.system.services.ntp.ntp_client import NTPClient
|
||||
from primaite.utils.validators import IPV4Address
|
||||
|
||||
_LOGGER = getLogger(__name__)
|
||||
|
||||
|
||||
# Lives here due to pydantic circular dependency issue :(
|
||||
class HostARP(ARP):
|
||||
"""
|
||||
The Host ARP Service.
|
||||
|
||||
Extends the ARP service with functionalities specific to a host within the network. It provides mechanisms to
|
||||
resolve and cache MAC addresses and NICs for given IP addresses, focusing on the host's perspective, including
|
||||
handling the default gateway.
|
||||
"""
|
||||
|
||||
def get_default_gateway_mac_address(self) -> Optional[str]:
|
||||
"""
|
||||
Retrieves the MAC address of the default gateway from the ARP cache.
|
||||
|
||||
:return: The MAC address of the default gateway if it exists in the ARP cache, otherwise None.
|
||||
"""
|
||||
if self.software_manager.node.default_gateway:
|
||||
return self.get_arp_cache_mac_address(self.software_manager.node.default_gateway)
|
||||
|
||||
def get_default_gateway_network_interface(self) -> Optional[NIC]:
|
||||
"""
|
||||
Retrieves the NIC associated with the default gateway from the ARP cache.
|
||||
|
||||
:return: The NIC associated with the default gateway if it exists in the ARP cache, otherwise None.
|
||||
"""
|
||||
if self.software_manager.node.default_gateway:
|
||||
return self.get_arp_cache_network_interface(self.software_manager.node.default_gateway)
|
||||
|
||||
def _get_arp_cache_mac_address(
|
||||
self, ip_address: IPV4Address, is_reattempt: bool = False, is_default_gateway_attempt: bool = False
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
Internal method to retrieve the MAC address associated with an IP address from the ARP cache.
|
||||
|
||||
:param ip_address: The IP address whose MAC address is to be retrieved.
|
||||
:param is_reattempt: Indicates if this call is a reattempt after a failed initial attempt.
|
||||
:param is_default_gateway_attempt: Indicates if this call is an attempt to get the default gateway's MAC address.
|
||||
:return: The MAC address associated with the IP address if found, otherwise None.
|
||||
"""
|
||||
arp_entry = self.arp.get(ip_address)
|
||||
|
||||
if arp_entry:
|
||||
return arp_entry.mac_address
|
||||
else:
|
||||
if not is_reattempt:
|
||||
self.send_arp_request(ip_address)
|
||||
return self._get_arp_cache_mac_address(
|
||||
ip_address=ip_address, is_reattempt=True, is_default_gateway_attempt=is_default_gateway_attempt
|
||||
)
|
||||
else:
|
||||
if self.software_manager.node.default_gateway:
|
||||
if not is_default_gateway_attempt:
|
||||
self.send_arp_request(self.software_manager.node.default_gateway)
|
||||
return self._get_arp_cache_mac_address(
|
||||
ip_address=self.software_manager.node.default_gateway, is_reattempt=True,
|
||||
is_default_gateway_attempt=True
|
||||
)
|
||||
return None
|
||||
|
||||
def get_arp_cache_mac_address(self, ip_address: IPV4Address) -> Optional[str]:
|
||||
"""
|
||||
Retrieves the MAC address associated with an IP address from the ARP cache.
|
||||
|
||||
:param ip_address: The IP address whose MAC address is to be retrieved.
|
||||
:return: The MAC address associated with the IP address if found, otherwise None.
|
||||
"""
|
||||
return self._get_arp_cache_mac_address(ip_address)
|
||||
|
||||
def _get_arp_cache_network_interface(
|
||||
self, ip_address: IPV4Address, is_reattempt: bool = False, is_default_gateway_attempt: bool = False
|
||||
) -> Optional[NIC]:
|
||||
"""
|
||||
Internal method to retrieve the NIC associated with an IP address from the ARP cache.
|
||||
|
||||
:param ip_address: The IP address whose NIC is to be retrieved.
|
||||
:param is_reattempt: Indicates if this call is a reattempt after a failed initial attempt.
|
||||
:param is_default_gateway_attempt: Indicates if this call is an attempt to get the NIC of the default gateway.
|
||||
:return: The NIC associated with the IP address if found, otherwise None.
|
||||
"""
|
||||
arp_entry = self.arp.get(ip_address)
|
||||
|
||||
if arp_entry:
|
||||
return self.software_manager.node.network_interfaces[arp_entry.network_interface_uuid]
|
||||
else:
|
||||
if not is_reattempt:
|
||||
self.send_arp_request(ip_address)
|
||||
return self._get_arp_cache_network_interface(
|
||||
ip_address=ip_address, is_reattempt=True, is_default_gateway_attempt=is_default_gateway_attempt
|
||||
)
|
||||
else:
|
||||
if self.software_manager.node.default_gateway:
|
||||
if not is_default_gateway_attempt:
|
||||
self.send_arp_request(self.software_manager.node.default_gateway)
|
||||
return self._get_arp_cache_network_interface(
|
||||
ip_address=self.software_manager.node.default_gateway, is_reattempt=True,
|
||||
is_default_gateway_attempt=True
|
||||
)
|
||||
return None
|
||||
|
||||
def get_arp_cache_network_interface(self, ip_address: IPV4Address) -> Optional[NIC]:
|
||||
"""
|
||||
Retrieves the NIC associated with an IP address from the ARP cache.
|
||||
|
||||
:param ip_address: The IP address whose NIC is to be retrieved.
|
||||
:return: The NIC associated with the IP address if found, otherwise None.
|
||||
"""
|
||||
return self._get_arp_cache_network_interface(ip_address)
|
||||
|
||||
def _process_arp_request(self, arp_packet: ARPPacket, from_network_interface: NIC):
|
||||
"""
|
||||
Processes an ARP request.
|
||||
|
||||
Adds a new entry to the ARP cache if the target IP address matches the NIC's IP address and sends an ARP
|
||||
reply back.
|
||||
|
||||
:param arp_packet: The ARP packet containing the request.
|
||||
:param from_network_interface: The NIC that received the ARP request.
|
||||
"""
|
||||
super()._process_arp_request(arp_packet, from_network_interface)
|
||||
# Unmatched ARP Request
|
||||
if arp_packet.target_ip_address != from_network_interface.ip_address:
|
||||
self.sys_log.info(
|
||||
f"Ignoring ARP request for {arp_packet.target_ip_address}. Current IP address is {from_network_interface.ip_address}"
|
||||
)
|
||||
return
|
||||
|
||||
# Matched ARP request
|
||||
self.add_arp_cache_entry(
|
||||
ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr,
|
||||
network_interface=from_network_interface
|
||||
)
|
||||
arp_packet = arp_packet.generate_reply(from_network_interface.mac_address)
|
||||
self.send_arp_reply(arp_packet)
|
||||
|
||||
|
||||
class NIC(IPWiredNetworkInterface):
|
||||
"""
|
||||
Represents a Network Interface Card (NIC) in a Host Node.
|
||||
|
||||
A NIC is a hardware component that provides a computer or other network device with the ability to connect to a
|
||||
network. It operates at both Layer 2 (Data Link Layer) and Layer 3 (Network Layer) of the OSI model, meaning it
|
||||
can interpret both MAC addresses and IP addresses. This class combines the functionalities of
|
||||
WiredNetworkInterface and Layer3Interface, allowing the NIC to manage physical connections and network layer
|
||||
addressing.
|
||||
|
||||
Inherits from:
|
||||
- WiredNetworkInterface: Provides properties and methods specific to wired connections, including methods to connect
|
||||
and disconnect from network links and to manage the enabled/disabled state of the interface.
|
||||
- Layer3Interface: Provides properties for Layer 3 network configuration, such as IP address and subnet mask.
|
||||
"""
|
||||
wake_on_lan: bool = False
|
||||
"Indicates if the NIC supports Wake-on-LAN functionality."
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Produce a dictionary describing the current state of this object.
|
||||
|
||||
:return: Current state of this object and child objects.
|
||||
:rtype: Dict
|
||||
"""
|
||||
# Get the state from the IPWiredNetworkInterface
|
||||
state = super().describe_state()
|
||||
|
||||
# Update the state with NIC-specific information
|
||||
state.update(
|
||||
{
|
||||
"wake_on_lan": self.wake_on_lan,
|
||||
}
|
||||
)
|
||||
|
||||
return state
|
||||
|
||||
def set_original_state(self):
|
||||
"""Sets the original state."""
|
||||
vals_to_include = {"ip_address", "subnet_mask", "mac_address", "speed", "mtu", "wake_on_lan", "enabled"}
|
||||
self._original_state = self.model_dump(include=vals_to_include)
|
||||
|
||||
def receive_frame(self, frame: Frame) -> bool:
|
||||
"""
|
||||
Attempt to receive and process a network frame from the connected Link.
|
||||
|
||||
This method processes a frame if the NIC is enabled. It checks the frame's destination and TTL, captures the
|
||||
frame using PCAP, and forwards it to the connected Node if valid. Returns True if the frame is processed,
|
||||
False otherwise (e.g., if the NIC is disabled, or TTL expired).
|
||||
|
||||
:param frame: The network frame being received.
|
||||
:return: True if the frame is processed and passed to the node, False otherwise.
|
||||
"""
|
||||
if self.enabled:
|
||||
frame.decrement_ttl()
|
||||
if frame.ip and frame.ip.ttl < 1:
|
||||
self._connected_node.sys_log.info(f"Frame discarded at {self} as TTL limit reached")
|
||||
return False
|
||||
frame.set_received_timestamp()
|
||||
self.pcap.capture_inbound(frame)
|
||||
# If this destination or is broadcast
|
||||
accept_frame = False
|
||||
|
||||
# Check if it's a broadcast:
|
||||
if frame.ethernet.dst_mac_addr == "ff:ff:ff:ff:ff:ff":
|
||||
if frame.ip.dst_ip_address in {self.ip_address, self.ip_network.broadcast_address}:
|
||||
accept_frame = True
|
||||
else:
|
||||
if frame.ethernet.dst_mac_addr == self.mac_address:
|
||||
accept_frame = True
|
||||
|
||||
if accept_frame:
|
||||
self._connected_node.receive_frame(frame=frame, from_network_interface=self)
|
||||
return True
|
||||
return False
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""
|
||||
String representation of the NIC.
|
||||
|
||||
:return: A string combining the port number, MAC address and IP address of the NIC.
|
||||
"""
|
||||
return f"Port {self.port_num}: {self.mac_address}/{self.ip_address}"
|
||||
|
||||
|
||||
class HostNode(Node):
|
||||
"""
|
||||
Represents a host node in the network.
|
||||
|
||||
Extends the basic functionality of a Node with host-specific services and applications. A host node typically
|
||||
represents an end-user device in the network, such as a Computer or a Server, and is capable of initiating and
|
||||
responding to network communications.
|
||||
|
||||
Example:
|
||||
>>> pc_a = HostNode(
|
||||
hostname="pc_a",
|
||||
ip_address="192.168.1.10",
|
||||
subnet_mask="255.255.255.0",
|
||||
default_gateway="192.168.1.1"
|
||||
)
|
||||
>>> pc_a.power_on()
|
||||
|
||||
The host comes pre-installed with core functionalities and a suite of services and applications, making it ready
|
||||
for various network operations and tasks. These include:
|
||||
|
||||
Core Functionality:
|
||||
-------------------
|
||||
|
||||
* Packet Capture: Monitors and logs network traffic.
|
||||
* Sys Log: Logs system events and errors.
|
||||
|
||||
Services:
|
||||
---------
|
||||
|
||||
* ARP (Address Resolution Protocol) Service: Resolves IP addresses to MAC addresses.
|
||||
* ICMP (Internet Control Message Protocol) Service: Handles ICMP operations, such as ping requests.
|
||||
* DNS (Domain Name System) Client: Resolves domain names to IP addresses.
|
||||
* FTP (File Transfer Protocol) Client: Enables file transfers between the host and FTP servers.
|
||||
* NTP (Network Time Protocol) Client: Synchronizes the system clock with NTP servers.
|
||||
|
||||
Applications:
|
||||
------------
|
||||
|
||||
* Web Browser: Provides web browsing capabilities.
|
||||
"""
|
||||
network_interfaces: Dict[str, NIC] = {}
|
||||
"The Network Interfaces on the node."
|
||||
network_interface: Dict[int, NIC] = {}
|
||||
"The NICs on the node by port id."
|
||||
|
||||
def __init__(self, ip_address: IPV4Address, subnet_mask: IPV4Address, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.connect_nic(NIC(ip_address=ip_address, subnet_mask=subnet_mask))
|
||||
|
||||
def _install_system_software(self):
|
||||
"""Install System Software - software that is usually provided with the OS."""
|
||||
# ARP Service
|
||||
self.software_manager.install(HostARP)
|
||||
|
||||
# ICMP Service
|
||||
self.software_manager.install(ICMP)
|
||||
|
||||
# DNS Client
|
||||
self.software_manager.install(DNSClient)
|
||||
|
||||
# FTP Client
|
||||
self.software_manager.install(FTPClient)
|
||||
|
||||
# NTP Client
|
||||
self.software_manager.install(NTPClient)
|
||||
|
||||
# Web Browser
|
||||
self.software_manager.install(WebBrowser)
|
||||
|
||||
super()._install_system_software()
|
||||
|
||||
def default_gateway_hello(self):
|
||||
if self.operating_state == NodeOperatingState.ON and self.default_gateway:
|
||||
self.software_manager.arp.get_default_gateway_mac_address()
|
||||
|
||||
def receive_frame(self, frame: Frame, from_network_interface: NIC):
|
||||
"""
|
||||
Receive a Frame from the connected NIC and process it.
|
||||
|
||||
Depending on the protocol, the frame is passed to the appropriate handler such as ARP or ICMP, or up to the
|
||||
SessionManager if no code manager exists.
|
||||
|
||||
:param frame: The Frame being received.
|
||||
:param from_network_interface: The NIC that received the frame.
|
||||
"""
|
||||
super().receive_frame(frame, from_network_interface)
|
||||
|
||||
# Check if the destination port is open on the Node
|
||||
dst_port = None
|
||||
if frame.tcp:
|
||||
dst_port = frame.tcp.dst_port
|
||||
elif frame.udp:
|
||||
dst_port = frame.udp.dst_port
|
||||
|
||||
accept_frame = False
|
||||
if frame.icmp or dst_port in self.software_manager.get_open_ports():
|
||||
# accept the frame as the port is open or if it's an ICMP frame
|
||||
accept_frame = True
|
||||
|
||||
# TODO: add internal node firewall check here?
|
||||
|
||||
if accept_frame:
|
||||
self.session_manager.receive_frame(frame, from_network_interface)
|
||||
else:
|
||||
# denied as port closed
|
||||
self.sys_log.info(f"Ignoring frame for port {frame.tcp.dst_port.value} from {frame.ip.src_ip_address}")
|
||||
# TODO: do we need to do anything more here?
|
||||
pass
|
||||
@@ -1,7 +1,7 @@
|
||||
from primaite.simulator.network.hardware.nodes.host import Host
|
||||
from primaite.simulator.network.hardware.nodes.host.host_node import HostNode
|
||||
|
||||
|
||||
class Server(Host):
|
||||
class Server(HostNode):
|
||||
"""
|
||||
A basic Server class.
|
||||
|
||||
@@ -28,4 +28,4 @@ class Server(Host):
|
||||
* Applications:
|
||||
* Web Browser
|
||||
"""
|
||||
pass
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
from primaite.simulator.network.hardware.base import Node, NetworkInterface
|
||||
from primaite.simulator.network.transmission.data_link_layer import Frame
|
||||
|
||||
|
||||
class NetworkNode(Node):
|
||||
""""""
|
||||
|
||||
def receive_frame(self, frame: Frame, from_network_interface: NetworkInterface):
|
||||
pass
|
||||
@@ -1,19 +1,23 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import secrets
|
||||
from enum import Enum
|
||||
from ipaddress import IPv4Address, IPv4Network
|
||||
from typing import Dict, List, Optional, Tuple, Union
|
||||
from typing import Dict, Any
|
||||
from typing import List, Optional, Tuple, Union
|
||||
|
||||
from prettytable import MARKDOWN, PrettyTable
|
||||
|
||||
from primaite.simulator.core import RequestManager, RequestType, SimComponent
|
||||
from primaite.simulator.network.hardware.base import NIC, Node
|
||||
from primaite.simulator.network.hardware.base import IPWiredNetworkInterface
|
||||
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
|
||||
from primaite.simulator.network.transmission.data_link_layer import EthernetHeader, Frame
|
||||
from primaite.simulator.network.transmission.network_layer import IPPacket, IPProtocol
|
||||
from primaite.simulator.network.transmission.transport_layer import Port, TCPHeader
|
||||
from primaite.simulator.network.hardware.nodes.network.network_node import NetworkNode
|
||||
from primaite.simulator.network.protocols.arp import ARPPacket
|
||||
from primaite.simulator.network.transmission.data_link_layer import Frame
|
||||
from primaite.simulator.network.transmission.network_layer import IPProtocol
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
from primaite.simulator.system.core.sys_log import SysLog
|
||||
from primaite.simulator.system.services.arp.arp import ARP
|
||||
from primaite.simulator.system.services.icmp.icmp import ICMP
|
||||
|
||||
|
||||
class ACLAction(Enum):
|
||||
@@ -197,14 +201,14 @@ class AccessControlList(SimComponent):
|
||||
return self._acl
|
||||
|
||||
def add_rule(
|
||||
self,
|
||||
action: ACLAction,
|
||||
protocol: Optional[IPProtocol] = None,
|
||||
src_ip_address: Optional[Union[str, IPv4Address]] = None,
|
||||
src_port: Optional[Port] = None,
|
||||
dst_ip_address: Optional[Union[str, IPv4Address]] = None,
|
||||
dst_port: Optional[Port] = None,
|
||||
position: int = 0,
|
||||
self,
|
||||
action: ACLAction,
|
||||
protocol: Optional[IPProtocol] = None,
|
||||
src_ip_address: Optional[Union[str, IPv4Address]] = None,
|
||||
src_port: Optional[Port] = None,
|
||||
dst_ip_address: Optional[Union[str, IPv4Address]] = None,
|
||||
dst_port: Optional[Port] = None,
|
||||
position: int = 0,
|
||||
) -> None:
|
||||
"""
|
||||
Add a new ACL rule.
|
||||
@@ -251,12 +255,12 @@ class AccessControlList(SimComponent):
|
||||
raise ValueError(f"Cannot remove ACL rule, position {position} is out of bounds.")
|
||||
|
||||
def is_permitted(
|
||||
self,
|
||||
protocol: IPProtocol,
|
||||
src_ip_address: Union[str, IPv4Address],
|
||||
src_port: Optional[Port],
|
||||
dst_ip_address: Union[str, IPv4Address],
|
||||
dst_port: Optional[Port],
|
||||
self,
|
||||
protocol: IPProtocol,
|
||||
src_ip_address: Union[str, IPv4Address],
|
||||
src_port: Optional[Port],
|
||||
dst_ip_address: Union[str, IPv4Address],
|
||||
dst_port: Optional[Port],
|
||||
) -> Tuple[bool, Optional[Union[str, ACLRule]]]:
|
||||
"""
|
||||
Check if a packet with the given properties is permitted through the ACL.
|
||||
@@ -278,23 +282,23 @@ class AccessControlList(SimComponent):
|
||||
continue
|
||||
|
||||
if (
|
||||
(rule.src_ip_address == src_ip_address or rule.src_ip_address is None)
|
||||
and (rule.dst_ip_address == dst_ip_address or rule.dst_ip_address is None)
|
||||
and (rule.protocol == protocol or rule.protocol is None)
|
||||
and (rule.src_port == src_port or rule.src_port is None)
|
||||
and (rule.dst_port == dst_port or rule.dst_port is None)
|
||||
(rule.src_ip_address == src_ip_address or rule.src_ip_address is None)
|
||||
and (rule.dst_ip_address == dst_ip_address or rule.dst_ip_address is None)
|
||||
and (rule.protocol == protocol or rule.protocol is None)
|
||||
and (rule.src_port == src_port or rule.src_port is None)
|
||||
and (rule.dst_port == dst_port or rule.dst_port is None)
|
||||
):
|
||||
return rule.action == ACLAction.PERMIT, rule
|
||||
|
||||
return self.implicit_action == ACLAction.PERMIT, f"Implicit {self.implicit_action.name}"
|
||||
|
||||
def get_relevant_rules(
|
||||
self,
|
||||
protocol: IPProtocol,
|
||||
src_ip_address: Union[str, IPv4Address],
|
||||
src_port: Port,
|
||||
dst_ip_address: Union[str, IPv4Address],
|
||||
dst_port: Port,
|
||||
self,
|
||||
protocol: IPProtocol,
|
||||
src_ip_address: Union[str, IPv4Address],
|
||||
src_port: Port,
|
||||
dst_ip_address: Union[str, IPv4Address],
|
||||
dst_port: Port,
|
||||
) -> List[ACLRule]:
|
||||
"""
|
||||
Get the list of relevant rules for a packet with given properties.
|
||||
@@ -316,11 +320,11 @@ class AccessControlList(SimComponent):
|
||||
continue
|
||||
|
||||
if (
|
||||
(rule.src_ip_address == src_ip_address or rule.src_ip_address is None)
|
||||
or (rule.dst_ip_address == dst_ip_address or rule.dst_ip_address is None)
|
||||
or (rule.protocol == protocol or rule.protocol is None)
|
||||
or (rule.src_port == src_port or rule.src_port is None)
|
||||
or (rule.dst_port == dst_port or rule.dst_port is None)
|
||||
(rule.src_ip_address == src_ip_address or rule.src_ip_address is None)
|
||||
or (rule.dst_ip_address == dst_ip_address or rule.dst_ip_address is None)
|
||||
or (rule.protocol == protocol or rule.protocol is None)
|
||||
or (rule.src_port == src_port or rule.src_port is None)
|
||||
or (rule.dst_port == dst_port or rule.dst_port is None)
|
||||
):
|
||||
relevant_rules.append(rule)
|
||||
|
||||
@@ -437,11 +441,11 @@ class RouteTable(SimComponent):
|
||||
pass
|
||||
|
||||
def add_route(
|
||||
self,
|
||||
address: Union[IPv4Address, str],
|
||||
subnet_mask: Union[IPv4Address, str],
|
||||
next_hop_ip_address: Union[IPv4Address, str],
|
||||
metric: float = 0.0,
|
||||
self,
|
||||
address: Union[IPv4Address, str],
|
||||
subnet_mask: Union[IPv4Address, str],
|
||||
next_hop_ip_address: Union[IPv4Address, str],
|
||||
metric: float = 0.0,
|
||||
):
|
||||
"""
|
||||
Add a route to the routing table.
|
||||
@@ -528,7 +532,79 @@ class RouteTable(SimComponent):
|
||||
table.add_row([index, f"{route.address}/{network.prefixlen}", route.next_hop_ip_address, route.metric])
|
||||
print(table)
|
||||
|
||||
class RouterNIC(NIC):
|
||||
|
||||
class RouterARP(ARP):
|
||||
"""
|
||||
Inherits from ARPCache and adds router-specific ARP packet processing.
|
||||
|
||||
:ivar SysLog sys_log: A system log for logging messages.
|
||||
:ivar Router router: The router to which this ARP cache belongs.
|
||||
"""
|
||||
router: Optional[Router] = None
|
||||
|
||||
def get_arp_cache_mac_address(self, ip_address: IPv4Address) -> Optional[str]:
|
||||
arp_entry = self.arp.get(ip_address)
|
||||
|
||||
if arp_entry:
|
||||
return arp_entry.mac_address
|
||||
return None
|
||||
|
||||
def get_arp_cache_network_interface(self, ip_address: IPv4Address) -> Optional[RouterInterface]:
|
||||
|
||||
arp_entry = self.arp.get(ip_address)
|
||||
if arp_entry:
|
||||
return self.software_manager.node.network_interfaces[arp_entry.network_interface_uuid]
|
||||
for network_interface in self.router.network_interfaces.values():
|
||||
if ip_address in network_interface.ip_network:
|
||||
return network_interface
|
||||
return None
|
||||
|
||||
def _process_arp_request(self, arp_packet: ARPPacket, from_network_interface: RouterInterface):
|
||||
super()._process_arp_request(arp_packet, from_network_interface)
|
||||
|
||||
# If the target IP matches one of the router's NICs
|
||||
for network_interface in self.router.network_interfaces.values():
|
||||
if network_interface.enabled and network_interface.ip_address == arp_packet.target_ip_address:
|
||||
arp_reply = arp_packet.generate_reply(from_network_interface.mac_address)
|
||||
self.send_arp_reply(arp_reply)
|
||||
return
|
||||
|
||||
def _process_arp_reply(self, arp_packet: ARPPacket, from_network_interface: RouterInterface):
|
||||
if arp_packet.target_ip_address == from_network_interface.ip_address:
|
||||
super()._process_arp_reply(arp_packet, from_network_interface)
|
||||
|
||||
def receive(self, payload: Any, session_id: str, **kwargs) -> bool:
|
||||
"""
|
||||
Processes received data, handling ARP packets.
|
||||
|
||||
:param payload: The payload received.
|
||||
:param session_id: The session ID associated with the received data.
|
||||
:param kwargs: Additional keyword arguments.
|
||||
:return: True if the payload was processed successfully, otherwise False.
|
||||
"""
|
||||
if not super().receive(payload, session_id, **kwargs):
|
||||
return False
|
||||
|
||||
arp_packet: ARPPacket = payload
|
||||
from_network_interface: RouterInterface = kwargs["from_network_interface"]
|
||||
|
||||
for network_interface in self.router.network_interfaces.values():
|
||||
# ARP frame is for this Router
|
||||
if network_interface.ip_address == arp_packet.target_ip_address:
|
||||
if payload.request:
|
||||
self._process_arp_request(arp_packet=arp_packet, from_network_interface=from_network_interface)
|
||||
else:
|
||||
self._process_arp_reply(arp_packet=arp_packet, from_network_interface=from_network_interface)
|
||||
return True
|
||||
|
||||
# ARP frame is not for this router, pass back down to Router to continue routing
|
||||
frame: Frame = kwargs["frame"]
|
||||
self.router.process_frame(frame=frame, from_network_interface=from_network_interface)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
class RouterNIC(IPWiredNetworkInterface):
|
||||
"""
|
||||
A Router-specific Network Interface Card (NIC) that extends the standard NIC functionality.
|
||||
|
||||
@@ -561,7 +637,7 @@ class RouterNIC(NIC):
|
||||
self.pcap.capture_inbound(frame)
|
||||
# If this destination or is broadcast
|
||||
if frame.ethernet.dst_mac_addr == self.mac_address or frame.ethernet.dst_mac_addr == "ff:ff:ff:ff:ff:ff":
|
||||
self._connected_node.receive_frame(frame=frame, from_nic=self)
|
||||
self._connected_node.receive_frame(frame=frame, from_network_interface=self)
|
||||
return True
|
||||
return False
|
||||
|
||||
@@ -569,21 +645,63 @@ class RouterNIC(NIC):
|
||||
return f"{self.mac_address}/{self.ip_address}"
|
||||
|
||||
|
||||
class Router(Node):
|
||||
class RouterInterface(IPWiredNetworkInterface):
|
||||
"""
|
||||
Represents a Router Interface.
|
||||
|
||||
Router interfaces are used to connect routers to networks. They can route packets across different networks,
|
||||
hence have IP addressing information.
|
||||
|
||||
Inherits from:
|
||||
- WiredNetworkInterface: Provides properties and methods specific to wired connections.
|
||||
- Layer3Interface: Provides Layer 3 properties like ip_address and subnet_mask.
|
||||
"""
|
||||
|
||||
def receive_frame(self, frame: Frame) -> bool:
|
||||
"""
|
||||
Receives a network frame on the interface.
|
||||
|
||||
:param frame: The network frame being received.
|
||||
:return: A boolean indicating whether the frame was successfully received.
|
||||
"""
|
||||
if self.enabled:
|
||||
frame.decrement_ttl()
|
||||
if frame.ip and frame.ip.ttl < 1:
|
||||
self._connected_node.sys_log.info("Frame discarded as TTL limit reached")
|
||||
return False
|
||||
frame.set_received_timestamp()
|
||||
self.pcap.capture_inbound(frame)
|
||||
# If this destination or is broadcast
|
||||
if frame.ethernet.dst_mac_addr == self.mac_address or frame.ethernet.dst_mac_addr == "ff:ff:ff:ff:ff:ff":
|
||||
self._connected_node.receive_frame(frame=frame, from_network_interface=self)
|
||||
return True
|
||||
return False
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""
|
||||
String representation of the NIC.
|
||||
|
||||
:return: A string combining the port number, MAC address and IP address of the NIC.
|
||||
"""
|
||||
return f"Port {self.port_num}: {self.mac_address}/{self.ip_address}"
|
||||
|
||||
|
||||
class Router(NetworkNode):
|
||||
"""
|
||||
A class to represent a network router node.
|
||||
|
||||
:ivar str hostname: The name of the router node.
|
||||
:ivar int num_ports: The number of ports in the router.
|
||||
:ivar dict kwargs: Optional keyword arguments for SysLog, ACL, RouteTable, RouterARPCache, RouterICMP.
|
||||
:ivar dict kwargs: Optional keyword arguments for SysLog, ACL, RouteTable, RouterARP, RouterICMP.
|
||||
"""
|
||||
|
||||
num_ports: int
|
||||
ethernet_ports: Dict[int, RouterNIC] = {}
|
||||
network_interfaces: Dict[str, RouterInterface] = {}
|
||||
"The Router Interfaces on the node."
|
||||
network_interface: Dict[int, RouterInterface] = {}
|
||||
"The Router Interfaceson the node by port id."
|
||||
acl: AccessControlList
|
||||
route_table: RouteTable
|
||||
# arp: RouterARPCache
|
||||
# icmp: RouterICMP
|
||||
|
||||
def __init__(self, hostname: str, num_ports: int = 5, **kwargs):
|
||||
if not kwargs.get("sys_log"):
|
||||
@@ -592,23 +710,28 @@ class Router(Node):
|
||||
kwargs["acl"] = AccessControlList(sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY)
|
||||
if not kwargs.get("route_table"):
|
||||
kwargs["route_table"] = RouteTable(sys_log=kwargs["sys_log"])
|
||||
# if not kwargs.get("arp"):
|
||||
# kwargs["arp"] = RouterARPCache(sys_log=kwargs.get("sys_log"), router=self)
|
||||
# if not kwargs.get("icmp"):
|
||||
# kwargs["icmp"] = RouterICMP(sys_log=kwargs.get("sys_log"), arp_cache=kwargs.get("arp"), router=self)
|
||||
super().__init__(hostname=hostname, num_ports=num_ports, **kwargs)
|
||||
# TODO: Install RouterICMP
|
||||
# TODO: Install RouterARP
|
||||
for i in range(1, self.num_ports + 1):
|
||||
nic = RouterNIC(ip_address="127.0.0.1", subnet_mask="255.0.0.0", gateway="0.0.0.0")
|
||||
self.connect_nic(nic)
|
||||
self.ethernet_ports[i] = nic
|
||||
network_interface = RouterInterface(ip_address="127.0.0.1", subnet_mask="255.0.0.0", gateway="0.0.0.0")
|
||||
self.connect_nic(network_interface)
|
||||
self.network_interface[i] = network_interface
|
||||
|
||||
self.arp.nics = self.nics
|
||||
self.icmp.arp = self.arp
|
||||
self._set_default_acl()
|
||||
|
||||
self.set_original_state()
|
||||
|
||||
|
||||
def _install_system_software(self):
|
||||
"""Install System Software - software that is usually provided with the OS."""
|
||||
self.software_manager.install(ICMP)
|
||||
self.software_manager.install(RouterARP)
|
||||
arp: RouterARP = self.software_manager.arp # noqa
|
||||
arp.router = self
|
||||
|
||||
def _set_default_acl(self):
|
||||
self.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.ARP, dst_port=Port.ARP, position=22)
|
||||
self.acl.add_rule(action=ACLAction.PERMIT, protocol=IPProtocol.ICMP, position=23)
|
||||
|
||||
def set_original_state(self):
|
||||
"""Sets the original state."""
|
||||
self.acl.set_original_state()
|
||||
@@ -619,11 +742,11 @@ class Router(Node):
|
||||
|
||||
def reset_component_for_episode(self, episode: int):
|
||||
"""Reset the original state of the SimComponent."""
|
||||
self.arp.clear()
|
||||
self.software_manager.arp.clear()
|
||||
self.acl.reset_component_for_episode(episode)
|
||||
self.route_table.reset_component_for_episode(episode)
|
||||
for i, nic in self.ethernet_ports.items():
|
||||
nic.reset_component_for_episode(episode)
|
||||
for i, network_interface in self.network_interface.items():
|
||||
network_interface.reset_component_for_episode(episode)
|
||||
self.enable_port(i)
|
||||
|
||||
super().reset_component_for_episode(episode)
|
||||
@@ -633,15 +756,15 @@ class Router(Node):
|
||||
rm.add_request("acl", RequestType(func=self.acl._request_manager))
|
||||
return rm
|
||||
|
||||
def _get_port_of_nic(self, target_nic: NIC) -> Optional[int]:
|
||||
def _get_port_of_nic(self, target_nic: RouterInterface) -> Optional[int]:
|
||||
"""
|
||||
Retrieve the port number for a given NIC.
|
||||
|
||||
:param target_nic: Target network interface.
|
||||
:return: The port number if NIC is found, otherwise None.
|
||||
"""
|
||||
for port, nic in self.ethernet_ports.items():
|
||||
if nic == target_nic:
|
||||
for port, network_interface in self.network_interface.items():
|
||||
if network_interface == target_nic:
|
||||
return port
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
@@ -655,83 +778,98 @@ class Router(Node):
|
||||
state["acl"] = self.acl.describe_state()
|
||||
return state
|
||||
|
||||
def process_frame(self, frame: Frame, from_nic: NIC, re_attempt: bool = False) -> None:
|
||||
def process_frame(self, frame: Frame, from_network_interface: RouterInterface) -> None:
|
||||
"""
|
||||
Process a Frame.
|
||||
|
||||
:param frame: The frame to be routed.
|
||||
:param from_nic: The source network interface.
|
||||
:param re_attempt: Flag to indicate if the routing is a reattempt.
|
||||
:param from_network_interface: The source network interface.
|
||||
"""
|
||||
# Check if src ip is on network of one of the NICs
|
||||
nic = self.arp.get_arp_cache_nic(frame.ip.dst_ip_address)
|
||||
target_mac = self.arp.get_arp_cache_mac_address(frame.ip.dst_ip_address)
|
||||
# check if frame is addressed to this Router but has failed to be received by a service of application at the
|
||||
# receive_frame stage
|
||||
if frame.ip:
|
||||
for network_interface in self.network_interfaces.values():
|
||||
if network_interface.ip_address == frame.ip.dst_ip_address:
|
||||
self.sys_log.info(f"Dropping frame destined for this router on an port that isn't open.")
|
||||
return
|
||||
|
||||
if re_attempt and not nic:
|
||||
network_interface: RouterInterface = self.software_manager.arp.get_arp_cache_network_interface(
|
||||
frame.ip.dst_ip_address
|
||||
)
|
||||
target_mac = self.software_manager.arp.get_arp_cache_mac_address(frame.ip.dst_ip_address)
|
||||
self.software_manager.arp.show()
|
||||
|
||||
if not network_interface:
|
||||
self.sys_log.info(f"Destination {frame.ip.dst_ip_address} is unreachable")
|
||||
# TODO: Send something back to src, is it some sort of ICMP?
|
||||
return
|
||||
|
||||
if not nic:
|
||||
self.arp.send_arp_request(
|
||||
frame.ip.dst_ip_address, ignore_networks=[frame.ip.src_ip_address, from_nic.ip_address]
|
||||
)
|
||||
return self.process_frame(frame=frame, from_nic=from_nic, re_attempt=True)
|
||||
|
||||
if not nic.enabled:
|
||||
self.sys_log.info(f"Frame dropped as NIC {nic} is not enabled")
|
||||
if not network_interface.enabled:
|
||||
self.sys_log.info(f"Frame dropped as NIC {network_interface} is not enabled")
|
||||
# TODO: Send something back to src, is it some sort of ICMP?
|
||||
return
|
||||
|
||||
if frame.ip.dst_ip_address in nic.ip_network:
|
||||
from_port = self._get_port_of_nic(from_nic)
|
||||
to_port = self._get_port_of_nic(nic)
|
||||
if frame.ip.dst_ip_address in network_interface.ip_network:
|
||||
from_port = self._get_port_of_nic(from_network_interface)
|
||||
to_port = self._get_port_of_nic(network_interface)
|
||||
self.sys_log.info(f"Forwarding frame to internally from port {from_port} to port {to_port}")
|
||||
frame.decrement_ttl()
|
||||
if frame.ip and frame.ip.ttl < 1:
|
||||
self.sys_log.info("Frame discarded as TTL limit reached")
|
||||
# TODO: Send something back to src, is it some sort of ICMP?
|
||||
return
|
||||
frame.ethernet.src_mac_addr = nic.mac_address
|
||||
frame.ethernet.src_mac_addr = network_interface.mac_address
|
||||
frame.ethernet.dst_mac_addr = target_mac
|
||||
nic.send_frame(frame)
|
||||
network_interface.send_frame(frame)
|
||||
return
|
||||
else:
|
||||
self._route_frame(frame, from_nic)
|
||||
self.route_frame(frame, from_network_interface)
|
||||
|
||||
def _route_frame(self, frame: Frame, from_nic: NIC, re_attempt: bool = False) -> None:
|
||||
def route_frame(self, frame: Frame, from_network_interface: RouterInterface) -> None:
|
||||
route = self.route_table.find_best_route(frame.ip.dst_ip_address)
|
||||
if route:
|
||||
nic = self.arp.get_arp_cache_nic(route.next_hop_ip_address)
|
||||
target_mac = self.arp.get_arp_cache_mac_address(route.next_hop_ip_address)
|
||||
if re_attempt and not nic:
|
||||
network_interface = self.software_managerarp.get_arp_cache_network_interface(route.next_hop_ip_address)
|
||||
target_mac = self.software_manager.arp.get_arp_cache_mac_address(route.next_hop_ip_address)
|
||||
if not network_interface:
|
||||
self.sys_log.info(f"Destination {frame.ip.dst_ip_address} is unreachable")
|
||||
# TODO: Send something back to src, is it some sort of ICMP?
|
||||
return
|
||||
|
||||
if not nic:
|
||||
self.arp.send_arp_request(frame.ip.dst_ip_address, ignore_networks=[frame.ip.src_ip_address])
|
||||
return self.process_frame(frame=frame, from_nic=from_nic, re_attempt=True)
|
||||
|
||||
if not nic.enabled:
|
||||
self.sys_log.info(f"Frame dropped as NIC {nic} is not enabled")
|
||||
if not network_interface.enabled:
|
||||
self.sys_log.info(f"Frame dropped as NIC {network_interface} is not enabled")
|
||||
# TODO: Send something back to src, is it some sort of ICMP?
|
||||
return
|
||||
|
||||
from_port = self._get_port_of_nic(from_nic)
|
||||
to_port = self._get_port_of_nic(nic)
|
||||
from_port = self._get_port_of_nic(from_network_interface)
|
||||
to_port = self._get_port_of_nic(network_interface)
|
||||
self.sys_log.info(f"Routing frame to internally from port {from_port} to port {to_port}")
|
||||
frame.decrement_ttl()
|
||||
if frame.ip and frame.ip.ttl < 1:
|
||||
self.sys_log.info("Frame discarded as TTL limit reached")
|
||||
# TODO: Send something back to src, is it some sort of ICMP?
|
||||
return
|
||||
frame.ethernet.src_mac_addr = nic.mac_address
|
||||
frame.ethernet.src_mac_addr = network_interface.mac_address
|
||||
frame.ethernet.dst_mac_addr = target_mac
|
||||
nic.send_frame(frame)
|
||||
network_interface.send_frame(frame)
|
||||
|
||||
def receive_frame(self, frame: Frame, from_nic: NIC):
|
||||
def receive_frame(self, frame: Frame, from_network_interface: RouterInterface):
|
||||
"""
|
||||
Receive a frame from a NIC and processes it based on its protocol.
|
||||
Receive a frame from a RouterInterface and processes it based on its protocol.
|
||||
|
||||
:param frame: The incoming frame.
|
||||
:param from_nic: The network interface where the frame is coming from.
|
||||
:param from_network_interface: The network interface where the frame is coming from.
|
||||
"""
|
||||
process_frame = False
|
||||
|
||||
if self.operating_state != NodeOperatingState.ON:
|
||||
return
|
||||
|
||||
if frame.ip and self.software_manager.arp:
|
||||
self.software_manager.arp.add_arp_cache_entry(
|
||||
ip_address=frame.ip.src_ip_address,
|
||||
mac_address=frame.ethernet.src_mac_addr,
|
||||
network_interface=from_network_interface
|
||||
)
|
||||
|
||||
protocol = frame.ip.protocol
|
||||
src_ip_address = frame.ip.src_ip_address
|
||||
dst_ip_address = frame.ip.dst_ip_address
|
||||
@@ -754,21 +892,32 @@ class Router(Node):
|
||||
)
|
||||
|
||||
if not permitted:
|
||||
at_port = self._get_port_of_nic(from_nic)
|
||||
at_port = self._get_port_of_nic(from_network_interface)
|
||||
self.sys_log.info(f"Frame blocked at port {at_port} by rule {rule}")
|
||||
return
|
||||
self.arp.add_arp_cache_entry(src_ip_address, frame.ethernet.src_mac_addr, from_nic)
|
||||
if frame.ip.protocol == IPProtocol.ICMP:
|
||||
self.icmp.process_icmp(frame=frame, from_nic=from_nic)
|
||||
|
||||
self.software_manager.arp.add_arp_cache_entry(
|
||||
ip_address=src_ip_address, mac_address=frame.ethernet.src_mac_addr,
|
||||
network_interface=from_network_interface
|
||||
)
|
||||
|
||||
# Check if the destination port is open on the Node
|
||||
dst_port = None
|
||||
if frame.tcp:
|
||||
dst_port = frame.tcp.dst_port
|
||||
elif frame.udp:
|
||||
dst_port = frame.udp.dst_port
|
||||
|
||||
send_to_session_manager = False
|
||||
if ((frame.icmp and dst_ip_address == from_network_interface.ip_address)
|
||||
or (dst_port in self.software_manager.get_open_ports())):
|
||||
send_to_session_manager = True
|
||||
|
||||
if send_to_session_manager:
|
||||
# Port is open on this Router so pass Frame up to session manager first
|
||||
self.session_manager.receive_frame(frame, from_network_interface)
|
||||
else:
|
||||
if src_port == Port.ARP:
|
||||
self.arp.process_arp_packet(from_nic=from_nic, frame=frame, route_table=self.route_table)
|
||||
return
|
||||
else:
|
||||
# All other traffic
|
||||
process_frame = True
|
||||
if process_frame:
|
||||
self.process_frame(frame, from_nic)
|
||||
self.process_frame(frame, from_network_interface)
|
||||
|
||||
def configure_port(self, port: int, ip_address: Union[IPv4Address, str], subnet_mask: Union[IPv4Address, str]):
|
||||
"""
|
||||
@@ -782,10 +931,12 @@ class Router(Node):
|
||||
ip_address = IPv4Address(ip_address)
|
||||
if not isinstance(subnet_mask, IPv4Address):
|
||||
subnet_mask = IPv4Address(subnet_mask)
|
||||
nic = self.ethernet_ports[port]
|
||||
nic.ip_address = ip_address
|
||||
nic.subnet_mask = subnet_mask
|
||||
self.sys_log.info(f"Configured port {port} with ip_address={ip_address}/{nic.ip_network.prefixlen}")
|
||||
network_interface = self.network_interface[port]
|
||||
network_interface.ip_address = ip_address
|
||||
network_interface.subnet_mask = subnet_mask
|
||||
self.sys_log.info(
|
||||
f"Configured Network Interface {network_interface}"
|
||||
)
|
||||
self.set_original_state()
|
||||
|
||||
def enable_port(self, port: int):
|
||||
@@ -794,9 +945,9 @@ class Router(Node):
|
||||
|
||||
:param port: The port to enable.
|
||||
"""
|
||||
nic = self.ethernet_ports.get(port)
|
||||
if nic:
|
||||
nic.enable()
|
||||
network_interface = self.network_interface.get(port)
|
||||
if network_interface:
|
||||
network_interface.enable()
|
||||
|
||||
def disable_port(self, port: int):
|
||||
"""
|
||||
@@ -804,9 +955,9 @@ class Router(Node):
|
||||
|
||||
:param port: The port to disable.
|
||||
"""
|
||||
nic = self.ethernet_ports.get(port)
|
||||
if nic:
|
||||
nic.disable()
|
||||
network_interface = self.network_interface.get(port)
|
||||
if network_interface:
|
||||
network_interface.disable()
|
||||
|
||||
def show(self, markdown: bool = False):
|
||||
"""
|
||||
@@ -820,14 +971,14 @@ class Router(Node):
|
||||
table.set_style(MARKDOWN)
|
||||
table.align = "l"
|
||||
table.title = f"{self.hostname} Ethernet Interfaces"
|
||||
for port, nic in self.ethernet_ports.items():
|
||||
for port, network_interface in self.network_interface.items():
|
||||
table.add_row(
|
||||
[
|
||||
port,
|
||||
nic.mac_address,
|
||||
f"{nic.ip_address}/{nic.ip_network.prefixlen}",
|
||||
nic.speed,
|
||||
"Enabled" if nic.enabled else "Disabled",
|
||||
network_interface.mac_address,
|
||||
f"{network_interface.ip_address}/{network_interface.ip_network.prefixlen}",
|
||||
network_interface.speed,
|
||||
"Enabled" if network_interface.enabled else "Disabled",
|
||||
]
|
||||
)
|
||||
print(table)
|
||||
@@ -1,16 +1,93 @@
|
||||
from typing import Dict
|
||||
from __future__ import annotations
|
||||
from typing import Dict, Optional
|
||||
|
||||
from prettytable import MARKDOWN, PrettyTable
|
||||
|
||||
from primaite import getLogger
|
||||
from primaite.exceptions import NetworkError
|
||||
from primaite.simulator.network.hardware.base import Link, Node, SwitchPort
|
||||
from primaite.simulator.network.hardware.base import WiredNetworkInterface, NetworkInterface, Link
|
||||
from primaite.simulator.network.hardware.nodes.network.network_node import NetworkNode
|
||||
from primaite.simulator.network.transmission.data_link_layer import Frame
|
||||
|
||||
_LOGGER = getLogger(__name__)
|
||||
|
||||
|
||||
class Switch(Node):
|
||||
class SwitchPort(WiredNetworkInterface):
|
||||
"""
|
||||
Represents a Switch Port.
|
||||
|
||||
Switch ports connect devices within the same network. They operate at the data link layer (Layer 2) of the OSI model
|
||||
and are responsible for receiving and forwarding frames based on MAC addresses. Despite operating at Layer 2,
|
||||
they are an essential part of network infrastructure, enabling LAN segmentation, bandwidth management, and
|
||||
the creation of VLANs.
|
||||
|
||||
Inherits from:
|
||||
- WiredNetworkInterface: Provides properties and methods specific to wired connections.
|
||||
|
||||
Switch ports typically do not have IP addresses assigned to them as they function at Layer 2, but managed switches
|
||||
can have management IP addresses for remote management and configuration purposes.
|
||||
"""
|
||||
_connected_node: Optional[Switch] = None
|
||||
"The Switch to which the SwitchPort is connected."
|
||||
|
||||
def set_original_state(self):
|
||||
"""Sets the original state."""
|
||||
vals_to_include = {"port_num", "mac_address", "speed", "mtu", "enabled"}
|
||||
self._original_state = self.model_dump(include=vals_to_include)
|
||||
super().set_original_state()
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Produce a dictionary describing the current state of this object.
|
||||
|
||||
|
||||
:return: Current state of this object and child objects.
|
||||
:rtype: Dict
|
||||
"""
|
||||
state = super().describe_state()
|
||||
state.update(
|
||||
{
|
||||
"mac_address": self.mac_address,
|
||||
"speed": self.speed,
|
||||
"mtu": self.mtu,
|
||||
"enabled": self.enabled,
|
||||
}
|
||||
)
|
||||
return state
|
||||
|
||||
def send_frame(self, frame: Frame) -> bool:
|
||||
"""
|
||||
Attempts to send a network frame through the interface.
|
||||
|
||||
:param frame: The network frame to be sent.
|
||||
:return: A boolean indicating whether the frame was successfully sent.
|
||||
"""
|
||||
if self.enabled:
|
||||
self.pcap.capture_outbound(frame)
|
||||
self._connected_link.transmit_frame(sender_nic=self, frame=frame)
|
||||
return True
|
||||
# Cannot send Frame as the SwitchPort is not enabled
|
||||
return False
|
||||
|
||||
def receive_frame(self, frame: Frame) -> bool:
|
||||
"""
|
||||
Receives a network frame on the interface.
|
||||
|
||||
:param frame: The network frame being received.
|
||||
:return: A boolean indicating whether the frame was successfully received.
|
||||
"""
|
||||
if self.enabled:
|
||||
frame.decrement_ttl()
|
||||
if frame.ip and frame.ip.ttl < 1:
|
||||
self._connected_node.sys_log.info("Frame discarded as TTL limit reached")
|
||||
return False
|
||||
self.pcap.capture_inbound(frame)
|
||||
self._connected_node.receive_frame(frame=frame, from_network_interface=self)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class Switch(NetworkNode):
|
||||
"""
|
||||
A class representing a Layer 2 network switch.
|
||||
|
||||
@@ -30,7 +107,7 @@ class Switch(Node):
|
||||
self.switch_ports = {i: SwitchPort() for i in range(1, self.num_ports + 1)}
|
||||
for port_num, port in self.switch_ports.items():
|
||||
port._connected_node = self
|
||||
port._port_num_on_node = port_num
|
||||
port.port_num = port_num
|
||||
port.parent = self
|
||||
port.port_num = port_num
|
||||
|
||||
@@ -78,16 +155,16 @@ class Switch(Node):
|
||||
self.sys_log.info(f"Removed MAC table entry: Port {mac_table_port.port_num} -> {mac_address}")
|
||||
self._add_mac_table_entry(mac_address, switch_port)
|
||||
|
||||
def forward_frame(self, frame: Frame, incoming_port: SwitchPort):
|
||||
def receive_frame(self, frame: Frame, from_network_interface: SwitchPort):
|
||||
"""
|
||||
Forward a frame to the appropriate port based on the destination MAC address.
|
||||
|
||||
:param frame: The Frame to be forwarded.
|
||||
:param incoming_port: The port number from which the frame was received.
|
||||
:param frame: The Frame being received.
|
||||
:param from_network_interface: The SwitchPort that received the frame.
|
||||
"""
|
||||
src_mac = frame.ethernet.src_mac_addr
|
||||
dst_mac = frame.ethernet.dst_mac_addr
|
||||
self._add_mac_table_entry(src_mac, incoming_port)
|
||||
self._add_mac_table_entry(src_mac, from_network_interface)
|
||||
|
||||
outgoing_port = self.mac_address_table.get(dst_mac)
|
||||
if outgoing_port and dst_mac.lower() != "ff:ff:ff:ff:ff:ff":
|
||||
@@ -95,7 +172,7 @@ class Switch(Node):
|
||||
else:
|
||||
# If the destination MAC is not in the table, flood to all ports except incoming
|
||||
for port in self.switch_ports.values():
|
||||
if port.enabled and port != incoming_port:
|
||||
if port.enabled and port != from_network_interface:
|
||||
port.send_frame(frame)
|
||||
|
||||
def disconnect_link_from_port(self, link: Link, port_number: int):
|
||||
@@ -1,11 +1,12 @@
|
||||
from ipaddress import IPv4Address
|
||||
|
||||
from primaite.simulator.network.container import Network
|
||||
from primaite.simulator.network.hardware.base import NIC, NodeOperatingState
|
||||
from primaite.simulator.network.hardware.nodes.computer import Computer
|
||||
from primaite.simulator.network.hardware.nodes.router import ACLAction, Router
|
||||
from primaite.simulator.network.hardware.nodes.server import Server
|
||||
from primaite.simulator.network.hardware.nodes.switch import Switch
|
||||
from primaite.simulator.network.hardware.base import NodeOperatingState
|
||||
from primaite.simulator.network.hardware.nodes.host.computer import Computer
|
||||
from primaite.simulator.network.hardware.nodes.host.host_node import NIC
|
||||
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
|
||||
from primaite.simulator.network.hardware.nodes.host.server import Server
|
||||
from primaite.simulator.network.hardware.nodes.network.switch import Switch
|
||||
from primaite.simulator.network.transmission.network_layer import IPProtocol
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
from primaite.simulator.system.applications.database_client import DatabaseClient
|
||||
@@ -40,13 +41,13 @@ def client_server_routed() -> Network:
|
||||
# Switch 1
|
||||
switch_1 = Switch(hostname="switch_1", num_ports=6)
|
||||
switch_1.power_on()
|
||||
network.connect(endpoint_a=router_1.ethernet_ports[1], endpoint_b=switch_1.switch_ports[6])
|
||||
network.connect(endpoint_a=router_1.network_interface[1], endpoint_b=switch_1.switch_ports[6])
|
||||
router_1.enable_port(1)
|
||||
|
||||
# Switch 2
|
||||
switch_2 = Switch(hostname="switch_2", num_ports=6)
|
||||
switch_2.power_on()
|
||||
network.connect(endpoint_a=router_1.ethernet_ports[2], endpoint_b=switch_2.switch_ports[6])
|
||||
network.connect(endpoint_a=router_1.network_interface[2], endpoint_b=switch_2.switch_ports[6])
|
||||
router_1.enable_port(2)
|
||||
|
||||
# Client 1
|
||||
@@ -58,7 +59,7 @@ def client_server_routed() -> Network:
|
||||
operating_state=NodeOperatingState.ON,
|
||||
)
|
||||
client_1.power_on()
|
||||
network.connect(endpoint_b=client_1.ethernet_port[1], endpoint_a=switch_2.switch_ports[1])
|
||||
network.connect(endpoint_b=client_1.network_interface[1], endpoint_a=switch_2.switch_ports[1])
|
||||
|
||||
# Server 1
|
||||
server_1 = Server(
|
||||
@@ -69,7 +70,7 @@ def client_server_routed() -> Network:
|
||||
operating_state=NodeOperatingState.ON,
|
||||
)
|
||||
server_1.power_on()
|
||||
network.connect(endpoint_b=server_1.ethernet_port[1], endpoint_a=switch_1.switch_ports[1])
|
||||
network.connect(endpoint_b=server_1.network_interface[1], endpoint_a=switch_1.switch_ports[1])
|
||||
|
||||
router_1.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.ARP, dst_port=Port.ARP, position=22)
|
||||
|
||||
@@ -126,13 +127,13 @@ def arcd_uc2_network() -> Network:
|
||||
# Switch 1
|
||||
switch_1 = Switch(hostname="switch_1", num_ports=8, operating_state=NodeOperatingState.ON)
|
||||
switch_1.power_on()
|
||||
network.connect(endpoint_a=router_1.ethernet_ports[1], endpoint_b=switch_1.switch_ports[8])
|
||||
network.connect(endpoint_a=router_1.network_interface[1], endpoint_b=switch_1.switch_ports[8])
|
||||
router_1.enable_port(1)
|
||||
|
||||
# Switch 2
|
||||
switch_2 = Switch(hostname="switch_2", num_ports=8, operating_state=NodeOperatingState.ON)
|
||||
switch_2.power_on()
|
||||
network.connect(endpoint_a=router_1.ethernet_ports[2], endpoint_b=switch_2.switch_ports[8])
|
||||
network.connect(endpoint_a=router_1.network_interface[2], endpoint_b=switch_2.switch_ports[8])
|
||||
router_1.enable_port(2)
|
||||
|
||||
# Client 1
|
||||
@@ -145,7 +146,7 @@ def arcd_uc2_network() -> Network:
|
||||
operating_state=NodeOperatingState.ON,
|
||||
)
|
||||
client_1.power_on()
|
||||
network.connect(endpoint_b=client_1.ethernet_port[1], endpoint_a=switch_2.switch_ports[1])
|
||||
network.connect(endpoint_b=client_1.network_interface[1], endpoint_a=switch_2.switch_ports[1])
|
||||
client_1.software_manager.install(DataManipulationBot)
|
||||
db_manipulation_bot: DataManipulationBot = client_1.software_manager.software.get("DataManipulationBot")
|
||||
db_manipulation_bot.configure(
|
||||
@@ -167,7 +168,7 @@ def arcd_uc2_network() -> Network:
|
||||
client_2.power_on()
|
||||
web_browser = client_2.software_manager.software.get("WebBrowser")
|
||||
web_browser.target_url = "http://arcd.com/users/"
|
||||
network.connect(endpoint_b=client_2.ethernet_port[1], endpoint_a=switch_2.switch_ports[2])
|
||||
network.connect(endpoint_b=client_2.network_interface[1], endpoint_a=switch_2.switch_ports[2])
|
||||
|
||||
# Domain Controller
|
||||
domain_controller = Server(
|
||||
@@ -180,7 +181,7 @@ def arcd_uc2_network() -> Network:
|
||||
domain_controller.power_on()
|
||||
domain_controller.software_manager.install(DNSServer)
|
||||
|
||||
network.connect(endpoint_b=domain_controller.ethernet_port[1], endpoint_a=switch_1.switch_ports[1])
|
||||
network.connect(endpoint_b=domain_controller.network_interface[1], endpoint_a=switch_1.switch_ports[1])
|
||||
|
||||
# Database Server
|
||||
database_server = Server(
|
||||
@@ -192,7 +193,7 @@ def arcd_uc2_network() -> Network:
|
||||
operating_state=NodeOperatingState.ON,
|
||||
)
|
||||
database_server.power_on()
|
||||
network.connect(endpoint_b=database_server.ethernet_port[1], endpoint_a=switch_1.switch_ports[3])
|
||||
network.connect(endpoint_b=database_server.network_interface[1], endpoint_a=switch_1.switch_ports[3])
|
||||
|
||||
ddl = """
|
||||
CREATE TABLE IF NOT EXISTS user (
|
||||
@@ -270,7 +271,7 @@ def arcd_uc2_network() -> Network:
|
||||
|
||||
database_client: DatabaseClient = web_server.software_manager.software.get("DatabaseClient")
|
||||
database_client.configure(server_ip_address=IPv4Address("192.168.1.14"))
|
||||
network.connect(endpoint_b=web_server.ethernet_port[1], endpoint_a=switch_1.switch_ports[2])
|
||||
network.connect(endpoint_b=web_server.network_interface[1], endpoint_a=switch_1.switch_ports[2])
|
||||
database_client.run()
|
||||
database_client.connect()
|
||||
|
||||
@@ -291,7 +292,7 @@ def arcd_uc2_network() -> Network:
|
||||
)
|
||||
backup_server.power_on()
|
||||
backup_server.software_manager.install(FTPServer)
|
||||
network.connect(endpoint_b=backup_server.ethernet_port[1], endpoint_a=switch_1.switch_ports[4])
|
||||
network.connect(endpoint_b=backup_server.network_interface[1], endpoint_a=switch_1.switch_ports[4])
|
||||
|
||||
# Security Suite
|
||||
security_suite = Server(
|
||||
@@ -303,9 +304,9 @@ def arcd_uc2_network() -> Network:
|
||||
operating_state=NodeOperatingState.ON,
|
||||
)
|
||||
security_suite.power_on()
|
||||
network.connect(endpoint_b=security_suite.ethernet_port[1], endpoint_a=switch_1.switch_ports[7])
|
||||
network.connect(endpoint_b=security_suite.network_interface[1], endpoint_a=switch_1.switch_ports[7])
|
||||
security_suite.connect_nic(NIC(ip_address="192.168.10.110", subnet_mask="255.255.255.0"))
|
||||
network.connect(endpoint_b=security_suite.ethernet_port[2], endpoint_a=switch_2.switch_ports[7])
|
||||
network.connect(endpoint_b=security_suite.network_interface[2], endpoint_a=switch_2.switch_ports[7])
|
||||
|
||||
router_1.acl.add_rule(action=ACLAction.PERMIT, src_port=Port.ARP, dst_port=Port.ARP, position=22)
|
||||
|
||||
|
||||
@@ -13,11 +13,12 @@ class ARPEntry(BaseModel):
|
||||
Represents an entry in the ARP cache.
|
||||
|
||||
:param mac_address: The MAC address associated with the IP address.
|
||||
:param nic: The NIC through which the NIC with the IP address is reachable.
|
||||
:param network_interface_uuid: The UIId of the Network Interface through which the NIC with the IP address is
|
||||
reachable.
|
||||
"""
|
||||
|
||||
mac_address: str
|
||||
nic_uuid: str
|
||||
network_interface_uuid: str
|
||||
|
||||
|
||||
class ARPPacket(DataPacket):
|
||||
|
||||
@@ -21,7 +21,7 @@ class PacketCapture:
|
||||
The PCAPs are logged to: <simulation output directory>/<hostname>/<hostname>_<ip address>_pcap.log
|
||||
"""
|
||||
|
||||
def __init__(self, hostname: str, ip_address: Optional[str] = None, switch_port_number: Optional[int] = None):
|
||||
def __init__(self, hostname: str, ip_address: Optional[str] = None, interface_num: Optional[int] = None):
|
||||
"""
|
||||
Initialize the PacketCapture process.
|
||||
|
||||
@@ -32,8 +32,8 @@ class PacketCapture:
|
||||
"The hostname for which PCAP logs are being recorded."
|
||||
self.ip_address: str = ip_address
|
||||
"The IP address associated with the PCAP logs."
|
||||
self.switch_port_number = switch_port_number
|
||||
"The SwitchPort number."
|
||||
self.interface_num = interface_num
|
||||
"The interface num on the Node."
|
||||
|
||||
self.inbound_logger = None
|
||||
self.outbound_logger = None
|
||||
@@ -81,8 +81,8 @@ class PacketCapture:
|
||||
"""Get PCAP the logger name."""
|
||||
if self.ip_address:
|
||||
return f"{self.hostname}_{self.ip_address}_{'outbound' if outbound else 'inbound'}_pcap"
|
||||
if self.switch_port_number:
|
||||
return f"{self.hostname}_port-{self.switch_port_number}_{'outbound' if outbound else 'inbound'}_pcap"
|
||||
if self.interface_num:
|
||||
return f"{self.hostname}_port-{self.interface_num}_{'outbound' if outbound else 'inbound'}_pcap"
|
||||
return f"{self.hostname}_{'outbound' if outbound else 'inbound'}_pcap"
|
||||
|
||||
def _get_log_path(self, outbound: bool = False) -> Path:
|
||||
|
||||
@@ -13,7 +13,7 @@ from primaite.simulator.network.transmission.network_layer import IPPacket, IPPr
|
||||
from primaite.simulator.network.transmission.transport_layer import Port, TCPHeader, UDPHeader
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from primaite.simulator.network.hardware.base import ARPCache, NIC
|
||||
from primaite.simulator.network.hardware.base import NetworkInterface
|
||||
from primaite.simulator.system.core.software_manager import SoftwareManager
|
||||
from primaite.simulator.system.core.sys_log import SysLog
|
||||
|
||||
@@ -84,8 +84,6 @@ class SessionManager:
|
||||
self.software_manager: SoftwareManager = None # Noqa
|
||||
self.node: Node = None # noqa
|
||||
|
||||
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Produce a dictionary describing the current state of this object.
|
||||
@@ -104,7 +102,7 @@ class SessionManager:
|
||||
|
||||
@staticmethod
|
||||
def _get_session_key(
|
||||
frame: Frame, inbound_frame: bool = True
|
||||
frame: Frame, inbound_frame: bool = True
|
||||
) -> Tuple[IPProtocol, IPv4Address, Optional[Port], Optional[Port]]:
|
||||
"""
|
||||
Extracts the session key from the given frame.
|
||||
@@ -142,19 +140,19 @@ class SessionManager:
|
||||
dst_port = None
|
||||
return protocol, with_ip_address, src_port, dst_port
|
||||
|
||||
def resolve_outbound_nic(self, dst_ip_address: IPv4Address) -> Optional[NIC]:
|
||||
for nic in self.node.nics.values():
|
||||
if dst_ip_address in nic.ip_network and nic.enabled:
|
||||
return nic
|
||||
return self.software_manager.arp.get_default_gateway_nic()
|
||||
def resolve_outbound_network_interface(self, dst_ip_address: IPv4Address) -> Optional['NetworkInterface']:
|
||||
for network_interface in self.node.network_interfaces.values():
|
||||
if dst_ip_address in network_interface.ip_network and network_interface.enabled:
|
||||
return network_interface
|
||||
return self.software_manager.arp.get_default_gateway_network_interface()
|
||||
|
||||
def resolve_outbound_transmission_details(
|
||||
self, dst_ip_address: Optional[Union[IPv4Address, IPv4Network]] = None, session_id: Optional[str] = None
|
||||
) -> Tuple[Optional["NIC"], Optional[str], IPv4Address, Optional[IPProtocol], bool]:
|
||||
self, dst_ip_address: Optional[Union[IPv4Address, IPv4Network]] = None, session_id: Optional[str] = None
|
||||
) -> Tuple[Optional['NetworkInterface'], Optional[str], IPv4Address, Optional[IPProtocol], bool]:
|
||||
if not isinstance(dst_ip_address, (IPv4Address, IPv4Network)):
|
||||
dst_ip_address = IPv4Address(dst_ip_address)
|
||||
is_broadcast = False
|
||||
outbound_nic = None
|
||||
outbound_network_interface = None
|
||||
dst_mac_address = None
|
||||
protocol = None
|
||||
|
||||
@@ -172,36 +170,36 @@ class SessionManager:
|
||||
dst_ip_address = dst_ip_address.broadcast_address
|
||||
if dst_ip_address:
|
||||
# Find a suitable NIC for the broadcast
|
||||
for nic in self.node.nics.values():
|
||||
if dst_ip_address in nic.ip_network and nic.enabled:
|
||||
for network_interface in self.node.network_interfaces.values():
|
||||
if dst_ip_address in network_interface.ip_network and network_interface.enabled:
|
||||
dst_mac_address = "ff:ff:ff:ff:ff:ff"
|
||||
outbound_nic = nic
|
||||
outbound_network_interface = network_interface
|
||||
break
|
||||
else:
|
||||
# Resolve MAC address for unicast transmission
|
||||
use_default_gateway = True
|
||||
for nic in self.node.nics.values():
|
||||
if dst_ip_address in nic.ip_network and nic.enabled:
|
||||
for network_interface in self.node.network_interfaces.values():
|
||||
if dst_ip_address in network_interface.ip_network and network_interface.enabled:
|
||||
dst_mac_address = self.software_manager.arp.get_arp_cache_mac_address(dst_ip_address)
|
||||
break
|
||||
|
||||
if dst_ip_address:
|
||||
use_default_gateway = False
|
||||
outbound_nic = self.software_manager.arp.get_arp_cache_nic(dst_ip_address)
|
||||
outbound_network_interface = self.software_manager.arp.get_arp_cache_network_interface(dst_ip_address)
|
||||
|
||||
if use_default_gateway:
|
||||
dst_mac_address = self.software_manager.arp.get_default_gateway_mac_address()
|
||||
outbound_nic = self.software_manager.arp.get_default_gateway_nic()
|
||||
return outbound_nic, dst_mac_address, dst_ip_address, protocol, is_broadcast
|
||||
outbound_network_interface = self.software_manager.arp.get_default_gateway_network_interface()
|
||||
return outbound_network_interface, dst_mac_address, dst_ip_address, protocol, is_broadcast
|
||||
|
||||
def receive_payload_from_software_manager(
|
||||
self,
|
||||
payload: Any,
|
||||
dst_ip_address: Optional[Union[IPv4Address, IPv4Network]] = None,
|
||||
dst_port: Optional[Port] = None,
|
||||
session_id: Optional[str] = None,
|
||||
ip_protocol: IPProtocol = IPProtocol.TCP,
|
||||
icmp_packet: Optional[ICMPPacket] = None
|
||||
self,
|
||||
payload: Any,
|
||||
dst_ip_address: Optional[Union[IPv4Address, IPv4Network]] = None,
|
||||
dst_port: Optional[Port] = None,
|
||||
session_id: Optional[str] = None,
|
||||
ip_protocol: IPProtocol = IPProtocol.TCP,
|
||||
icmp_packet: Optional[ICMPPacket] = None
|
||||
) -> Union[Any, None]:
|
||||
"""
|
||||
Receive a payload from the SoftwareManager and send it to the appropriate NIC for transmission.
|
||||
@@ -222,19 +220,19 @@ class SessionManager:
|
||||
dst_mac_address = "ff:ff:ff:ff:ff:ff"
|
||||
else:
|
||||
dst_mac_address = payload.target_mac_addr
|
||||
outbound_nic = self.resolve_outbound_nic(payload.target_ip_address)
|
||||
outbound_network_interface = self.resolve_outbound_network_interface(payload.target_ip_address)
|
||||
is_broadcast = payload.request
|
||||
ip_protocol = IPProtocol.UDP
|
||||
else:
|
||||
vals = self.resolve_outbound_transmission_details(
|
||||
dst_ip_address=dst_ip_address, session_id=session_id
|
||||
)
|
||||
outbound_nic, dst_mac_address, dst_ip_address, protocol, is_broadcast = vals
|
||||
outbound_network_interface, dst_mac_address, dst_ip_address, protocol, is_broadcast = vals
|
||||
if protocol:
|
||||
ip_protocol = protocol
|
||||
|
||||
# Check if outbound NIC and destination MAC address are resolved
|
||||
if not outbound_nic or not dst_mac_address:
|
||||
if not outbound_network_interface or not dst_mac_address:
|
||||
return False
|
||||
|
||||
tcp_header = None
|
||||
@@ -249,10 +247,18 @@ class SessionManager:
|
||||
src_port=dst_port,
|
||||
dst_port=dst_port,
|
||||
)
|
||||
# TODO: Only create IP packet if not ARP
|
||||
# ip_packet = None
|
||||
# if dst_port != Port.ARP:
|
||||
# IPPacket(
|
||||
# src_ip_address=outbound_network_interface.ip_address,
|
||||
# dst_ip_address=dst_ip_address,
|
||||
# protocol=ip_protocol
|
||||
# )
|
||||
# Construct the frame for transmission
|
||||
frame = Frame(
|
||||
ethernet=EthernetHeader(src_mac_addr=outbound_nic.mac_address, dst_mac_addr=dst_mac_address),
|
||||
ip=IPPacket(src_ip_address=outbound_nic.ip_address, dst_ip_address=dst_ip_address, protocol=ip_protocol),
|
||||
ethernet=EthernetHeader(src_mac_addr=outbound_network_interface.mac_address, dst_mac_addr=dst_mac_address),
|
||||
ip=IPPacket(src_ip_address=outbound_network_interface.ip_address, dst_ip_address=dst_ip_address, protocol=ip_protocol),
|
||||
tcp=tcp_header,
|
||||
udp=udp_header,
|
||||
icmp=icmp_packet,
|
||||
@@ -271,9 +277,9 @@ class SessionManager:
|
||||
self.sessions_by_uuid[session.uuid] = session
|
||||
|
||||
# Send the frame through the NIC
|
||||
return outbound_nic.send_frame(frame)
|
||||
return outbound_network_interface.send_frame(frame)
|
||||
|
||||
def receive_frame(self, frame: Frame, from_nic: NIC):
|
||||
def receive_frame(self, frame: Frame, from_network_interface: 'NetworkInterface'):
|
||||
"""
|
||||
Receive a Frame.
|
||||
|
||||
@@ -302,7 +308,7 @@ class SessionManager:
|
||||
port=dst_port,
|
||||
protocol=frame.ip.protocol,
|
||||
session_id=session.uuid,
|
||||
from_nic=from_nic,
|
||||
from_network_interface=from_network_interface,
|
||||
frame=frame
|
||||
)
|
||||
|
||||
|
||||
@@ -167,7 +167,7 @@ class SoftwareManager:
|
||||
)
|
||||
|
||||
def receive_payload_from_session_manager(
|
||||
self, payload: Any, port: Port, protocol: IPProtocol, session_id: str, from_nic: "NIC", frame: Frame
|
||||
self, payload: Any, port: Port, protocol: IPProtocol, session_id: str, from_network_interface: "NIC", frame: Frame
|
||||
):
|
||||
"""
|
||||
Receive a payload from the SessionManager and forward it to the corresponding service or application.
|
||||
@@ -177,7 +177,7 @@ class SoftwareManager:
|
||||
"""
|
||||
receiver: Optional[Union[Service, Application]] = self.port_protocol_mapping.get((port, protocol), None)
|
||||
if receiver:
|
||||
receiver.receive(payload=payload, session_id=session_id, from_nic=from_nic, frame=frame)
|
||||
receiver.receive(payload=payload, session_id=session_id, from_network_interface=from_network_interface, frame=frame)
|
||||
else:
|
||||
self.sys_log.error(f"No service or application found for port {port} and protocol {protocol}")
|
||||
pass
|
||||
|
||||
@@ -1,17 +1,16 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from abc import abstractmethod
|
||||
from ipaddress import IPv4Address
|
||||
from typing import Any, Dict, Optional, Union
|
||||
|
||||
from prettytable import MARKDOWN, PrettyTable
|
||||
|
||||
from primaite.simulator.network.hardware.base import NIC
|
||||
from primaite.simulator.network.hardware.base import NetworkInterface
|
||||
from primaite.simulator.network.protocols.arp import ARPEntry, ARPPacket
|
||||
from primaite.simulator.network.transmission.data_link_layer import EthernetHeader, Frame
|
||||
from primaite.simulator.network.transmission.network_layer import IPPacket, IPProtocol
|
||||
from primaite.simulator.network.transmission.transport_layer import Port, UDPHeader
|
||||
from primaite.simulator.network.transmission.network_layer import IPProtocol
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
from primaite.simulator.system.services.service import Service
|
||||
from primaite.utils.validators import IPV4Address
|
||||
|
||||
|
||||
class ARP(Service):
|
||||
@@ -21,7 +20,7 @@ class ARP(Service):
|
||||
Manages ARP for resolving network layer addresses into link layer addresses. It maintains an ARP cache,
|
||||
sends ARP requests and replies, and processes incoming ARP packets.
|
||||
"""
|
||||
arp: Dict[IPv4Address, ARPEntry] = {}
|
||||
arp: Dict[IPV4Address, ARPEntry] = {}
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
kwargs["name"] = "ARP"
|
||||
@@ -30,7 +29,7 @@ class ARP(Service):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
pass
|
||||
return super().describe_state()
|
||||
|
||||
def show(self, markdown: bool = False):
|
||||
"""
|
||||
@@ -48,7 +47,7 @@ class ARP(Service):
|
||||
[
|
||||
str(ip),
|
||||
arp.mac_address,
|
||||
self.software_manager.node.nics[arp.nic_uuid].mac_address,
|
||||
self.software_manager.node.network_interfaces[arp.network_interface_uuid].mac_address,
|
||||
]
|
||||
)
|
||||
print(table)
|
||||
@@ -57,7 +56,13 @@ class ARP(Service):
|
||||
"""Clears the arp cache."""
|
||||
self.arp.clear()
|
||||
|
||||
def add_arp_cache_entry(self, ip_address: IPv4Address, mac_address: str, nic: NIC, override: bool = False):
|
||||
def add_arp_cache_entry(
|
||||
self,
|
||||
ip_address: IPV4Address,
|
||||
mac_address: str,
|
||||
network_interface: NetworkInterface,
|
||||
override: bool = False
|
||||
):
|
||||
"""
|
||||
Add an ARP entry to the cache.
|
||||
|
||||
@@ -66,20 +71,20 @@ class ARP(Service):
|
||||
|
||||
:param ip_address: The IP address to be added to the cache.
|
||||
:param mac_address: The MAC address associated with the IP address.
|
||||
:param nic: The NIC through which the NIC with the IP address is reachable.
|
||||
:param network_interface: The NIC through which the NIC with the IP address is reachable.
|
||||
:param override: If True, an existing entry for the IP address will be overridden. Default is False.
|
||||
"""
|
||||
for _nic in self.software_manager.node.nics.values():
|
||||
if _nic.ip_address == ip_address:
|
||||
for _network_interface in self.software_manager.node.network_interfaces.values():
|
||||
if _network_interface.ip_address == ip_address:
|
||||
return
|
||||
if override or not self.arp.get(ip_address):
|
||||
self.sys_log.info(f"Adding ARP cache entry for {mac_address}/{ip_address} via NIC {nic}")
|
||||
arp_entry = ARPEntry(mac_address=mac_address, nic_uuid=nic.uuid)
|
||||
self.sys_log.info(f"Adding ARP cache entry for {mac_address}/{ip_address} via NIC {network_interface}")
|
||||
arp_entry = ARPEntry(mac_address=mac_address, network_interface_uuid=network_interface.uuid)
|
||||
|
||||
self.arp[ip_address] = arp_entry
|
||||
|
||||
@abstractmethod
|
||||
def get_arp_cache_mac_address(self, ip_address: IPv4Address) -> Optional[str]:
|
||||
def get_arp_cache_mac_address(self, ip_address: IPV4Address) -> Optional[str]:
|
||||
"""
|
||||
Retrieves the MAC address associated with a given IP address from the ARP cache.
|
||||
|
||||
@@ -89,7 +94,7 @@ class ARP(Service):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_arp_cache_nic(self, ip_address: IPv4Address) -> Optional[NIC]:
|
||||
def get_arp_cache_network_interface(self, ip_address: IPV4Address) -> Optional[NetworkInterface]:
|
||||
"""
|
||||
Retrieves the NIC associated with a given IP address from the ARP cache.
|
||||
|
||||
@@ -98,18 +103,20 @@ class ARP(Service):
|
||||
"""
|
||||
pass
|
||||
|
||||
def send_arp_request(self, target_ip_address: Union[IPv4Address, str]):
|
||||
def send_arp_request(self, target_ip_address: Union[IPV4Address, str]):
|
||||
"""
|
||||
Sends an ARP request to resolve the MAC address of a target IP address.
|
||||
|
||||
:param target_ip_address: The target IP address for which the MAC address is being requested.
|
||||
"""
|
||||
outbound_nic = self.software_manager.session_manager.resolve_outbound_nic(target_ip_address)
|
||||
if outbound_nic:
|
||||
self.sys_log.info(f"Sending ARP request from NIC {outbound_nic} for ip {target_ip_address}")
|
||||
outbound_network_interface = self.software_manager.session_manager.resolve_outbound_network_interface(
|
||||
target_ip_address
|
||||
)
|
||||
if outbound_network_interface:
|
||||
self.sys_log.info(f"Sending ARP request from NIC {outbound_network_interface} for ip {target_ip_address}")
|
||||
arp_packet = ARPPacket(
|
||||
sender_ip_address=outbound_nic.ip_address,
|
||||
sender_mac_addr=outbound_nic.mac_address,
|
||||
sender_ip_address=outbound_network_interface.ip_address,
|
||||
sender_mac_addr=outbound_network_interface.mac_address,
|
||||
target_ip_address=target_ip_address,
|
||||
)
|
||||
self.software_manager.session_manager.receive_payload_from_software_manager(
|
||||
@@ -125,11 +132,13 @@ class ARP(Service):
|
||||
Sends an ARP reply in response to an ARP request.
|
||||
|
||||
:param arp_reply: The ARP packet containing the reply.
|
||||
:param from_nic: The NIC from which the ARP reply is sent.
|
||||
:param from_network_interface: The NIC from which the ARP reply is sent.
|
||||
"""
|
||||
|
||||
outbound_nic = self.software_manager.session_manager.resolve_outbound_nic(arp_reply.target_ip_address)
|
||||
if outbound_nic:
|
||||
outbound_network_interface = self.software_manager.session_manager.resolve_outbound_network_interface(
|
||||
arp_reply.target_ip_address
|
||||
)
|
||||
if outbound_network_interface:
|
||||
self.sys_log.info(
|
||||
f"Sending ARP reply from {arp_reply.sender_mac_addr}/{arp_reply.sender_ip_address} "
|
||||
f"to {arp_reply.target_ip_address}/{arp_reply.target_mac_addr} "
|
||||
@@ -147,31 +156,33 @@ class ARP(Service):
|
||||
|
||||
|
||||
@abstractmethod
|
||||
def _process_arp_request(self, arp_packet: ARPPacket, from_nic: NIC):
|
||||
def _process_arp_request(self, arp_packet: ARPPacket, from_network_interface: NIC):
|
||||
"""
|
||||
Processes an incoming ARP request.
|
||||
|
||||
:param arp_packet: The ARP packet containing the request.
|
||||
:param from_nic: The NIC that received the ARP request.
|
||||
:param from_network_interface: The NIC that received the ARP request.
|
||||
"""
|
||||
self.sys_log.info(
|
||||
f"Received ARP request for {arp_packet.target_ip_address} from "
|
||||
f"{arp_packet.sender_mac_addr}/{arp_packet.sender_ip_address} "
|
||||
)
|
||||
|
||||
def _process_arp_reply(self, arp_packet: ARPPacket, from_nic: NIC):
|
||||
def _process_arp_reply(self, arp_packet: ARPPacket, from_network_interface: NIC):
|
||||
"""
|
||||
Processes an incoming ARP reply.
|
||||
|
||||
:param arp_packet: The ARP packet containing the reply.
|
||||
:param from_nic: The NIC that received the ARP reply.
|
||||
:param from_network_interface: The NIC that received the ARP reply.
|
||||
"""
|
||||
self.sys_log.info(
|
||||
f"Received ARP response for {arp_packet.sender_ip_address} "
|
||||
f"from {arp_packet.sender_mac_addr} via NIC {from_nic}"
|
||||
f"from {arp_packet.sender_mac_addr} via Network Interface {from_network_interface}"
|
||||
)
|
||||
self.add_arp_cache_entry(
|
||||
ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic
|
||||
ip_address=arp_packet.sender_ip_address,
|
||||
mac_address=arp_packet.sender_mac_addr,
|
||||
network_interface=from_network_interface
|
||||
)
|
||||
|
||||
def receive(self, payload: Any, session_id: str, **kwargs) -> bool:
|
||||
@@ -183,15 +194,15 @@ class ARP(Service):
|
||||
:param kwargs: Additional keyword arguments.
|
||||
:return: True if the payload was processed successfully, otherwise False.
|
||||
"""
|
||||
if not isinstance(payload, ARPPacket):
|
||||
print("failied on payload check", type(payload))
|
||||
if not super().receive(payload, session_id, **kwargs):
|
||||
return False
|
||||
|
||||
from_nic = kwargs.get("from_nic")
|
||||
from_network_interface = kwargs.get("from_network_interface")
|
||||
if payload.request:
|
||||
self._process_arp_request(arp_packet=payload, from_nic=from_nic)
|
||||
self._process_arp_request(arp_packet=payload, from_network_interface=from_network_interface)
|
||||
else:
|
||||
self._process_arp_reply(arp_packet=payload, from_nic=from_nic)
|
||||
self._process_arp_reply(arp_packet=payload, from_network_interface=from_network_interface)
|
||||
return True
|
||||
|
||||
def __contains__(self, item: Any) -> bool:
|
||||
"""
|
||||
|
||||
@@ -1,95 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from ipaddress import IPv4Address
|
||||
from typing import Optional
|
||||
|
||||
from primaite.simulator.network.hardware.base import NIC
|
||||
from primaite.simulator.system.services.arp.arp import ARP, ARPPacket
|
||||
|
||||
|
||||
class HostARP(ARP):
|
||||
def get_default_gateway_mac_address(self) -> Optional[str]:
|
||||
if self.software_manager.node.default_gateway:
|
||||
return self.get_arp_cache_mac_address(self.software_manager.node.default_gateway)
|
||||
|
||||
def get_default_gateway_nic(self) -> Optional[NIC]:
|
||||
if self.software_manager.node.default_gateway:
|
||||
return self.get_arp_cache_nic(self.software_manager.node.default_gateway)
|
||||
|
||||
def _get_arp_cache_mac_address(
|
||||
self, ip_address: IPv4Address, is_reattempt: bool = False, is_default_gateway_attempt: bool = False
|
||||
) -> Optional[str]:
|
||||
arp_entry = self.arp.get(ip_address)
|
||||
|
||||
if arp_entry:
|
||||
return arp_entry.mac_address
|
||||
else:
|
||||
if not is_reattempt:
|
||||
self.send_arp_request(ip_address)
|
||||
return self._get_arp_cache_mac_address(
|
||||
ip_address=ip_address, is_reattempt=True, is_default_gateway_attempt=is_default_gateway_attempt
|
||||
)
|
||||
else:
|
||||
if self.software_manager.node.default_gateway:
|
||||
if not is_default_gateway_attempt:
|
||||
self.send_arp_request(self.software_manager.node.default_gateway)
|
||||
return self._get_arp_cache_mac_address(
|
||||
ip_address=self.software_manager.node.default_gateway, is_reattempt=True, is_default_gateway_attempt=True
|
||||
)
|
||||
return None
|
||||
|
||||
def get_arp_cache_mac_address(self, ip_address: IPv4Address) -> Optional[str]:
|
||||
"""
|
||||
Get the MAC address associated with an IP address.
|
||||
|
||||
:param ip_address: The IP address to look up in the cache.
|
||||
:return: The MAC address associated with the IP address, or None if not found.
|
||||
"""
|
||||
return self._get_arp_cache_mac_address(ip_address)
|
||||
|
||||
def _get_arp_cache_nic(
|
||||
self, ip_address: IPv4Address, is_reattempt: bool = False, is_default_gateway_attempt: bool = False
|
||||
) -> Optional[NIC]:
|
||||
arp_entry = self.arp.get(ip_address)
|
||||
|
||||
if arp_entry:
|
||||
return self.software_manager.node.nics[arp_entry.nic_uuid]
|
||||
else:
|
||||
if not is_reattempt:
|
||||
self.send_arp_request(ip_address)
|
||||
return self._get_arp_cache_nic(
|
||||
ip_address=ip_address, is_reattempt=True, is_default_gateway_attempt=is_default_gateway_attempt
|
||||
)
|
||||
else:
|
||||
if self.software_manager.node.default_gateway:
|
||||
if not is_default_gateway_attempt:
|
||||
self.send_arp_request(self.software_manager.node.default_gateway)
|
||||
return self._get_arp_cache_nic(
|
||||
ip_address=self.software_manager.node.default_gateway, is_reattempt=True, is_default_gateway_attempt=True
|
||||
)
|
||||
return None
|
||||
|
||||
def get_arp_cache_nic(self, ip_address: IPv4Address) -> Optional[NIC]:
|
||||
"""
|
||||
Get the NIC associated with an IP address.
|
||||
|
||||
:param ip_address: The IP address to look up in the cache.
|
||||
:return: The NIC associated with the IP address, or None if not found.
|
||||
"""
|
||||
return self._get_arp_cache_nic(ip_address)
|
||||
|
||||
def _process_arp_request(self, arp_packet: ARPPacket, from_nic: NIC):
|
||||
super()._process_arp_request(arp_packet, from_nic)
|
||||
# Unmatched ARP Request
|
||||
if arp_packet.target_ip_address != from_nic.ip_address:
|
||||
self.sys_log.info(
|
||||
f"Ignoring ARP request for {arp_packet.target_ip_address}. Current IP address is {from_nic.ip_address}"
|
||||
)
|
||||
return
|
||||
|
||||
# Matched ARP request
|
||||
self.add_arp_cache_entry(
|
||||
ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic
|
||||
)
|
||||
arp_packet = arp_packet.generate_reply(from_nic.mac_address)
|
||||
self.send_arp_reply(arp_packet)
|
||||
@@ -1,98 +1,78 @@
|
||||
# class RouterARPCache(ARPCache):
|
||||
# from ipaddress import IPv4Address
|
||||
# from typing import Optional, Any
|
||||
#
|
||||
# from primaite.simulator.network.hardware.nodes.network.router import RouterInterface, Router
|
||||
# from primaite.simulator.network.protocols.arp import ARPPacket
|
||||
# from primaite.simulator.network.transmission.data_link_layer import Frame
|
||||
# from primaite.simulator.system.services.arp.arp import ARP
|
||||
#
|
||||
#
|
||||
# class RouterARP(ARP):
|
||||
# """
|
||||
# Inherits from ARPCache and adds router-specific ARP packet processing.
|
||||
#
|
||||
# :ivar SysLog sys_log: A system log for logging messages.
|
||||
# :ivar Router router: The router to which this ARP cache belongs.
|
||||
# """
|
||||
# router: Router
|
||||
#
|
||||
# def __init__(self, sys_log: SysLog, router: Router):
|
||||
# super().__init__(sys_log)
|
||||
# self.router: Router = router
|
||||
# def get_arp_cache_mac_address(self, ip_address: IPv4Address) -> Optional[str]:
|
||||
# arp_entry = self.arp.get(ip_address)
|
||||
#
|
||||
# def process_arp_packet(
|
||||
# self, from_nic: NIC, frame: Frame, route_table: RouteTable, is_reattempt: bool = False
|
||||
# ) -> None:
|
||||
# """
|
||||
# Processes a received ARP (Address Resolution Protocol) packet in a router-specific way.
|
||||
# if arp_entry:
|
||||
# return arp_entry.mac_address
|
||||
# return None
|
||||
#
|
||||
# This method is responsible for handling both ARP requests and responses. It processes ARP packets received on a
|
||||
# Network Interface Card (NIC) and performs actions based on whether the packet is a request or a reply. This
|
||||
# includes updating the ARP cache, forwarding ARP replies, sending ARP requests for unknown destinations, and
|
||||
# handling packet TTL (Time To Live).
|
||||
# def get_arp_cache_network_interface(self, ip_address: IPv4Address) -> Optional[RouterInterface]:
|
||||
# arp_entry = self.arp.get(ip_address)
|
||||
# if arp_entry:
|
||||
# return self.software_manager.node.network_interfaces[arp_entry.network_interface_uuid]
|
||||
# return None
|
||||
#
|
||||
# The method first checks if the ARP packet is a request or a reply. For ARP replies, it updates the ARP cache
|
||||
# and forwards the reply if necessary. For ARP requests, it checks if the target IP matches one of the router's
|
||||
# NICs and sends an ARP reply if so. If the destination is not directly connected, it consults the routing table
|
||||
# to find the best route and reattempts ARP request processing if needed.
|
||||
#
|
||||
# :param from_nic: The NIC that received the ARP packet.
|
||||
# :param frame: The frame containing the ARP packet.
|
||||
# :param route_table: The routing table of the router.
|
||||
# :param is_reattempt: Flag to indicate if this is a reattempt of processing the ARP packet, defaults to False.
|
||||
# """
|
||||
# arp_packet = frame.arp
|
||||
#
|
||||
# # ARP Reply
|
||||
# if not arp_packet.request:
|
||||
# if arp_packet.target_ip_address == from_nic.ip_address:
|
||||
# # reply to the Router specifically
|
||||
# self.sys_log.info(
|
||||
# f"Received ARP response for {arp_packet.sender_ip_address} "
|
||||
# f"from {arp_packet.sender_mac_addr} via NIC {from_nic}"
|
||||
# )
|
||||
# self.add_arp_cache_entry(
|
||||
# ip_address=arp_packet.sender_ip_address,
|
||||
# mac_address=arp_packet.sender_mac_addr,
|
||||
# nic=from_nic,
|
||||
# )
|
||||
# return
|
||||
#
|
||||
# # # Reply for a connected requested
|
||||
# # nic = self.get_arp_cache_nic(arp_packet.target_ip_address)
|
||||
# # if nic:
|
||||
# # self.sys_log.info(
|
||||
# # f"Forwarding arp reply for {arp_packet.target_ip_address}, from {arp_packet.sender_ip_address}"
|
||||
# # )
|
||||
# # arp_packet.sender_mac_addr = nic.mac_address
|
||||
# # frame.decrement_ttl()
|
||||
# # if frame.ip and frame.ip.ttl < 1:
|
||||
# # self.sys_log.info("Frame discarded as TTL limit reached")
|
||||
# # return
|
||||
# # nic.send_frame(frame)
|
||||
# # return
|
||||
#
|
||||
# # ARP Request
|
||||
# self.sys_log.info(
|
||||
# f"Received ARP request for {arp_packet.target_ip_address} from "
|
||||
# f"{arp_packet.sender_mac_addr}/{arp_packet.sender_ip_address} "
|
||||
# )
|
||||
# # Matched ARP request
|
||||
# def _process_arp_request(self, arp_packet: ARPPacket, from_network_interface: RouterInterface):
|
||||
# super()._process_arp_request(arp_packet, from_network_interface)
|
||||
# self.add_arp_cache_entry(
|
||||
# ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr, nic=from_nic
|
||||
# ip_address=arp_packet.sender_ip_address, mac_address=arp_packet.sender_mac_addr,
|
||||
# network_interface=from_network_interface
|
||||
# )
|
||||
#
|
||||
# # If the target IP matches one of the router's NICs
|
||||
# for nic in self.nics.values():
|
||||
# if nic.enabled and nic.ip_address == arp_packet.target_ip_address:
|
||||
# arp_reply = arp_packet.generate_reply(from_nic.mac_address)
|
||||
# self.send_arp_reply(arp_reply, from_nic)
|
||||
# for network_interface in self.network_interfaces.values():
|
||||
# if network_interface.enabled and network_interface.ip_address == arp_packet.target_ip_address:
|
||||
# arp_reply = arp_packet.generate_reply(from_network_interface.mac_address)
|
||||
# self.send_arp_reply(arp_reply)
|
||||
# return
|
||||
#
|
||||
# # # Check Route Table
|
||||
# # route = route_table.find_best_route(arp_packet.target_ip_address)
|
||||
# # if route and route != self.router.route_table.default_route:
|
||||
# # nic = self.get_arp_cache_nic(route.next_hop_ip_address)
|
||||
# #
|
||||
# # if not nic:
|
||||
# # if not is_reattempt:
|
||||
# # self.send_arp_request(route.next_hop_ip_address, ignore_networks=[frame.ip.src_ip_address])
|
||||
# # return self.process_arp_packet(from_nic, frame, route_table, is_reattempt=True)
|
||||
# # else:
|
||||
# # self.sys_log.info("Ignoring ARP request as destination unavailable/No ARP entry found")
|
||||
# # return
|
||||
# # else:
|
||||
# # arp_reply = arp_packet.generate_reply(from_nic.mac_address)
|
||||
# # self.send_arp_reply(arp_reply, from_nic)
|
||||
# # return
|
||||
# def _process_arp_reply(self, arp_packet: ARPPacket, from_network_interface: RouterInterface):
|
||||
# if arp_packet.target_ip_address == from_network_interface.ip_address:
|
||||
# super()._process_arp_reply(arp_packet, from_network_interface)
|
||||
#
|
||||
# def receive(self, payload: Any, session_id: str, **kwargs) -> bool:
|
||||
# """
|
||||
# Processes received data, handling ARP packets.
|
||||
#
|
||||
# :param payload: The payload received.
|
||||
# :param session_id: The session ID associated with the received data.
|
||||
# :param kwargs: Additional keyword arguments.
|
||||
# :return: True if the payload was processed successfully, otherwise False.
|
||||
# """
|
||||
# if not super().receive(payload, session_id, **kwargs):
|
||||
# return False
|
||||
#
|
||||
# arp_packet: ARPPacket = payload
|
||||
# from_network_interface: RouterInterface = kwargs["from_network_interface"]
|
||||
#
|
||||
# for network_interface in self.network_interfaces.values():
|
||||
# # ARP frame is for this Router
|
||||
# if network_interface.ip_address == arp_packet.target_ip_address:
|
||||
# if payload.request:
|
||||
# self._process_arp_request(arp_packet=arp_packet, from_network_interface=from_network_interface)
|
||||
# else:
|
||||
# self._process_arp_reply(arp_packet=arp_packet, from_network_interface=from_network_interface)
|
||||
# return True
|
||||
#
|
||||
# # ARP frame is not for this router, pass back down to Router to continue routing
|
||||
# frame: Frame = kwargs["frame"]
|
||||
# self.router.process_frame(frame=frame, from_network_interface=from_network_interface)
|
||||
#
|
||||
# return True
|
||||
|
||||
@@ -3,7 +3,6 @@ from ipaddress import IPv4Address
|
||||
from typing import Dict, Any, Union, Optional, Tuple
|
||||
|
||||
from primaite import getLogger
|
||||
from primaite.simulator.network.hardware.base import NIC
|
||||
from primaite.simulator.network.protocols.icmp import ICMPPacket, ICMPType
|
||||
from primaite.simulator.network.transmission.data_link_layer import Frame
|
||||
from primaite.simulator.network.transmission.network_layer import IPProtocol
|
||||
@@ -53,7 +52,7 @@ class ICMP(Service):
|
||||
return False
|
||||
if target_ip_address.is_loopback:
|
||||
self.sys_log.info("Pinging loopback address")
|
||||
return any(nic.enabled for nic in self.nics.values())
|
||||
return any(network_interface.enabled for network_interface in self.network_interfaces.values())
|
||||
self.sys_log.info(f"Pinging {target_ip_address}:", to_terminal=True)
|
||||
sequence, identifier = 0, None
|
||||
while sequence < pings:
|
||||
@@ -88,9 +87,9 @@ class ICMP(Service):
|
||||
:param pings: The number of pings to send. Defaults to 4.
|
||||
:return: A tuple containing the next sequence number and the identifier.
|
||||
"""
|
||||
nic = self.software_manager.session_manager.resolve_outbound_nic(target_ip_address)
|
||||
network_interface = self.software_manager.session_manager.resolve_outbound_network_interface(target_ip_address)
|
||||
|
||||
if not nic:
|
||||
if not network_interface:
|
||||
self.sys_log.error(
|
||||
"Cannot send ICMP echo request as there is no outbound NIC to use. Try configuring the default gateway."
|
||||
)
|
||||
@@ -118,9 +117,11 @@ class ICMP(Service):
|
||||
"""
|
||||
self.sys_log.info(f"Received echo request from {frame.ip.src_ip_address}")
|
||||
|
||||
nic = self.software_manager.session_manager.resolve_outbound_nic(frame.ip.src_ip_address)
|
||||
network_interface = self.software_manager.session_manager.resolve_outbound_network_interface(
|
||||
frame.ip.src_ip_address
|
||||
)
|
||||
|
||||
if not nic:
|
||||
if not network_interface:
|
||||
self.sys_log.error(
|
||||
"Cannot send ICMP echo reply as there is no outbound NIC to use. Try configuring the default gateway."
|
||||
)
|
||||
|
||||
@@ -16,30 +16,30 @@
|
||||
# super().__init__(sys_log, arp_cache)
|
||||
# self.router = router
|
||||
#
|
||||
# def process_icmp(self, frame: Frame, from_nic: NIC, is_reattempt: bool = False):
|
||||
# def process_icmp(self, frame: Frame, from_network_interface: NIC, is_reattempt: bool = False):
|
||||
# """
|
||||
# Process incoming ICMP frames based on ICMP type.
|
||||
#
|
||||
# :param frame: The incoming frame to process.
|
||||
# :param from_nic: The network interface where the frame is coming from.
|
||||
# :param from_network_interface: The network interface where the frame is coming from.
|
||||
# :param is_reattempt: Flag to indicate if the process is a reattempt.
|
||||
# """
|
||||
# if frame.icmp.icmp_type == ICMPType.ECHO_REQUEST:
|
||||
# # determine if request is for router interface or whether it needs to be routed
|
||||
#
|
||||
# for nic in self.router.nics.values():
|
||||
# if nic.ip_address == frame.ip.dst_ip_address:
|
||||
# if nic.enabled:
|
||||
# for network_interface in self.router.network_interfaces.values():
|
||||
# if network_interface.ip_address == frame.ip.dst_ip_address:
|
||||
# if network_interface.enabled:
|
||||
# # reply to the request
|
||||
# if not is_reattempt:
|
||||
# self.sys_log.info(f"Received echo request from {frame.ip.src_ip_address}")
|
||||
# target_mac_address = self.arp.get_arp_cache_mac_address(frame.ip.src_ip_address)
|
||||
# src_nic = self.arp.get_arp_cache_nic(frame.ip.src_ip_address)
|
||||
# src_nic = self.arp.get_arp_cache_network_interface(frame.ip.src_ip_address)
|
||||
# tcp_header = TCPHeader(src_port=Port.ARP, dst_port=Port.ARP)
|
||||
#
|
||||
# # Network Layer
|
||||
# ip_packet = IPPacket(
|
||||
# src_ip_address=nic.ip_address,
|
||||
# src_ip_address=network_interface.ip_address,
|
||||
# dst_ip_address=frame.ip.src_ip_address,
|
||||
# protocol=IPProtocol.ICMP,
|
||||
# )
|
||||
@@ -67,12 +67,12 @@
|
||||
# return
|
||||
#
|
||||
# # Route the frame
|
||||
# self.router.process_frame(frame, from_nic)
|
||||
# self.router.process_frame(frame, from_network_interface)
|
||||
#
|
||||
# elif frame.icmp.icmp_type == ICMPType.ECHO_REPLY:
|
||||
# for nic in self.router.nics.values():
|
||||
# if nic.ip_address == frame.ip.dst_ip_address:
|
||||
# if nic.enabled:
|
||||
# for network_interface in self.router.network_interfaces.values():
|
||||
# if network_interface.ip_address == frame.ip.dst_ip_address:
|
||||
# if network_interface.enabled:
|
||||
# time = frame.transmission_duration()
|
||||
# time_str = f"{time}ms" if time > 0 else "<1ms"
|
||||
# self.sys_log.info(
|
||||
@@ -87,4 +87,4 @@
|
||||
#
|
||||
# return
|
||||
# # Route the frame
|
||||
# self.router.process_frame(frame, from_nic)
|
||||
# self.router.process_frame(frame, from_network_interface)
|
||||
|
||||
40
src/primaite/utils/validators.py
Normal file
40
src/primaite/utils/validators.py
Normal file
@@ -0,0 +1,40 @@
|
||||
from ipaddress import IPv4Address
|
||||
from typing import Any, Final
|
||||
|
||||
from pydantic import (
|
||||
BeforeValidator,
|
||||
)
|
||||
from typing_extensions import Annotated
|
||||
|
||||
|
||||
def ipv4_validator(v: Any) -> IPv4Address:
|
||||
"""
|
||||
Validate the input and ensure it can be converted to an IPv4Address instance.
|
||||
|
||||
This function takes an input `v`, and if it's not already an instance of IPv4Address, it tries to convert it to one.
|
||||
If the conversion is successful, the IPv4Address instance is returned. This is useful for ensuring that any input
|
||||
data is strictly in the format of an IPv4 address.
|
||||
|
||||
:param v: The input value that needs to be validated or converted to IPv4Address.
|
||||
:return: An instance of IPv4Address.
|
||||
:raises ValueError: If `v` is not a valid IPv4 address and cannot be converted to an instance of IPv4Address.
|
||||
"""
|
||||
if isinstance(v, IPv4Address):
|
||||
return v
|
||||
|
||||
return IPv4Address(v)
|
||||
|
||||
|
||||
# Define a custom type IPV4Address using the typing_extensions.Annotated.
|
||||
# Annotated is used to attach metadata to type hints. In this case, it's used to associate the ipv4_validator
|
||||
# with the IPv4Address type, ensuring that any usage of IPV4Address undergoes validation before assignment.
|
||||
IPV4Address: Final[Annotated] = Annotated[IPv4Address, BeforeValidator(ipv4_validator)]
|
||||
"""
|
||||
IPv4Address with with pre-validation and auto-conversion from str using ipv4_validator.
|
||||
|
||||
This type is essentially an IPv4Address from the standard library's ipaddress module,
|
||||
but with added validation logic. If you use this custom type, the ipv4_validator function
|
||||
will automatically check and convert the input value to an instance of IPv4Address before
|
||||
any Pydantic model uses it. This ensures that any field marked with this type is not just
|
||||
an IPv4Address in form, but also valid according to the rules defined in ipv4_validator.
|
||||
"""
|
||||
Reference in New Issue
Block a user