From ae6c90a6701ca732a147c5b1e2cb249329ca7d71 Mon Sep 17 00:00:00 2001 From: SunilSamra Date: Wed, 12 Jul 2023 09:47:16 +0100 Subject: [PATCH] 901 - fixed how acls are added into list with new logic - agent cannot overwrite another acl in the list --- src/primaite/acl/access_control_list.py | 46 ++++++++++++------------- tests/test_acl.py | 1 + 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/src/primaite/acl/access_control_list.py b/src/primaite/acl/access_control_list.py index 539af83f..7c6184ca 100644 --- a/src/primaite/acl/access_control_list.py +++ b/src/primaite/acl/access_control_list.py @@ -1,7 +1,7 @@ # Crown Copyright (C) Dstl 2022. DEFCON 703. Shared in confidence. """A class that implements the access control list implementation for the network.""" import logging -from typing import Final, List +from typing import Final, List, Union from primaite.acl.acl_rule import ACLRule from primaite.common.enums import RulePermissionType @@ -22,7 +22,7 @@ class AccessControlList: # Maximum number of ACL Rules in ACL self.max_acl_rules: int = max_acl_rules # A list of ACL Rules - self._acl: List[ACLRule] = [] + self._acl: List[Union[ACLRule, None]] = [None] * (self.max_acl_rules - 1) # Implicit rule self.acl_implicit_rule = None if self.apply_implicit_rule: @@ -80,8 +80,11 @@ class AccessControlList: Indicates block if all conditions are satisfied. """ for rule in self.acl: + print("loops through rule", rule, isinstance(rule, ACLRule)) if isinstance(rule, ACLRule): + print("finds rule") if self.check_address_match(rule, _source_ip_address, _dest_ip_address): + print("source and dest ip match") if (rule.get_protocol() == _protocol or rule.get_protocol() == "ANY") and ( str(rule.get_port()) == str(_port) or rule.get_port() == "ANY" ): @@ -94,7 +97,7 @@ class AccessControlList: # If there has been no rule to allow the IER through, it will return a blocked signal by default return True - def add_rule(self, _permission, _source_ip, _dest_ip, _protocol, _port, _position=None): + def add_rule(self, _permission, _source_ip, _dest_ip, _protocol, _port, _position): """ Adds a new rule. @@ -106,29 +109,26 @@ class AccessControlList: _port: the port _position: position to insert ACL rule into ACL list (starting from index 1 and NOT 0) """ - position_index = int(_position) + try: + position_index = int(_position) + except TypeError: + _LOGGER.info(f"Position {_position} could not be converted to integer.") + return + new_rule = ACLRule(_permission, _source_ip, _dest_ip, _protocol, str(_port)) - if len(self._acl) + 1 < self.max_acl_rules: - if _position is not None: - if self.max_acl_rules - 1 > position_index > -1: - try: - # self._acl.insert(position_index, new_rule) - if self._acl[position_index] is None: - self.acl[position_index] = new_rule - except Exception: - _LOGGER.info(f"New Rule could NOT be added to list at position {position_index}.") + if self.max_acl_rules - 1 > position_index > -1: + try: + _LOGGER.info(f"Position {position_index} is valid.") + if self._acl[position_index] is None: + _LOGGER.info(f"Inserting rule {new_rule} at position {position_index}") + self._acl[position_index] = new_rule else: - _LOGGER.info( - f"Position {position_index} is an invalid index for list/overwrites implicit firewall rule" - ) - else: - self.acl.append(new_rule) + _LOGGER.info(f"Error: inserting rule at non-empty position {position_index}") + return + except Exception: + _LOGGER.info(f"New Rule could NOT be added to list at position {position_index}.") else: - _LOGGER.info( - f"The ACL list is FULL." - f"The list of ACLs has length {len(self.acl)} and it has a max capacity of {self.max_acl_rules}." - ) - # print("length of this list", len(self._acl)) + _LOGGER.info(f"Position {position_index} is an invalid/overwrites implicit firewall rule") def remove_rule(self, _permission, _source_ip, _dest_ip, _protocol, _port): """ diff --git a/tests/test_acl.py b/tests/test_acl.py index 3c35acbd..0d00a778 100644 --- a/tests/test_acl.py +++ b/tests/test_acl.py @@ -62,6 +62,7 @@ def test_check_acl_block_affirmative(): acl_rule_port, acl_position_in_list, ) + print(len(acl.acl), "len of acl list\n", acl.acl[0]) assert acl.is_blocked("192.168.1.1", "192.168.1.2", "TCP", "80") == False