#2402 add firewall acl actions

This commit is contained in:
Cristian-VM2
2024-03-29 16:30:39 +00:00
parent 2eb900746b
commit 7299a12c64
5 changed files with 793 additions and 83 deletions

View File

@@ -458,7 +458,7 @@ class RouterACLAddRuleAction(AbstractAction):
permission_str = "UNUSED"
return ["do_nothing"] # NOT SUPPORTED, JUST DO NOTHING IF WE COME ACROSS THIS
elif permission == 1:
permission_str = "ALLOW"
permission_str = "PERMIT"
elif permission == 2:
permission_str = "DENY"
else:
@@ -540,6 +540,156 @@ class RouterACLRemoveRuleAction(AbstractAction):
return ["network", "node", target_router_nodename, "acl", "remove_rule", position]
class FirewallACLAddRuleAction(AbstractAction):
"""Action which adds a rule to a firewall port's ACL."""
def __init__(
self,
manager: "ActionManager",
max_acl_rules: int,
num_ips: int,
num_ports: int,
num_protocols: int,
**kwargs,
) -> None:
"""Init method for FirewallACLAddRuleAction.
:param manager: Reference to the ActionManager which created this action.
:type manager: ActionManager
:param max_acl_rules: Maximum number of ACL rules that can be added to the router.
:type max_acl_rules: int
:param num_ips: Number of IP addresses in the simulation.
:type num_ips: int
:param num_ports: Number of ports in the simulation.
:type num_ports: int
:param num_protocols: Number of protocols in the simulation.
:type num_protocols: int
"""
super().__init__(manager=manager)
num_permissions = 3
self.shape: Dict[str, int] = {
"position": max_acl_rules,
"permission": num_permissions,
"source_ip_id": num_ips,
"dest_ip_id": num_ips,
"source_port_id": num_ports,
"dest_port_id": num_ports,
"protocol_id": num_protocols,
}
def form_request(
self,
target_firewall_nodename: str,
firewall_port_name: str,
firewall_port_direction: str,
position: int,
permission: int,
source_ip_id: int,
dest_ip_id: int,
source_port_id: int,
dest_port_id: int,
protocol_id: int,
) -> List[str]:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
if permission == 0:
permission_str = "UNUSED"
return ["do_nothing"] # NOT SUPPORTED, JUST DO NOTHING IF WE COME ACROSS THIS
elif permission == 1:
permission_str = "PERMIT"
elif permission == 2:
permission_str = "DENY"
else:
_LOGGER.warning(f"{self.__class__} received permission {permission}, expected 0 or 1.")
if protocol_id == 0:
return ["do_nothing"] # NOT SUPPORTED, JUST DO NOTHING IF WE COME ACROSS THIS
if protocol_id == 1:
protocol = "ALL"
else:
protocol = self.manager.get_internet_protocol_by_idx(protocol_id - 2)
# subtract 2 to account for UNUSED=0 and ALL=1.
if source_ip_id == 0:
return ["do_nothing"] # invalid formulation
elif source_ip_id == 1:
src_ip = "ALL"
else:
src_ip = self.manager.get_ip_address_by_idx(source_ip_id - 2)
# subtract 2 to account for UNUSED=0, and ALL=1
if source_port_id == 0:
return ["do_nothing"] # invalid formulation
elif source_port_id == 1:
src_port = "ALL"
else:
src_port = self.manager.get_port_by_idx(source_port_id - 2)
# subtract 2 to account for UNUSED=0, and ALL=1
if source_ip_id == 0:
return ["do_nothing"] # invalid formulation
elif dest_ip_id == 1:
dst_ip = "ALL"
else:
dst_ip = self.manager.get_ip_address_by_idx(dest_ip_id - 2)
# subtract 2 to account for UNUSED=0, and ALL=1
if dest_port_id == 0:
return ["do_nothing"] # invalid formulation
elif dest_port_id == 1:
dst_port = "ALL"
else:
dst_port = self.manager.get_port_by_idx(dest_port_id - 2)
# subtract 2 to account for UNUSED=0, and ALL=1
return [
"network",
"node",
target_firewall_nodename,
firewall_port_name,
firewall_port_direction,
"acl",
"add_rule",
permission_str,
protocol,
str(src_ip),
src_port,
str(dst_ip),
dst_port,
position,
]
class FirewallACLRemoveRuleAction(AbstractAction):
"""Action which removes a rule from a firewall port's ACL."""
def __init__(self, manager: "ActionManager", max_acl_rules: int, **kwargs) -> None:
"""Init method for RouterACLRemoveRuleAction.
:param manager: Reference to the ActionManager which created this action.
:type manager: ActionManager
:param max_acl_rules: Maximum number of ACL rules that can be added to the router.
:type max_acl_rules: int
"""
super().__init__(manager=manager)
self.shape: Dict[str, int] = {"position": max_acl_rules}
def form_request(
self, target_firewall_nodename: str, firewall_port_name: str, firewall_port_direction: str, position: int
) -> List[str]:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
return [
"network",
"node",
target_firewall_nodename,
firewall_port_name,
firewall_port_direction,
"acl",
"remove_rule",
position,
]
class NetworkNICAbstractAction(AbstractAction):
"""
Abstract base class for NIC actions.
@@ -668,6 +818,8 @@ class ActionManager:
"NODE_RESET": NodeResetAction,
"ROUTER_ACL_ADDRULE": RouterACLAddRuleAction,
"ROUTER_ACL_REMOVERULE": RouterACLRemoveRuleAction,
"FIREWALL_ACL_ADDRULE": FirewallACLAddRuleAction,
"FIREWALL_ACL_REMOVERULE": FirewallACLRemoveRuleAction,
"NETWORK_NIC_ENABLE": NetworkNICEnableAction,
"NETWORK_NIC_DISABLE": NetworkNICDisableAction,
"NETWORK_PORT_ENABLE": NetworkPortEnableAction,

View File

@@ -1,10 +1,10 @@
from ipaddress import IPv4Address
from typing import Dict, Final, Optional, Union
from typing import Dict, Final, Union
from prettytable import MARKDOWN, PrettyTable
from pydantic import validate_call
from pydantic import Field, validate_call
# from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.network.router import (
AccessControlList,
@@ -68,22 +68,34 @@ class Firewall(Router):
:ivar str hostname: The Firewall hostname.
"""
internal_inbound_acl: Optional[AccessControlList] = None
internal_inbound_acl: AccessControlList = Field(
default_factory=lambda: AccessControlList(name="Internal Inbound", implicit_action=ACLAction.DENY)
)
"""Access Control List for managing entering the internal network."""
internal_outbound_acl: Optional[AccessControlList] = None
internal_outbound_acl: AccessControlList = Field(
default_factory=lambda: AccessControlList(name="Internal Outbound", implicit_action=ACLAction.DENY)
)
"""Access Control List for managing traffic leaving the internal network."""
dmz_inbound_acl: Optional[AccessControlList] = None
dmz_inbound_acl: AccessControlList = Field(
default_factory=lambda: AccessControlList(name="DMZ Inbound", implicit_action=ACLAction.DENY)
)
"""Access Control List for managing traffic entering the DMZ."""
dmz_outbound_acl: Optional[AccessControlList] = None
dmz_outbound_acl: AccessControlList = Field(
default_factory=lambda: AccessControlList(name="DMZ Outbound", implicit_action=ACLAction.DENY)
)
"""Access Control List for managing traffic leaving the DMZ."""
external_inbound_acl: Optional[AccessControlList] = None
external_inbound_acl: AccessControlList = Field(
default_factory=lambda: AccessControlList(name="External Inbound", implicit_action=ACLAction.PERMIT)
)
"""Access Control List for managing traffic entering from an external network."""
external_outbound_acl: Optional[AccessControlList] = None
external_outbound_acl: AccessControlList = Field(
default_factory=lambda: AccessControlList(name="External Outbound", implicit_action=ACLAction.PERMIT)
)
"""Access Control List for managing traffic leaving towards an external network."""
def __init__(self, hostname: str, **kwargs):
@@ -101,88 +113,84 @@ class Firewall(Router):
self.connect_nic(
RouterInterface(ip_address="127.0.0.1", subnet_mask="255.0.0.0", gateway="0.0.0.0", port_name="dmz")
)
# Initialise ACLs for internal and dmz interfaces with a default DENY policy
self.internal_inbound_acl = AccessControlList(
sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=f"{hostname} - Internal Inbound"
self.internal_inbound_acl.sys_log = kwargs["sys_log"]
self.internal_inbound_acl.name = f"{hostname} - Internal Inbound"
self.internal_outbound_acl.sys_log = kwargs["sys_log"]
self.internal_outbound_acl.name = f"{hostname} - Internal Outbound"
self.dmz_inbound_acl.sys_log = kwargs["sys_log"]
self.dmz_inbound_acl.name = f"{hostname} - DMZ Inbound"
self.dmz_outbound_acl.sys_log = kwargs["sys_log"]
self.dmz_outbound_acl.name = f"{hostname} - DMZ Outbound"
self.external_inbound_acl.sys_log = kwargs["sys_log"]
self.external_inbound_acl.name = f"{hostname} - External Inbound"
self.external_outbound_acl.sys_log = kwargs["sys_log"]
self.external_outbound_acl.name = f"{hostname} - External Outbound"
def _init_request_manager(self) -> RequestManager:
"""
Initialise the request manager.
More information in user guide and docstring for SimComponent._init_request_manager.
"""
rm = super()._init_request_manager()
self._internal_acl_request_manager = RequestManager()
rm.add_request("internal", RequestType(func=self._internal_acl_request_manager))
self._dmz_acl_request_manager = RequestManager()
rm.add_request("dmz", RequestType(func=self._dmz_acl_request_manager))
self._external_acl_request_manager = RequestManager()
rm.add_request("external", RequestType(func=self._external_acl_request_manager))
self._internal_inbound_acl_request_manager = RequestManager()
self._internal_outbound_acl_request_manager = RequestManager()
self._internal_acl_request_manager.add_request(
"inbound", RequestType(func=self._internal_inbound_acl_request_manager)
)
self.internal_outbound_acl = AccessControlList(
sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=f"{hostname} - Internal Outbound"
)
self.dmz_inbound_acl = AccessControlList(
sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=f"{hostname} - DMZ Inbound"
)
self.dmz_outbound_acl = AccessControlList(
sys_log=kwargs["sys_log"], implicit_action=ACLAction.DENY, name=f"{hostname} - DMZ Outbound"
self._internal_acl_request_manager.add_request(
"outbound", RequestType(func=self._internal_outbound_acl_request_manager)
)
# external ACLs should have a default PERMIT policy
self.external_inbound_acl = AccessControlList(
sys_log=kwargs["sys_log"], implicit_action=ACLAction.PERMIT, name=f"{hostname} - External Inbound"
self.dmz_inbound_acl_request_manager = RequestManager()
self.dmz_outbound_acl_request_manager = RequestManager()
self._dmz_acl_request_manager.add_request("inbound", RequestType(func=self.dmz_inbound_acl_request_manager))
self._dmz_acl_request_manager.add_request("outbound", RequestType(func=self.dmz_outbound_acl_request_manager))
self.external_inbound_acl_request_manager = RequestManager()
self.external_outbound_acl_request_manager = RequestManager()
self._external_acl_request_manager.add_request(
"inbound", RequestType(func=self.external_inbound_acl_request_manager)
)
self.external_outbound_acl = AccessControlList(
sys_log=kwargs["sys_log"], implicit_action=ACLAction.PERMIT, name=f"{hostname} - External Outbound"
self._external_acl_request_manager.add_request(
"outbound", RequestType(func=self.external_outbound_acl_request_manager)
)
# def _init_request_manager(self) -> RequestManager:
# """
# Initialise the request manager.
self._internal_inbound_acl_request_manager.add_request(
"acl", RequestType(func=self.internal_inbound_acl._request_manager)
)
self._internal_outbound_acl_request_manager.add_request(
"acl", RequestType(func=self.internal_outbound_acl._request_manager)
)
# More information in user guide and docstring for SimComponent._init_request_manager.
# """
# rm = super()._init_request_manager()
# self._internal_acl_request_manager = RequestManager()
# rm.add_request("internal", RequestType(func=self._internal_acl_request_manager))
self.dmz_inbound_acl_request_manager.add_request("acl", RequestType(func=self.dmz_inbound_acl._request_manager))
self.dmz_outbound_acl_request_manager.add_request(
"acl", RequestType(func=self.dmz_outbound_acl._request_manager)
)
# self._dmz_acl_request_manager = RequestManager()
# rm.add_request("dmz", RequestType(func=self._dmz_acl_request_manager))
self.external_inbound_acl_request_manager.add_request(
"acl", RequestType(func=self.external_inbound_acl._request_manager)
)
self.external_outbound_acl_request_manager.add_request(
"acl", RequestType(func=self.external_outbound_acl._request_manager)
)
# self._external_acl_request_manager = RequestManager()
# rm.add_request("external", RequestType(func=self._external_acl_request_manager))
# self._internal_inbound_acl_request_manager = RequestManager()
# self._internal_outbound_acl_request_manager = RequestManager()
# self._internal_acl_request_manager.add_request(
# "inbound", RequestType(func=self._internal_inbound_acl_request_manager)
# )
# self._internal_acl_request_manager.add_request(
# "outbound", RequestType(func=self._internal_outbound_acl_request_manager)
# )
# self.dmz_inbound_acl_request_manager = RequestManager()
# self.dmz_outbound_acl_request_manager = RequestManager()
# self._dmz_acl_request_manager.add_request("inbound", RequestType(func=self.dmz_inbound_acl_request_manager))
# self._dmz_acl_request_manager.add_request("outbound", RequestType(func=self.dmz_outbound_acl_request_manager))
# self.external_inbound_acl_request_manager = RequestManager()
# self.external_outbound_acl_request_manager = RequestManager()
# self._external_acl_request_manager.add_request(
# "inbound", RequestType(func=self.external_inbound_acl_request_manager)
# )
# self._external_acl_request_manager.add_request(
# "outbound", RequestType(func=self.external_outbound_acl_request_manager)
# )
# self._internal_inbound_acl_request_manager.add_request(
# "acl", RequestType(func=self.internal_inbound_acl._request_manager)
# )
# self._internal_outbound_acl_request_manager.add_request(
# "acl", RequestType(func=self.internal_outbound_acl._request_manager)
# )
# self.dmz_inbound_acl_request_manager.add_request("acl", RequestType(func=self.dmz_inbound_acl._request_manager))
# self.dmz_outbound_acl_request_manager.add_request(
# "acl", RequestType(func=self.dmz_outbound_acl._request_manager)
# )
# self.external_inbound_acl_request_manager.add_request(
# "acl", RequestType(func=self.external_inbound_acl._request_manager)
# )
# self.external_outbound_acl_request_manager.add_request(
# "acl", RequestType(func=self.external_outbound_acl._request_manager)
# )
# return rm
return rm
def describe_state(self) -> Dict:
"""

View File

@@ -275,7 +275,7 @@ class AccessControlList(SimComponent):
:ivar int max_acl_rules: The maximum number of ACL rules that can be added to the list. Defaults to 25.
"""
sys_log: SysLog
sys_log: Optional[SysLog] = None
implicit_action: ACLAction
implicit_rule: ACLRule
max_acl_rules: int = 25

View File

@@ -0,0 +1,448 @@
# Network with DMZ
#
# An example network configuration with an internal network, a DMZ network and a couple of external networks.
#
# ............................................................................
# . .
# . Internal Network .
# . .
# . -------------- -------------- -------------- .
# . | client_1 |------| switch_1 |--------| router_1 | .
# . -------------- -------------- -------------- .
# . (Computer) | .
# ........................................................|...................
# |
# |
# ........................................................|...................
# . | .
# . DMZ Network | .
# . | .
# . ---------------- -------------- -------------- .
# . | dmz_server |------| switch_2 |------| firewall | .
# . ---------------- -------------- -------------- .
# . (Server) | .
# ........................................................|...................
# |
# External Network |
# |
# |
# ----------------------- -------------- ---------------------
# | external_computer |------| switch_3 |------| external_server |
# ----------------------- -------------- ---------------------
#
training_config:
rl_framework: SB3
rl_algorithm: PPO
seed: 333
n_learn_episodes: 1
n_eval_episodes: 5
max_steps_per_episode: 128
deterministic_eval: false
n_agents: 1
agent_references:
- defender
io_settings:
save_step_metadata: false
save_pcap_logs: true
save_sys_logs: true
game:
max_episode_length: 256
ports:
- ARP
- DNS
- HTTP
- POSTGRES_SERVER
protocols:
- ICMP
- TCP
- UDP
agents:
- ref: defender
team: BLUE
type: ProxyAgent
observation_space:
type: UC2BlueObservation
options:
num_services_per_node: 1
num_folders_per_node: 1
num_files_per_folder: 1
num_nics_per_node: 2
nodes:
- node_hostname: client_1
links:
- link_ref: client_1___switch_1
acl:
options:
max_acl_rules: 10
router_hostname: router_1
ip_address_order:
- node_hostname: client_1
nic_num: 1
ics: null
action_space:
action_list:
- type: DONOTHING
- type: FIREWALL_ACL_ADDRULE
- type: FIREWALL_ACL_REMOVERULE
action_map:
0:
action: DONOTHING
options: {}
1:
action: FIREWALL_ACL_ADDRULE
options:
target_firewall_nodename: firewall
firewall_port_name: internal
firewall_port_direction: inbound
position: 1
permission: 1
source_ip_id: 2 # client 1
dest_ip_id: 1 # ALL
source_port_id: 1
dest_port_id: 1
protocol_id: 1
2:
action: FIREWALL_ACL_REMOVERULE
options:
target_firewall_nodename: firewall
firewall_port_name: internal
firewall_port_direction: inbound
position: 1
3:
action: FIREWALL_ACL_ADDRULE
options:
target_firewall_nodename: firewall
firewall_port_name: internal
firewall_port_direction: outbound
position: 1
permission: 2
source_ip_id: 2 # client 1
dest_ip_id: 1 # ALL
source_port_id: 2
dest_port_id: 3
protocol_id: 2
4:
action: FIREWALL_ACL_REMOVERULE
options:
target_firewall_nodename: firewall
firewall_port_name: internal
firewall_port_direction: outbound
position: 1
5:
action: FIREWALL_ACL_ADDRULE
options:
target_firewall_nodename: firewall
firewall_port_name: dmz
firewall_port_direction: inbound
position: 1
permission: 2
source_ip_id: 3 # dmz_server
dest_ip_id: 2 # client_1
source_port_id: 4
dest_port_id: 4
protocol_id: 4
6:
action: FIREWALL_ACL_REMOVERULE
options:
target_firewall_nodename: firewall
firewall_port_name: dmz
firewall_port_direction: inbound
position: 1
7:
action: FIREWALL_ACL_ADDRULE
options:
target_firewall_nodename: firewall
firewall_port_name: dmz
firewall_port_direction: outbound
position: 2
permission: 2
source_ip_id: 3 # dmz_server
dest_ip_id: 2 # client_1
source_port_id: 4
dest_port_id: 4
protocol_id: 3
8:
action: FIREWALL_ACL_REMOVERULE
options:
target_firewall_nodename: firewall
firewall_port_name: dmz
firewall_port_direction: outbound
position: 2
9:
action: FIREWALL_ACL_ADDRULE
options:
target_firewall_nodename: firewall
firewall_port_name: external
firewall_port_direction: inbound
position: 10
permission: 2
source_ip_id: 4 # external_computer
dest_ip_id: 3 # dmz
source_port_id: 5
dest_port_id: 5
protocol_id: 2
10:
action: FIREWALL_ACL_REMOVERULE
options:
target_firewall_nodename: firewall
firewall_port_name: external
firewall_port_direction: inbound
position: 10
11:
action: FIREWALL_ACL_ADDRULE
options:
target_firewall_nodename: firewall
firewall_port_name: external
firewall_port_direction: outbound
position: 1
permission: 2
source_ip_id: 4 # external_computer
dest_ip_id: 2 # client_1
source_port_id: 1
dest_port_id: 1
protocol_id: 1
12:
action: FIREWALL_ACL_REMOVERULE
options:
target_firewall_nodename: firewall
firewall_port_name: external
firewall_port_direction: outbound
position: 1
options:
nodes:
- node_name: client_1
- node_name: dmz_server
- node_name: external_computer
ip_address_order:
- node_name: client_1
nic_num: 1
- node_name: dmz_server
nic_num: 1
- node_name: external_computer
nic_num: 1
max_folders_per_node: 2
max_files_per_folder: 2
max_services_per_node: 2
max_nics_per_node: 8
max_acl_rules: 10
reward_function:
reward_components:
- type: DUMMY
agent_settings:
start_settings:
start_step: 5
frequency: 4
variance: 3
simulation:
network:
nodes:
- ref: client_1
type: computer
hostname: client_1
ip_address: 192.168.0.10
subnet_mask: 255.255.255.0
default_gateway: 192.168.0.1
dns_server: 192.168.20.11
start_up_duration: 0
shut_down_duration: 0
- ref: switch_1
type: switch
hostname: switch_1
num_ports: 8
start_up_duration: 0
shut_down_duration: 0
- ref: router_1
type: router
hostname: router_1
num_ports: 5
start_up_duration: 0
shut_down_duration: 0
ports:
1:
ip_address: 192.168.0.1
subnet_mask: 255.255.255.0
2:
ip_address: 192.168.1.1
subnet_mask: 255.255.255.0
acl:
22:
action: PERMIT
src_port: ARP
dst_port: ARP
23:
action: PERMIT
protocol: ICMP
routes:
- address: 192.168.10.10 # route to dmz_server
subnet_mask: 255.255.255.0
next_hop_ip_address: 192.168.1.2
metric: 0
- address: 192.168.20.10 # route to external_computer
subnet_mask: 255.255.255.0
next_hop_ip_address: 192.168.1.2
metric: 0
- address: 192.168.20.11 # route to external_server
subnet_mask: 255.255.255.0
next_hop_ip_address: 192.168.1.2
metric: 0
- ref: dmz_server
type: server
hostname: dmz_server
ip_address: 192.168.10.10
subnet_mask: 255.255.255.0
default_gateway: 192.168.10.1
dns_server: 192.168.20.11
start_up_duration: 0
shut_down_duration: 0
- ref: switch_2
type: switch
hostname: switch_2
num_ports: 8
start_up_duration: 0
shut_down_duration: 0
- ref: firewall
type: firewall
hostname: firewall
start_up_duration: 0
shut_down_duration: 0
ports:
external_port: # port 1
ip_address: 192.168.20.1
subnet_mask: 255.255.255.0
internal_port: # port 2
ip_address: 192.168.1.2
subnet_mask: 255.255.255.0
dmz_port: # port 3
ip_address: 192.168.10.1
subnet_mask: 255.255.255.0
acl:
internal_inbound_acl:
22:
action: PERMIT
src_port: ARP
dst_port: ARP
23:
action: PERMIT
protocol: ICMP
internal_outbound_acl:
22:
action: PERMIT
src_port: ARP
dst_port: ARP
23:
action: PERMIT
protocol: ICMP
dmz_inbound_acl:
22:
action: PERMIT
src_port: ARP
dst_port: ARP
23:
action: PERMIT
protocol: ICMP
dmz_outbound_acl:
22:
action: PERMIT
src_port: ARP
dst_port: ARP
23:
action: PERMIT
protocol: ICMP
external_inbound_acl:
22:
action: PERMIT
src_port: ARP
dst_port: ARP
external_outbound_acl:
22:
action: PERMIT
src_port: ARP
dst_port: ARP
routes:
- address: 192.168.0.10 # route to client_1
subnet_mask: 255.255.255.0
next_hop_ip_address: 192.168.1.1
metric: 0
- ref: switch_3
type: switch
hostname: switch_3
num_ports: 8
start_up_duration: 0
shut_down_duration: 0
- ref: external_computer
type: computer
hostname: external_computer
ip_address: 192.168.20.10
subnet_mask: 255.255.255.0
default_gateway: 192.168.20.1
dns_server: 192.168.20.11
start_up_duration: 0
shut_down_duration: 0
- ref: external_server
type: server
hostname: external_server
ip_address: 192.168.20.11
subnet_mask: 255.255.255.0
default_gateway: 192.168.20.1
start_up_duration: 0
shut_down_duration: 0
services:
- ref: domain_controller_dns_server
type: DNSServer
links:
- ref: client_1___switch_1
endpoint_a_ref: client_1
endpoint_a_port: 1
endpoint_b_ref: switch_1
endpoint_b_port: 1
- ref: router_1___switch_1
endpoint_a_ref: router_1
endpoint_a_port: 1
endpoint_b_ref: switch_1
endpoint_b_port: 8
- ref: router_1___firewall
endpoint_a_ref: firewall
endpoint_a_port: 2 # internal firewall port
endpoint_b_ref: router_1
endpoint_b_port: 2
- ref: firewall___switch_2
endpoint_a_ref: firewall
endpoint_a_port: 3 # dmz firewall port
endpoint_b_ref: switch_2
endpoint_b_port: 8
- ref: dmz_server___switch_2
endpoint_a_ref: dmz_server
endpoint_a_port: 1
endpoint_b_ref: switch_2
endpoint_b_port: 1
- ref: firewall___switch_3
endpoint_a_ref: firewall
endpoint_a_port: 1 # external firewall port
endpoint_b_ref: switch_3
endpoint_b_port: 8
- ref: external_computer___switch_3
endpoint_a_ref: external_computer
endpoint_a_port: 1
endpoint_b_ref: switch_3
endpoint_b_port: 1
- ref: external_server___switch_3
endpoint_a_ref: external_server
endpoint_a_port: 1
endpoint_b_ref: switch_3
endpoint_b_port: 2

View File

@@ -10,16 +10,24 @@
# 4. Check that the simulation has changed in the way that I expect.
# 5. Repeat for all actions.
from ipaddress import IPv4Address
from typing import Tuple
import pytest
import yaml
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.session.environment import PrimaiteGymEnv
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.network.transmission.network_layer import IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.web_browser import WebBrowser
from primaite.simulator.system.software import SoftwareHealthState
from tests import TEST_ASSETS_ROOT
FIREWALL_ACTIONS_NETWORK = TEST_ASSETS_ROOT / "configs/firewall_actions_network.yaml"
def test_do_nothing_integration(game_and_agent: Tuple[PrimaiteGame, ProxyAgent]):
@@ -458,3 +466,97 @@ def test_node_application_close_integration(game_and_agent: Tuple[PrimaiteGame,
game.step()
assert browser.operating_state == ApplicationOperatingState.CLOSED
def test_firewall_acl_add_remove_rule_integration():
"""
Test that FirewallACLAddRuleAction and FirewallACLRemoveRuleAction can form a request and that it is accepted by the simulation.
Check that all the details of the ACL rules are correctly added to each ACL list of the Firewall.
Check that rules are removed as expected.
"""
with open(FIREWALL_ACTIONS_NETWORK, "r") as f:
cfg = yaml.safe_load(f)
env = PrimaiteGymEnv(game_config=cfg)
# 1: Check that traffic is normal and acl starts off with 4 rules.
firewall = env.game.simulation.network.get_node_by_hostname("firewall")
assert firewall.internal_inbound_acl.num_rules == 2
assert firewall.internal_outbound_acl.num_rules == 2
assert firewall.dmz_inbound_acl.num_rules == 2
assert firewall.dmz_outbound_acl.num_rules == 2
assert firewall.external_inbound_acl.num_rules == 1
assert firewall.external_outbound_acl.num_rules == 1
env.step(1) # Add ACL rule to Internal Inbound
assert firewall.internal_inbound_acl.num_rules == 3
assert firewall.internal_inbound_acl.acl[1].action.name == "PERMIT"
assert firewall.internal_inbound_acl.acl[1].src_ip_address == IPv4Address("192.168.0.10")
assert firewall.internal_inbound_acl.acl[1].dst_ip_address is None
assert firewall.internal_inbound_acl.acl[1].dst_port is None
assert firewall.internal_inbound_acl.acl[1].src_port is None
assert firewall.internal_inbound_acl.acl[1].protocol is None
env.step(2) # Remove ACL rule from Internal Inbound
assert firewall.internal_inbound_acl.num_rules == 2
env.step(3) # Add ACL rule to Internal Outbound
assert firewall.internal_outbound_acl.num_rules == 3
assert firewall.internal_outbound_acl.acl[1].action.name == "DENY"
assert firewall.internal_outbound_acl.acl[1].src_ip_address == IPv4Address("192.168.0.10")
assert firewall.internal_outbound_acl.acl[1].dst_ip_address is None
assert firewall.internal_outbound_acl.acl[1].dst_port == Port.DNS
assert firewall.internal_outbound_acl.acl[1].src_port == Port.ARP
assert firewall.internal_outbound_acl.acl[1].protocol == IPProtocol.ICMP
env.step(4) # Remove ACL rule from Internal Outbound
assert firewall.internal_outbound_acl.num_rules == 2
env.step(5) # Add ACL rule to DMZ Inbound
assert firewall.dmz_inbound_acl.num_rules == 3
assert firewall.dmz_inbound_acl.acl[1].action.name == "DENY"
assert firewall.dmz_inbound_acl.acl[1].src_ip_address == IPv4Address("192.168.10.10")
assert firewall.dmz_inbound_acl.acl[1].dst_ip_address == IPv4Address("192.168.0.10")
assert firewall.dmz_inbound_acl.acl[1].dst_port == Port.HTTP
assert firewall.dmz_inbound_acl.acl[1].src_port == Port.HTTP
assert firewall.dmz_inbound_acl.acl[1].protocol == IPProtocol.UDP
env.step(6) # Remove ACL rule from DMZ Inbound
assert firewall.dmz_inbound_acl.num_rules == 2
env.step(7) # Add ACL rule to DMZ Outbound
assert firewall.dmz_outbound_acl.num_rules == 3
assert firewall.dmz_outbound_acl.acl[2].action.name == "DENY"
assert firewall.dmz_outbound_acl.acl[2].src_ip_address == IPv4Address("192.168.10.10")
assert firewall.dmz_outbound_acl.acl[2].dst_ip_address == IPv4Address("192.168.0.10")
assert firewall.dmz_outbound_acl.acl[2].dst_port == Port.HTTP
assert firewall.dmz_outbound_acl.acl[2].src_port == Port.HTTP
assert firewall.dmz_outbound_acl.acl[2].protocol == IPProtocol.TCP
env.step(8) # Remove ACL rule from DMZ Outbound
assert firewall.dmz_outbound_acl.num_rules == 2
env.step(9) # Add ACL rule to External Inbound
assert firewall.external_inbound_acl.num_rules == 2
assert firewall.external_inbound_acl.acl[10].action.name == "DENY"
assert firewall.external_inbound_acl.acl[10].src_ip_address == IPv4Address("192.168.20.10")
assert firewall.external_inbound_acl.acl[10].dst_ip_address == IPv4Address("192.168.10.10")
assert firewall.external_inbound_acl.acl[10].dst_port == Port.POSTGRES_SERVER
assert firewall.external_inbound_acl.acl[10].src_port == Port.POSTGRES_SERVER
assert firewall.external_inbound_acl.acl[10].protocol == IPProtocol.ICMP
env.step(10) # Remove ACL rule from External Inbound
assert firewall.external_inbound_acl.num_rules == 1
env.step(11) # Add ACL rule to External Outbound
assert firewall.external_outbound_acl.num_rules == 2
assert firewall.external_outbound_acl.acl[1].action.name == "DENY"
assert firewall.external_outbound_acl.acl[1].src_ip_address == IPv4Address("192.168.20.10")
assert firewall.external_outbound_acl.acl[1].dst_ip_address == IPv4Address("192.168.0.10")
assert firewall.external_outbound_acl.acl[1].dst_port is None
assert firewall.external_outbound_acl.acl[1].src_port is None
assert firewall.external_outbound_acl.acl[1].protocol is None
env.step(12) # Remove ACL rule from External Outbound
assert firewall.external_outbound_acl.num_rules == 1