diff --git a/src/primaite/agents/utils.py b/src/primaite/agents/utils.py index bffdbf43..58b422d0 100644 --- a/src/primaite/agents/utils.py +++ b/src/primaite/agents/utils.py @@ -1,4 +1,4 @@ -from typing import List, Union +from typing import Dict, List, Union import numpy as np @@ -346,15 +346,15 @@ def _describe_obs_change_helper(obs_change, is_link): return desc -def transform_action_node_enum(action): - """ - Convert a node action from readable string format, to enumerated format. +def transform_action_node_enum(action: List[Union[str, int]]) -> List[int]: + """Convert a node action from readable string format, to enumerated format. example: [1, 'SERVICE', 'PATCHING', 0] -> [1, 3, 1, 0] - - TODO: Add params and return in docstring. - TODO: Typehint params and return. + :param action: Action in 'readable' format + :type action: List[Union[str,int]] + :return: Action with verbs encoded as ints + :rtype: List[int] """ action_node_id = action[0] action_node_property = NodePOLType[action[1]].value @@ -378,15 +378,16 @@ def transform_action_node_enum(action): return new_action -def transform_action_node_readable(action): - """ - Convert a node action from enumerated format to readable format. +def transform_action_node_readable(action: List[int]) -> List[Union[int, str]]: + """Convert a node action from enumerated format to readable format. example: [1, 3, 1, 0] -> [1, 'SERVICE', 'PATCHING', 0] - TODO: Add params and return in docstring. - TODO: Typehint params and return. + :param action: Raw action with integer encodings + :type action: List[int] + :return: Human-redable version of the action + :rtype: List[Union[int,str]] """ action_node_property = NodePOLType(action[1]).name @@ -401,32 +402,33 @@ def transform_action_node_readable(action): return new_action -def node_action_description(action): - """ - Generate string describing a node-based action. +# unused +# def node_action_description(action): +# """ +# Generate string describing a node-based action. - TODO: Add params and return in docstring. - TODO: Typehint params and return. - """ - if isinstance(action[1], (int, np.int64)): - # transform action to readable format - action = transform_action_node_readable(action) +# TO#DO: Add params and return in docstring. +# TO#DO: Typehint params and return. +# """ +# if isinstance(action[1], (int, np.int64)): +# # transform action to readable format +# action = transform_action_node_readable(action) - node_id = action[0] - node_property = action[1] - property_action = action[2] - service_id = action[3] +# node_id = action[0] +# node_property = action[1] +# property_action = action[2] +# service_id = action[3] - if property_action == "NONE": - return "" - if node_property == "OPERATING" or node_property == "OS": - description = f"NODE {node_id}, {node_property}, SET TO {property_action}" - elif node_property == "SERVICE": - description = f"NODE {node_id} FROM SERVICE {service_id}, SET TO {property_action}" - else: - return "" +# if property_action == "NONE": +# return "" +# if node_property == "OPERATING" or node_property == "OS": +# description = f"NODE {node_id}, {node_property}, SET TO {property_action}" +# elif node_property == "SERVICE": +# description = f"NODE {node_id} FROM SERVICE {service_id}, SET TO {property_action}" +# else: +# return "" - return description +# return description def transform_action_acl_enum(action): @@ -452,25 +454,26 @@ def transform_action_acl_enum(action): return new_action -def acl_action_description(action): - """ - Generate string describing an acl-based action. +# unused +# def acl_action_description(action): +# """ +# Generate string describing an acl-based action. - TODO: Add params and return in docstring. - TODO: Typehint params and return. - """ - if isinstance(action[0], (int, np.int64)): - # transform action to readable format - action = transform_action_acl_readable(action) - if action[0] == "NONE": - description = "NO ACL RULE APPLIED" - else: - description = ( - f"{action[0]} RULE: {action[1]} traffic from IP {action[2]} to IP {action[3]}," - f" for protocol/service index {action[4]} on port index {action[5]}" - ) +# TODO: Add params and return in docstring. +# TODO: Typehint params and return. +# """ +# if isinstance(action[0], (int, np.int64)): +# # transform action to readable format +# action = transform_action_acl_readable(action) +# if action[0] == "NONE": +# description = "NO ACL RULE APPLIED" +# else: +# description = ( +# f"{action[0]} RULE: {action[1]} traffic from IP {action[2]} to IP {action[3]}," +# f" for protocol/service index {action[4]} on port index {action[5]}" +# ) - return description +# return description def get_node_of_ip(ip, node_dict): @@ -521,7 +524,7 @@ def is_valid_node_action(action): return True -def is_valid_acl_action(action): +def is_valid_acl_action(action: List[int]) -> bool: """ Is the ACL action an actual valid action. @@ -531,8 +534,11 @@ def is_valid_acl_action(action): - Trying to create identical rules - Trying to create a rule which is a subset of another rule (caused by "ANY") - TODO: Add params and return in docstring. - TODO: Typehint params and return. + + :param action: Action to check + :type action: List[int] + :return: Whether the action is valid + :rtype: bool """ action_r = transform_action_acl_readable(action) @@ -554,12 +560,14 @@ def is_valid_acl_action(action): return True -def is_valid_acl_action_extra(action): +def is_valid_acl_action_extra(action: List[int]) -> bool: """ Harsher version of valid acl actions, does not allow action. - TODO: Add params and return in docstring. - TODO: Typehint params and return. + :param action: Input action + :type action: List[int] + :return: Whether the action is a valid ACL action + :rtype: bool """ if is_valid_acl_action(action) is False: return False @@ -578,14 +586,18 @@ def is_valid_acl_action_extra(action): return True -def get_new_action(old_action, action_dict): +def get_new_action(old_action: np.ndarray, action_dict: Dict[int, List]) -> int: """ Get new action (e.g. 32) from old action e.g. [1,1,1,0]. Old_action can be either node or acl action type - TODO: Add params and return in docstring. - TODO: Typehint params and return. + :param old_action: Action expressed as a list of choices, eg. [1,1,1,0] + :type old_action: np.ndarray + :param action_dict: Dictionary for translating the multidiscrete actions into the list-based actions. + :type action_dict: Dict[int,List] + :return: Action key correspoinding to the input `old_action` + :rtype: int """ for key, val in action_dict.items(): if list(val) == list(old_action): diff --git a/src/primaite/pol/red_agent_pol.py b/src/primaite/pol/red_agent_pol.py index bff19bf8..1a8bd406 100644 --- a/src/primaite/pol/red_agent_pol.py +++ b/src/primaite/pol/red_agent_pol.py @@ -296,11 +296,17 @@ def apply_red_agent_node_pol( pass -def is_red_ier_incoming(node, iers, node_pol_type): - """ - Checks if the RED IER is incoming. +def is_red_ier_incoming(node: NodeUnion, iers: Dict[str, IER], node_pol_type: NodePOLType) -> bool: + """Checks if the RED IER is incoming. - TODO: Write more descriptive docstring with params and returns. + :param node: Destination node of the IER + :type node: NodeUnion + :param iers: Directory of IERs + :type iers: Dict[str,IER] + :param node_pol_type: Type of Pattern-Of-Life + :type node_pol_type: NodePOLType + :return: Whether the RED IER is incoming. + :rtype: bool """ node_id = node.node_id