901 - fixed how acls are added into list with new logic - agent cannot overwrite another acl in the list
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
# Crown Copyright (C) Dstl 2022. DEFCON 703. Shared in confidence.
|
||||
"""A class that implements the access control list implementation for the network."""
|
||||
import logging
|
||||
from typing import Final, List
|
||||
from typing import Final, List, Union
|
||||
|
||||
from primaite.acl.acl_rule import ACLRule
|
||||
from primaite.common.enums import RulePermissionType
|
||||
@@ -22,7 +22,7 @@ class AccessControlList:
|
||||
# Maximum number of ACL Rules in ACL
|
||||
self.max_acl_rules: int = max_acl_rules
|
||||
# A list of ACL Rules
|
||||
self._acl: List[ACLRule] = []
|
||||
self._acl: List[Union[ACLRule, None]] = [None] * (self.max_acl_rules - 1)
|
||||
# Implicit rule
|
||||
self.acl_implicit_rule = None
|
||||
if self.apply_implicit_rule:
|
||||
@@ -80,8 +80,11 @@ class AccessControlList:
|
||||
Indicates block if all conditions are satisfied.
|
||||
"""
|
||||
for rule in self.acl:
|
||||
print("loops through rule", rule, isinstance(rule, ACLRule))
|
||||
if isinstance(rule, ACLRule):
|
||||
print("finds rule")
|
||||
if self.check_address_match(rule, _source_ip_address, _dest_ip_address):
|
||||
print("source and dest ip match")
|
||||
if (rule.get_protocol() == _protocol or rule.get_protocol() == "ANY") and (
|
||||
str(rule.get_port()) == str(_port) or rule.get_port() == "ANY"
|
||||
):
|
||||
@@ -94,7 +97,7 @@ class AccessControlList:
|
||||
# If there has been no rule to allow the IER through, it will return a blocked signal by default
|
||||
return True
|
||||
|
||||
def add_rule(self, _permission, _source_ip, _dest_ip, _protocol, _port, _position=None):
|
||||
def add_rule(self, _permission, _source_ip, _dest_ip, _protocol, _port, _position):
|
||||
"""
|
||||
Adds a new rule.
|
||||
|
||||
@@ -106,29 +109,26 @@ class AccessControlList:
|
||||
_port: the port
|
||||
_position: position to insert ACL rule into ACL list (starting from index 1 and NOT 0)
|
||||
"""
|
||||
position_index = int(_position)
|
||||
try:
|
||||
position_index = int(_position)
|
||||
except TypeError:
|
||||
_LOGGER.info(f"Position {_position} could not be converted to integer.")
|
||||
return
|
||||
|
||||
new_rule = ACLRule(_permission, _source_ip, _dest_ip, _protocol, str(_port))
|
||||
if len(self._acl) + 1 < self.max_acl_rules:
|
||||
if _position is not None:
|
||||
if self.max_acl_rules - 1 > position_index > -1:
|
||||
try:
|
||||
# self._acl.insert(position_index, new_rule)
|
||||
if self._acl[position_index] is None:
|
||||
self.acl[position_index] = new_rule
|
||||
except Exception:
|
||||
_LOGGER.info(f"New Rule could NOT be added to list at position {position_index}.")
|
||||
if self.max_acl_rules - 1 > position_index > -1:
|
||||
try:
|
||||
_LOGGER.info(f"Position {position_index} is valid.")
|
||||
if self._acl[position_index] is None:
|
||||
_LOGGER.info(f"Inserting rule {new_rule} at position {position_index}")
|
||||
self._acl[position_index] = new_rule
|
||||
else:
|
||||
_LOGGER.info(
|
||||
f"Position {position_index} is an invalid index for list/overwrites implicit firewall rule"
|
||||
)
|
||||
else:
|
||||
self.acl.append(new_rule)
|
||||
_LOGGER.info(f"Error: inserting rule at non-empty position {position_index}")
|
||||
return
|
||||
except Exception:
|
||||
_LOGGER.info(f"New Rule could NOT be added to list at position {position_index}.")
|
||||
else:
|
||||
_LOGGER.info(
|
||||
f"The ACL list is FULL."
|
||||
f"The list of ACLs has length {len(self.acl)} and it has a max capacity of {self.max_acl_rules}."
|
||||
)
|
||||
# print("length of this list", len(self._acl))
|
||||
_LOGGER.info(f"Position {position_index} is an invalid/overwrites implicit firewall rule")
|
||||
|
||||
def remove_rule(self, _permission, _source_ip, _dest_ip, _protocol, _port):
|
||||
"""
|
||||
|
||||
@@ -62,6 +62,7 @@ def test_check_acl_block_affirmative():
|
||||
acl_rule_port,
|
||||
acl_position_in_list,
|
||||
)
|
||||
print(len(acl.acl), "len of acl list\n", acl.acl[0])
|
||||
assert acl.is_blocked("192.168.1.1", "192.168.1.2", "TCP", "80") == False
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user