901 - fixed how acls are added into list with new logic - agent cannot overwrite another acl in the list
This commit is contained in:
@@ -1,7 +1,7 @@
|
|||||||
# Crown Copyright (C) Dstl 2022. DEFCON 703. Shared in confidence.
|
# Crown Copyright (C) Dstl 2022. DEFCON 703. Shared in confidence.
|
||||||
"""A class that implements the access control list implementation for the network."""
|
"""A class that implements the access control list implementation for the network."""
|
||||||
import logging
|
import logging
|
||||||
from typing import Final, List
|
from typing import Final, List, Union
|
||||||
|
|
||||||
from primaite.acl.acl_rule import ACLRule
|
from primaite.acl.acl_rule import ACLRule
|
||||||
from primaite.common.enums import RulePermissionType
|
from primaite.common.enums import RulePermissionType
|
||||||
@@ -22,7 +22,7 @@ class AccessControlList:
|
|||||||
# Maximum number of ACL Rules in ACL
|
# Maximum number of ACL Rules in ACL
|
||||||
self.max_acl_rules: int = max_acl_rules
|
self.max_acl_rules: int = max_acl_rules
|
||||||
# A list of ACL Rules
|
# A list of ACL Rules
|
||||||
self._acl: List[ACLRule] = []
|
self._acl: List[Union[ACLRule, None]] = [None] * (self.max_acl_rules - 1)
|
||||||
# Implicit rule
|
# Implicit rule
|
||||||
self.acl_implicit_rule = None
|
self.acl_implicit_rule = None
|
||||||
if self.apply_implicit_rule:
|
if self.apply_implicit_rule:
|
||||||
@@ -80,8 +80,11 @@ class AccessControlList:
|
|||||||
Indicates block if all conditions are satisfied.
|
Indicates block if all conditions are satisfied.
|
||||||
"""
|
"""
|
||||||
for rule in self.acl:
|
for rule in self.acl:
|
||||||
|
print("loops through rule", rule, isinstance(rule, ACLRule))
|
||||||
if isinstance(rule, ACLRule):
|
if isinstance(rule, ACLRule):
|
||||||
|
print("finds rule")
|
||||||
if self.check_address_match(rule, _source_ip_address, _dest_ip_address):
|
if self.check_address_match(rule, _source_ip_address, _dest_ip_address):
|
||||||
|
print("source and dest ip match")
|
||||||
if (rule.get_protocol() == _protocol or rule.get_protocol() == "ANY") and (
|
if (rule.get_protocol() == _protocol or rule.get_protocol() == "ANY") and (
|
||||||
str(rule.get_port()) == str(_port) or rule.get_port() == "ANY"
|
str(rule.get_port()) == str(_port) or rule.get_port() == "ANY"
|
||||||
):
|
):
|
||||||
@@ -94,7 +97,7 @@ class AccessControlList:
|
|||||||
# If there has been no rule to allow the IER through, it will return a blocked signal by default
|
# If there has been no rule to allow the IER through, it will return a blocked signal by default
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def add_rule(self, _permission, _source_ip, _dest_ip, _protocol, _port, _position=None):
|
def add_rule(self, _permission, _source_ip, _dest_ip, _protocol, _port, _position):
|
||||||
"""
|
"""
|
||||||
Adds a new rule.
|
Adds a new rule.
|
||||||
|
|
||||||
@@ -106,29 +109,26 @@ class AccessControlList:
|
|||||||
_port: the port
|
_port: the port
|
||||||
_position: position to insert ACL rule into ACL list (starting from index 1 and NOT 0)
|
_position: position to insert ACL rule into ACL list (starting from index 1 and NOT 0)
|
||||||
"""
|
"""
|
||||||
position_index = int(_position)
|
try:
|
||||||
|
position_index = int(_position)
|
||||||
|
except TypeError:
|
||||||
|
_LOGGER.info(f"Position {_position} could not be converted to integer.")
|
||||||
|
return
|
||||||
|
|
||||||
new_rule = ACLRule(_permission, _source_ip, _dest_ip, _protocol, str(_port))
|
new_rule = ACLRule(_permission, _source_ip, _dest_ip, _protocol, str(_port))
|
||||||
if len(self._acl) + 1 < self.max_acl_rules:
|
if self.max_acl_rules - 1 > position_index > -1:
|
||||||
if _position is not None:
|
try:
|
||||||
if self.max_acl_rules - 1 > position_index > -1:
|
_LOGGER.info(f"Position {position_index} is valid.")
|
||||||
try:
|
if self._acl[position_index] is None:
|
||||||
# self._acl.insert(position_index, new_rule)
|
_LOGGER.info(f"Inserting rule {new_rule} at position {position_index}")
|
||||||
if self._acl[position_index] is None:
|
self._acl[position_index] = new_rule
|
||||||
self.acl[position_index] = new_rule
|
|
||||||
except Exception:
|
|
||||||
_LOGGER.info(f"New Rule could NOT be added to list at position {position_index}.")
|
|
||||||
else:
|
else:
|
||||||
_LOGGER.info(
|
_LOGGER.info(f"Error: inserting rule at non-empty position {position_index}")
|
||||||
f"Position {position_index} is an invalid index for list/overwrites implicit firewall rule"
|
return
|
||||||
)
|
except Exception:
|
||||||
else:
|
_LOGGER.info(f"New Rule could NOT be added to list at position {position_index}.")
|
||||||
self.acl.append(new_rule)
|
|
||||||
else:
|
else:
|
||||||
_LOGGER.info(
|
_LOGGER.info(f"Position {position_index} is an invalid/overwrites implicit firewall rule")
|
||||||
f"The ACL list is FULL."
|
|
||||||
f"The list of ACLs has length {len(self.acl)} and it has a max capacity of {self.max_acl_rules}."
|
|
||||||
)
|
|
||||||
# print("length of this list", len(self._acl))
|
|
||||||
|
|
||||||
def remove_rule(self, _permission, _source_ip, _dest_ip, _protocol, _port):
|
def remove_rule(self, _permission, _source_ip, _dest_ip, _protocol, _port):
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -62,6 +62,7 @@ def test_check_acl_block_affirmative():
|
|||||||
acl_rule_port,
|
acl_rule_port,
|
||||||
acl_position_in_list,
|
acl_position_in_list,
|
||||||
)
|
)
|
||||||
|
print(len(acl.acl), "len of acl list\n", acl.acl[0])
|
||||||
assert acl.is_blocked("192.168.1.1", "192.168.1.2", "TCP", "80") == False
|
assert acl.is_blocked("192.168.1.1", "192.168.1.2", "TCP", "80") == False
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user