Merge remote-tracking branch 'origin/dev' into feature/2417-observation-refactor
This commit is contained in:
@@ -8,6 +8,7 @@ from prettytable import MARKDOWN, PrettyTable
|
||||
from primaite import getLogger
|
||||
from primaite.simulator.core import RequestManager, RequestType, SimComponent
|
||||
from primaite.simulator.network.hardware.base import Link, Node, WiredNetworkInterface
|
||||
from primaite.simulator.network.hardware.nodes.host.server import Printer
|
||||
from primaite.simulator.system.applications.application import Application
|
||||
from primaite.simulator.system.services.service import Service
|
||||
|
||||
@@ -110,6 +111,16 @@ class Network(SimComponent):
|
||||
"""The Firewalls in the Network."""
|
||||
return [node for node in self.nodes.values() if node.__class__.__name__ == "Firewall"]
|
||||
|
||||
@property
|
||||
def printer_nodes(self) -> List[Node]:
|
||||
"""The printers on the network."""
|
||||
return [node for node in self.nodes.values() if isinstance(node, Printer)]
|
||||
|
||||
@property
|
||||
def wireless_router_nodes(self) -> List[Node]:
|
||||
"""The Routers in the Network."""
|
||||
return [node for node in self.nodes.values() if node.__class__.__name__ == "WirelessRouter"]
|
||||
|
||||
def show(self, nodes: bool = True, ip_addresses: bool = True, links: bool = True, markdown: bool = False):
|
||||
"""
|
||||
Print tables describing the Network.
|
||||
@@ -128,6 +139,8 @@ class Network(SimComponent):
|
||||
"Switch": self.switch_nodes,
|
||||
"Server": self.server_nodes,
|
||||
"Computer": self.computer_nodes,
|
||||
"Printer": self.printer_nodes,
|
||||
"Wireless Router": self.wireless_router_nodes,
|
||||
}
|
||||
if nodes:
|
||||
table = PrettyTable(["Node", "Type", "Operating State"])
|
||||
|
||||
@@ -5,7 +5,7 @@ import secrets
|
||||
from abc import ABC, abstractmethod
|
||||
from ipaddress import IPv4Address, IPv4Network
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional, Union
|
||||
from typing import Any, Dict, Optional, Type, TypeVar, Union
|
||||
|
||||
from prettytable import MARKDOWN, PrettyTable
|
||||
from pydantic import BaseModel, Field
|
||||
@@ -35,8 +35,11 @@ from primaite.simulator.system.core.software_manager import SoftwareManager
|
||||
from primaite.simulator.system.core.sys_log import SysLog
|
||||
from primaite.simulator.system.processes.process import Process
|
||||
from primaite.simulator.system.services.service import Service
|
||||
from primaite.simulator.system.software import IOSoftware
|
||||
from primaite.utils.validators import IPV4Address
|
||||
|
||||
IOSoftwareClass = TypeVar("IOSoftwareClass", bound=IOSoftware)
|
||||
|
||||
_LOGGER = getLogger(__name__)
|
||||
|
||||
|
||||
@@ -843,12 +846,58 @@ class Node(SimComponent):
|
||||
)
|
||||
rm.add_request("os", RequestType(func=self._os_request_manager, validator=_node_is_on))
|
||||
|
||||
self._software_request_manager = RequestManager()
|
||||
rm.add_request("software_manager", RequestType(func=self._software_request_manager, validator=_node_is_on))
|
||||
self._application_manager = RequestManager()
|
||||
self._software_request_manager.add_request(
|
||||
name="application", request_type=RequestType(func=self._application_manager)
|
||||
)
|
||||
|
||||
self._application_manager.add_request(
|
||||
name="install",
|
||||
request_type=RequestType(
|
||||
func=lambda request, context: RequestResponse.from_bool(
|
||||
self.application_install_action(
|
||||
application=self._read_application_type(request[0]), ip_address=request[1]
|
||||
)
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
self._application_manager.add_request(
|
||||
name="uninstall",
|
||||
request_type=RequestType(
|
||||
func=lambda request, context: RequestResponse.from_bool(
|
||||
self.application_uninstall_action(application=self._read_application_type(request[0]))
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
return rm
|
||||
|
||||
def _install_system_software(self):
|
||||
"""Install System Software - software that is usually provided with the OS."""
|
||||
pass
|
||||
|
||||
def _read_application_type(self, application_class_str: str) -> Type[IOSoftwareClass]:
|
||||
"""Wrapper that converts the string from the request manager into the appropriate class for the application."""
|
||||
if application_class_str == "DoSBot":
|
||||
from primaite.simulator.system.applications.red_applications.dos_bot import DoSBot
|
||||
|
||||
return DoSBot
|
||||
elif application_class_str == "DataManipulationBot":
|
||||
from primaite.simulator.system.applications.red_applications.data_manipulation_bot import (
|
||||
DataManipulationBot,
|
||||
)
|
||||
|
||||
return DataManipulationBot
|
||||
elif application_class_str == "WebBrowser":
|
||||
from primaite.simulator.system.applications.web_browser import WebBrowser
|
||||
|
||||
return WebBrowser
|
||||
else:
|
||||
return 0
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Produce a dictionary describing the current state of this object.
|
||||
@@ -1257,6 +1306,75 @@ class Node(SimComponent):
|
||||
_LOGGER.info(f"Removed application {application.name} from node {self.hostname}")
|
||||
self._application_request_manager.remove_request(application.name)
|
||||
|
||||
def application_install_action(self, application: Application, ip_address: Optional[str] = None) -> bool:
|
||||
"""
|
||||
Install an application on this node and configure it.
|
||||
|
||||
This method is useful for allowing agents to take this action.
|
||||
|
||||
:param application: Application object that has not been installed on any node yet.
|
||||
:type application: Application
|
||||
:param ip_address: IP address used to configure the application
|
||||
(target IP for the DoSBot or server IP for the DataManipulationBot)
|
||||
:type ip_address: str
|
||||
:return: True if the application is installed successfully, otherwise False.
|
||||
"""
|
||||
if application in self:
|
||||
_LOGGER.warning(
|
||||
f"Can't add application {application.__name__}" + f"to node {self.hostname}. It's already installed."
|
||||
)
|
||||
return True
|
||||
|
||||
self.software_manager.install(application)
|
||||
application_instance = self.software_manager.software.get(str(application.__name__))
|
||||
self.applications[application_instance.uuid] = application_instance
|
||||
self.sys_log.info(f"Installed application {application_instance.name}")
|
||||
_LOGGER.debug(f"Added application {application_instance.name} to node {self.hostname}")
|
||||
self._application_request_manager.add_request(
|
||||
application_instance.name, RequestType(func=application_instance._request_manager)
|
||||
)
|
||||
|
||||
# Configure application if additional parameters are given
|
||||
if ip_address:
|
||||
if application_instance.name == "DoSBot":
|
||||
application_instance.configure(target_ip_address=IPv4Address(ip_address))
|
||||
elif application_instance.name == "DataManipulationBot":
|
||||
application_instance.configure(server_ip_address=IPv4Address(ip_address))
|
||||
else:
|
||||
pass
|
||||
|
||||
if application_instance.name in self.software_manager.software:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def application_uninstall_action(self, application: Application) -> bool:
|
||||
"""
|
||||
Uninstall and completely remove application from this node.
|
||||
|
||||
This method is useful for allowing agents to take this action.
|
||||
|
||||
:param application: Application object that is currently associated with this node.
|
||||
:type application: Application
|
||||
:return: True if the application is uninstalled successfully, otherwise False.
|
||||
"""
|
||||
if application.__name__ not in self.software_manager.software:
|
||||
_LOGGER.warning(
|
||||
f"Can't remove application {application.__name__}" + f"from node {self.hostname}. It's not installed."
|
||||
)
|
||||
return True
|
||||
|
||||
application_instance = self.software_manager.software.get(
|
||||
str(application.__name__)
|
||||
) # This works because we can't have two applications with the same name on the same node
|
||||
# self.uninstall_application(application_instance)
|
||||
self.software_manager.uninstall(application_instance.name)
|
||||
|
||||
if application_instance.name not in self.software_manager.software:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def _shut_down_actions(self):
|
||||
"""Actions to perform when the node is shut down."""
|
||||
# Turn off all the services in the node
|
||||
@@ -1288,4 +1406,6 @@ class Node(SimComponent):
|
||||
def __contains__(self, item: Any) -> bool:
|
||||
if isinstance(item, Service):
|
||||
return item.uuid in self.services
|
||||
elif isinstance(item, Application):
|
||||
return item.uuid in self.applications
|
||||
return None
|
||||
|
||||
@@ -28,3 +28,9 @@ class Server(HostNode):
|
||||
* Applications:
|
||||
* Web Browser
|
||||
"""
|
||||
|
||||
|
||||
class Printer(HostNode):
|
||||
"""Printer? I don't even know her!."""
|
||||
|
||||
# TODO: Implement printer-specific behaviour
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
from ipaddress import IPv4Address
|
||||
from typing import Dict, Final, Optional, Union
|
||||
from typing import Dict, Final, Union
|
||||
|
||||
from prettytable import MARKDOWN, PrettyTable
|
||||
from pydantic import validate_call
|
||||
from pydantic import Field, validate_call
|
||||
|
||||
from primaite.simulator.core import RequestManager, RequestType
|
||||
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
|
||||
from primaite.simulator.network.hardware.nodes.network.router import (
|
||||
AccessControlList,
|
||||
@@ -67,22 +68,34 @@ class Firewall(Router):
|
||||
:ivar str hostname: The Firewall hostname.
|
||||
"""
|
||||
|
||||
internal_inbound_acl: Optional[AccessControlList] = None
|
||||
internal_inbound_acl: AccessControlList = Field(
|
||||
default_factory=lambda: AccessControlList(name="Internal Inbound", implicit_action=ACLAction.DENY)
|
||||
)
|
||||
"""Access Control List for managing entering the internal network."""
|
||||
|
||||
internal_outbound_acl: Optional[AccessControlList] = None
|
||||
internal_outbound_acl: AccessControlList = Field(
|
||||
default_factory=lambda: AccessControlList(name="Internal Outbound", implicit_action=ACLAction.DENY)
|
||||
)
|
||||
"""Access Control List for managing traffic leaving the internal network."""
|
||||
|
||||
dmz_inbound_acl: Optional[AccessControlList] = None
|
||||
dmz_inbound_acl: AccessControlList = Field(
|
||||
default_factory=lambda: AccessControlList(name="DMZ Inbound", implicit_action=ACLAction.DENY)
|
||||
)
|
||||
"""Access Control List for managing traffic entering the DMZ."""
|
||||
|
||||
dmz_outbound_acl: Optional[AccessControlList] = None
|
||||
dmz_outbound_acl: AccessControlList = Field(
|
||||
default_factory=lambda: AccessControlList(name="DMZ Outbound", implicit_action=ACLAction.DENY)
|
||||
)
|
||||
"""Access Control List for managing traffic leaving the DMZ."""
|
||||
|
||||
external_inbound_acl: Optional[AccessControlList] = None
|
||||
external_inbound_acl: AccessControlList = Field(
|
||||
default_factory=lambda: AccessControlList(name="External Inbound", implicit_action=ACLAction.PERMIT)
|
||||
)
|
||||
"""Access Control List for managing traffic entering from an external network."""
|
||||
|
||||
external_outbound_acl: Optional[AccessControlList] = None
|
||||
external_outbound_acl: AccessControlList = Field(
|
||||
default_factory=lambda: AccessControlList(name="External Outbound", implicit_action=ACLAction.PERMIT)
|
||||
)
|
||||
"""Access Control List for managing traffic leaving towards an external network."""
|
||||
|
||||
def __init__(self, hostname: str, **kwargs):
|
||||
@@ -100,29 +113,85 @@ class Firewall(Router):
|
||||
self.connect_nic(
|
||||
RouterInterface(ip_address="127.0.0.1", subnet_mask="255.0.0.0", gateway="0.0.0.0", port_name="dmz")
|
||||
)
|
||||
# Update ACL objects with firewall's hostname and syslog to allow accurate logging
|
||||
self.internal_inbound_acl.sys_log = kwargs["sys_log"]
|
||||
self.internal_inbound_acl.name = f"{hostname} - Internal Inbound"
|
||||
|
||||
# Initialise ACLs for internal and dmz interfaces with a default DENY policy
|
||||
self.internal_inbound_acl = AccessControlList(
|
||||
sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=f"{hostname} - Internal Inbound"
|
||||
self.internal_outbound_acl.sys_log = kwargs["sys_log"]
|
||||
self.internal_outbound_acl.name = f"{hostname} - Internal Outbound"
|
||||
|
||||
self.dmz_inbound_acl.sys_log = kwargs["sys_log"]
|
||||
self.dmz_inbound_acl.name = f"{hostname} - DMZ Inbound"
|
||||
|
||||
self.dmz_outbound_acl.sys_log = kwargs["sys_log"]
|
||||
self.dmz_outbound_acl.name = f"{hostname} - DMZ Outbound"
|
||||
|
||||
self.external_inbound_acl.sys_log = kwargs["sys_log"]
|
||||
self.external_inbound_acl.name = f"{hostname} - External Inbound"
|
||||
|
||||
self.external_outbound_acl.sys_log = kwargs["sys_log"]
|
||||
self.external_outbound_acl.name = f"{hostname} - External Outbound"
|
||||
|
||||
def _init_request_manager(self) -> RequestManager:
|
||||
"""
|
||||
Initialise the request manager.
|
||||
|
||||
More information in user guide and docstring for SimComponent._init_request_manager.
|
||||
"""
|
||||
rm = super()._init_request_manager()
|
||||
self._internal_acl_request_manager = RequestManager()
|
||||
rm.add_request("internal", RequestType(func=self._internal_acl_request_manager))
|
||||
|
||||
self._dmz_acl_request_manager = RequestManager()
|
||||
rm.add_request("dmz", RequestType(func=self._dmz_acl_request_manager))
|
||||
|
||||
self._external_acl_request_manager = RequestManager()
|
||||
rm.add_request("external", RequestType(func=self._external_acl_request_manager))
|
||||
|
||||
self._internal_inbound_acl_request_manager = RequestManager()
|
||||
self._internal_outbound_acl_request_manager = RequestManager()
|
||||
self._internal_acl_request_manager.add_request(
|
||||
"inbound", RequestType(func=self._internal_inbound_acl_request_manager)
|
||||
)
|
||||
self.internal_outbound_acl = AccessControlList(
|
||||
sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=f"{hostname} - Internal Outbound"
|
||||
)
|
||||
self.dmz_inbound_acl = AccessControlList(
|
||||
sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=f"{hostname} - DMZ Inbound"
|
||||
)
|
||||
self.dmz_outbound_acl = AccessControlList(
|
||||
sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=f"{hostname} - DMZ Outbound"
|
||||
self._internal_acl_request_manager.add_request(
|
||||
"outbound", RequestType(func=self._internal_outbound_acl_request_manager)
|
||||
)
|
||||
|
||||
# external ACLs should have a default PERMIT policy
|
||||
self.external_inbound_acl = AccessControlList(
|
||||
sys_log=kwargs["sys_log"], implicit_action=ACLAction.PERMIT, name=f"{hostname} - External Inbound"
|
||||
self.dmz_inbound_acl_request_manager = RequestManager()
|
||||
self.dmz_outbound_acl_request_manager = RequestManager()
|
||||
self._dmz_acl_request_manager.add_request("inbound", RequestType(func=self.dmz_inbound_acl_request_manager))
|
||||
self._dmz_acl_request_manager.add_request("outbound", RequestType(func=self.dmz_outbound_acl_request_manager))
|
||||
|
||||
self.external_inbound_acl_request_manager = RequestManager()
|
||||
self.external_outbound_acl_request_manager = RequestManager()
|
||||
self._external_acl_request_manager.add_request(
|
||||
"inbound", RequestType(func=self.external_inbound_acl_request_manager)
|
||||
)
|
||||
self.external_outbound_acl = AccessControlList(
|
||||
sys_log=kwargs["sys_log"], implicit_action=ACLAction.PERMIT, name=f"{hostname} - External Outbound"
|
||||
self._external_acl_request_manager.add_request(
|
||||
"outbound", RequestType(func=self.external_outbound_acl_request_manager)
|
||||
)
|
||||
|
||||
self._internal_inbound_acl_request_manager.add_request(
|
||||
"acl", RequestType(func=self.internal_inbound_acl._request_manager)
|
||||
)
|
||||
self._internal_outbound_acl_request_manager.add_request(
|
||||
"acl", RequestType(func=self.internal_outbound_acl._request_manager)
|
||||
)
|
||||
|
||||
self.dmz_inbound_acl_request_manager.add_request("acl", RequestType(func=self.dmz_inbound_acl._request_manager))
|
||||
self.dmz_outbound_acl_request_manager.add_request(
|
||||
"acl", RequestType(func=self.dmz_outbound_acl._request_manager)
|
||||
)
|
||||
|
||||
self.external_inbound_acl_request_manager.add_request(
|
||||
"acl", RequestType(func=self.external_inbound_acl._request_manager)
|
||||
)
|
||||
self.external_outbound_acl_request_manager.add_request(
|
||||
"acl", RequestType(func=self.external_outbound_acl._request_manager)
|
||||
)
|
||||
|
||||
return rm
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Describes the current state of the Firewall.
|
||||
|
||||
@@ -277,7 +277,7 @@ class AccessControlList(SimComponent):
|
||||
:ivar int max_acl_rules: The maximum number of ACL rules that can be added to the list. Defaults to 25.
|
||||
"""
|
||||
|
||||
sys_log: SysLog
|
||||
sys_log: Optional[SysLog] = None
|
||||
implicit_action: ACLAction
|
||||
implicit_rule: ACLRule
|
||||
max_acl_rules: int = 25
|
||||
@@ -1420,7 +1420,7 @@ class Router(NetworkNode):
|
||||
:return: Configured router.
|
||||
:rtype: Router
|
||||
"""
|
||||
router = Router(
|
||||
router = cls(
|
||||
hostname=cfg["hostname"],
|
||||
num_ports=int(cfg.get("num_ports", "5")),
|
||||
operating_state=NodeOperatingState.ON
|
||||
@@ -1443,6 +1443,8 @@ class Router(NetworkNode):
|
||||
protocol=None if not (p := r_cfg.get("protocol")) else IPProtocol[p],
|
||||
src_ip_address=r_cfg.get("src_ip"),
|
||||
dst_ip_address=r_cfg.get("dst_ip"),
|
||||
src_wildcard_mask=r_cfg.get("src_wildcard_mask"),
|
||||
dst_wildcard_mask=r_cfg.get("dst_wildcard_mask"),
|
||||
position=r_num,
|
||||
)
|
||||
if "routes" in cfg:
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
from ipaddress import IPv4Address
|
||||
from typing import Any, Dict, Union
|
||||
|
||||
from pydantic import validate_call
|
||||
|
||||
from primaite.simulator.network.airspace import AirSpaceFrequency, IPWirelessNetworkInterface
|
||||
from primaite.simulator.network.hardware.nodes.network.router import Router, RouterInterface
|
||||
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
|
||||
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router, RouterInterface
|
||||
from primaite.simulator.network.transmission.data_link_layer import Frame
|
||||
from primaite.simulator.network.transmission.network_layer import IPProtocol
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
from primaite.utils.validators import IPV4Address
|
||||
|
||||
|
||||
@@ -209,3 +213,68 @@ class WirelessRouter(Router):
|
||||
raise NotImplementedError(
|
||||
"Please use the 'configure_wireless_access_point' and 'configure_router_interface' functions."
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_config(cls, cfg: Dict) -> "WirelessRouter":
|
||||
"""Generate the wireless router from config.
|
||||
|
||||
Schema:
|
||||
- hostname (str): unique name for this router.
|
||||
- router_interface (dict): The values should be another dict specifying
|
||||
- ip_address (str)
|
||||
- subnet_mask (str)
|
||||
- wireless_access_point (dict): Dict with
|
||||
- ip address,
|
||||
- subnet mask,
|
||||
- frequency, (string: either WIFI_2_4 or WIFI_5)
|
||||
- acl (dict): Dict with integers from 1 - max_acl_rules as keys. The key defines the position within the ACL
|
||||
where the rule will be added (lower number is resolved first). The values should describe valid ACL
|
||||
Rules as:
|
||||
- action (str): either PERMIT or DENY
|
||||
- src_port (str, optional): the named port such as HTTP, HTTPS, or POSTGRES_SERVER
|
||||
- dst_port (str, optional): the named port such as HTTP, HTTPS, or POSTGRES_SERVER
|
||||
- protocol (str, optional): the named IP protocol such as ICMP, TCP, or UDP
|
||||
- src_ip_address (str, optional): IP address octet written in base 10
|
||||
- dst_ip_address (str, optional): IP address octet written in base 10
|
||||
|
||||
:param cfg: Config dictionary
|
||||
:type cfg: Dict
|
||||
:return: WirelessRouter instance.
|
||||
:rtype: WirelessRouter
|
||||
"""
|
||||
operating_state = (
|
||||
NodeOperatingState.ON if not (p := cfg.get("operating_state")) else NodeOperatingState[p.upper()]
|
||||
)
|
||||
router = cls(hostname=cfg["hostname"], operating_state=operating_state)
|
||||
if "router_interface" in cfg:
|
||||
ip_address = cfg["router_interface"]["ip_address"]
|
||||
subnet_mask = cfg["router_interface"]["subnet_mask"]
|
||||
router.configure_router_interface(ip_address=ip_address, subnet_mask=subnet_mask)
|
||||
if "wireless_access_point" in cfg:
|
||||
ip_address = cfg["wireless_access_point"]["ip_address"]
|
||||
subnet_mask = cfg["wireless_access_point"]["subnet_mask"]
|
||||
frequency = AirSpaceFrequency[cfg["wireless_access_point"]["frequency"]]
|
||||
router.configure_wireless_access_point(ip_address=ip_address, subnet_mask=subnet_mask, frequency=frequency)
|
||||
|
||||
if "acl" in cfg:
|
||||
for r_num, r_cfg in cfg["acl"].items():
|
||||
router.acl.add_rule(
|
||||
action=ACLAction[r_cfg["action"]],
|
||||
src_port=None if not (p := r_cfg.get("src_port")) else Port[p],
|
||||
dst_port=None if not (p := r_cfg.get("dst_port")) else Port[p],
|
||||
protocol=None if not (p := r_cfg.get("protocol")) else IPProtocol[p],
|
||||
src_ip_address=r_cfg.get("src_ip"),
|
||||
dst_ip_address=r_cfg.get("dst_ip"),
|
||||
src_wildcard_mask=r_cfg.get("src_wildcard_mask"),
|
||||
dst_wildcard_mask=r_cfg.get("dst_wildcard_mask"),
|
||||
position=r_num,
|
||||
)
|
||||
if "routes" in cfg:
|
||||
for route in cfg.get("routes"):
|
||||
router.route_table.add_route(
|
||||
address=IPv4Address(route.get("address")),
|
||||
subnet_mask=IPv4Address(route.get("subnet_mask", "255.255.255.0")),
|
||||
next_hop_ip_address=IPv4Address(route.get("next_hop_ip_address")),
|
||||
metric=float(route.get("metric", 0)),
|
||||
)
|
||||
return router
|
||||
|
||||
Reference in New Issue
Block a user