From c38dda34b9b3f4917c921876d9b2fc7f71939220 Mon Sep 17 00:00:00 2001 From: Marek Wolan Date: Thu, 6 Jul 2023 10:23:14 +0100 Subject: [PATCH] Removed duplicated function definitions --- src/primaite/agents/utils.py | 204 ++++------------------------------- 1 file changed, 23 insertions(+), 181 deletions(-) diff --git a/src/primaite/agents/utils.py b/src/primaite/agents/utils.py index 58b422d0..5f5261e0 100644 --- a/src/primaite/agents/utils.py +++ b/src/primaite/agents/utils.py @@ -2,6 +2,7 @@ from typing import Dict, List, Union import numpy as np +from primaite.common.custom_typing import NodeUnion from primaite.common.enums import ( HardwareState, LinkStatus, @@ -310,7 +311,7 @@ def describe_obs_change( return change_string -def _describe_obs_change_helper(obs_change, is_link): +def _describe_obs_change_helper(obs_change: List[int], is_link: bool) -> str: """ Helper funcion to describe what has changed. @@ -319,8 +320,14 @@ def _describe_obs_change_helper(obs_change, is_link): Handles multiple changes e.g. 'ID 1: SERVICE 1 changed to PATCHING. SERVICE 2 set to GOOD.' - TODO: Add params and return in docstring. - TODO: Typehint params and return. + :param obs_change: List of integers generated within the `describe_obs_change` function. It should correspond to one + row of the observation table, and have `-1` at locations where the observation hasn't changed, and the new + status where it has changed. + :type obs_change: List[int] + :param is_link: Whether the row of the observation space corresponds to a link. False means it represents a node. + :type is_link: bool + :return: A human-readable description of the difference between the two observation rows. + :rtype: str """ # Indexes where a change has occured, not including 0th index index_changed = [i for i in range(1, len(obs_change)) if obs_change[i] != -1] @@ -378,65 +385,14 @@ def transform_action_node_enum(action: List[Union[str, int]]) -> List[int]: return new_action -def transform_action_node_readable(action: List[int]) -> List[Union[int, str]]: - """Convert a node action from enumerated format to readable format. - - example: - [1, 3, 1, 0] -> [1, 'SERVICE', 'PATCHING', 0] - - :param action: Raw action with integer encodings - :type action: List[int] - :return: Human-redable version of the action - :rtype: List[Union[int,str]] - """ - action_node_property = NodePOLType(action[1]).name - - if action_node_property == "OPERATING": - property_action = NodeHardwareAction(action[2]).name - elif (action_node_property == "OS" or action_node_property == "SERVICE") and action[2] <= 1: - property_action = NodeSoftwareAction(action[2]).name - else: - property_action = "NONE" - - new_action = [action[0], action_node_property, property_action, action[3]] - return new_action - - -# unused -# def node_action_description(action): -# """ -# Generate string describing a node-based action. - -# TO#DO: Add params and return in docstring. -# TO#DO: Typehint params and return. -# """ -# if isinstance(action[1], (int, np.int64)): -# # transform action to readable format -# action = transform_action_node_readable(action) - -# node_id = action[0] -# node_property = action[1] -# property_action = action[2] -# service_id = action[3] - -# if property_action == "NONE": -# return "" -# if node_property == "OPERATING" or node_property == "OS": -# description = f"NODE {node_id}, {node_property}, SET TO {property_action}" -# elif node_property == "SERVICE": -# description = f"NODE {node_id} FROM SERVICE {service_id}, SET TO {property_action}" -# else: -# return "" - -# return description - - -def transform_action_acl_enum(action): +def transform_action_acl_enum(action: List[Union[int, str]]) -> np.ndarray: """ Convert acl action from readable str format, to enumerated format. - TODO: Add params and return in docstring. - TODO: Typehint params and return. + :param action: ACL-based action expressed as a list of human-readable ints and strings + :type action: List[Union[int,str]] + :return: The same action but encoded to contain only integers. + :rtype: np.ndarray """ action_decisions = {"NONE": 0, "CREATE": 1, "DELETE": 2} action_permissions = {"DENY": 0, "ALLOW": 1} @@ -454,36 +410,17 @@ def transform_action_acl_enum(action): return new_action -# unused -# def acl_action_description(action): -# """ -# Generate string describing an acl-based action. - -# TODO: Add params and return in docstring. -# TODO: Typehint params and return. -# """ -# if isinstance(action[0], (int, np.int64)): -# # transform action to readable format -# action = transform_action_acl_readable(action) -# if action[0] == "NONE": -# description = "NO ACL RULE APPLIED" -# else: -# description = ( -# f"{action[0]} RULE: {action[1]} traffic from IP {action[2]} to IP {action[3]}," -# f" for protocol/service index {action[4]} on port index {action[5]}" -# ) - -# return description - - -def get_node_of_ip(ip, node_dict): - """ - Get the node ID of an IP address. +def get_node_of_ip(ip: str, node_dict: Dict[str, NodeUnion]) -> str: + """Get the node ID of an IP address. node_dict: dictionary of nodes where key is ID, and value is the node (can be ontained from env.nodes) - TODO: Add params and return in docstring. - TODO: Typehint params and return. + :param ip: The IP address of the node whose ID is required + :type ip: str + :param node_dict: The environment's node registry dictionary + :type node_dict: Dict[str,NodeUnion] + :return: The key from the registry dict that corresponds to the node with the IP adress provided by `ip` + :rtype: str """ for node_key, node_value in node_dict.items(): node_ip = node_value.ip_address @@ -491,101 +428,6 @@ def get_node_of_ip(ip, node_dict): return node_key -def is_valid_node_action(action): - """Is the node action an actual valid action. - - Only uses information about the action to determine if the action has an effect - - Does NOT consider: - - Node ID not valid to perform an operation - e.g. selected node has no service so cannot patch - - Node already being in that state (turning an ON node ON) - - TODO: Add params and return in docstring. - TODO: Typehint params and return. - """ - action_r = transform_action_node_readable(action) - - node_property = action_r[1] - node_action = action_r[2] - - if node_property == "NONE": - return False - if node_action == "NONE": - return False - if node_property == "OPERATING" and node_action == "PATCHING": - # Operating State cannot PATCH - return False - if node_property != "OPERATING" and node_action not in [ - "NONE", - "PATCHING", - ]: - # Software States can only do Nothing or Patch - return False - return True - - -def is_valid_acl_action(action: List[int]) -> bool: - """ - Is the ACL action an actual valid action. - - Only uses information about the action to determine if the action has an effect - - Does NOT consider: - - Trying to create identical rules - - Trying to create a rule which is a subset of another rule (caused by "ANY") - - - :param action: Action to check - :type action: List[int] - :return: Whether the action is valid - :rtype: bool - """ - action_r = transform_action_acl_readable(action) - - action_decision = action_r[0] - action_permission = action_r[1] - action_source_id = action_r[2] - action_destination_id = action_r[3] - - if action_decision == "NONE": - return False - if action_source_id == action_destination_id and action_source_id != "ANY" and action_destination_id != "ANY": - # ACL rule towards itself - return False - if action_permission == "DENY": - # DENY is unnecessary, we can create and delete allow rules instead - # No allow rule = blocked/DENY by feault. ALLOW overrides existing DENY. - return False - - return True - - -def is_valid_acl_action_extra(action: List[int]) -> bool: - """ - Harsher version of valid acl actions, does not allow action. - - :param action: Input action - :type action: List[int] - :return: Whether the action is a valid ACL action - :rtype: bool - """ - if is_valid_acl_action(action) is False: - return False - - action_r = transform_action_acl_readable(action) - action_protocol = action_r[4] - action_port = action_r[5] - - # Don't allow protocols or ports to be ANY - # in the future we might want to do the opposite, and only have ANY option for ports and service - if action_protocol == "ANY": - return False - if action_port == "ANY": - return False - - return True - - def get_new_action(old_action: np.ndarray, action_dict: Dict[int, List]) -> int: """ Get new action (e.g. 32) from old action e.g. [1,1,1,0].