Removed duplicated function definitions

This commit is contained in:
Marek Wolan
2023-07-06 10:23:14 +01:00
parent ead02ed691
commit b9549497d2

View File

@@ -2,6 +2,7 @@ from typing import Dict, List, Union
import numpy as np
from primaite.common.custom_typing import NodeUnion
from primaite.common.enums import (
HardwareState,
LinkStatus,
@@ -310,7 +311,7 @@ def describe_obs_change(
return change_string
def _describe_obs_change_helper(obs_change, is_link):
def _describe_obs_change_helper(obs_change: List[int], is_link: bool) -> str:
"""
Helper funcion to describe what has changed.
@@ -319,8 +320,14 @@ def _describe_obs_change_helper(obs_change, is_link):
Handles multiple changes e.g. 'ID 1: SERVICE 1 changed to PATCHING. SERVICE 2 set to GOOD.'
TODO: Add params and return in docstring.
TODO: Typehint params and return.
:param obs_change: List of integers generated within the `describe_obs_change` function. It should correspond to one
row of the observation table, and have `-1` at locations where the observation hasn't changed, and the new
status where it has changed.
:type obs_change: List[int]
:param is_link: Whether the row of the observation space corresponds to a link. False means it represents a node.
:type is_link: bool
:return: A human-readable description of the difference between the two observation rows.
:rtype: str
"""
# Indexes where a change has occured, not including 0th index
index_changed = [i for i in range(1, len(obs_change)) if obs_change[i] != -1]
@@ -378,65 +385,14 @@ def transform_action_node_enum(action: List[Union[str, int]]) -> List[int]:
return new_action
def transform_action_node_readable(action: List[int]) -> List[Union[int, str]]:
"""Convert a node action from enumerated format to readable format.
example:
[1, 3, 1, 0] -> [1, 'SERVICE', 'PATCHING', 0]
:param action: Raw action with integer encodings
:type action: List[int]
:return: Human-redable version of the action
:rtype: List[Union[int,str]]
"""
action_node_property = NodePOLType(action[1]).name
if action_node_property == "OPERATING":
property_action = NodeHardwareAction(action[2]).name
elif (action_node_property == "OS" or action_node_property == "SERVICE") and action[2] <= 1:
property_action = NodeSoftwareAction(action[2]).name
else:
property_action = "NONE"
new_action = [action[0], action_node_property, property_action, action[3]]
return new_action
# unused
# def node_action_description(action):
# """
# Generate string describing a node-based action.
# TO#DO: Add params and return in docstring.
# TO#DO: Typehint params and return.
# """
# if isinstance(action[1], (int, np.int64)):
# # transform action to readable format
# action = transform_action_node_readable(action)
# node_id = action[0]
# node_property = action[1]
# property_action = action[2]
# service_id = action[3]
# if property_action == "NONE":
# return ""
# if node_property == "OPERATING" or node_property == "OS":
# description = f"NODE {node_id}, {node_property}, SET TO {property_action}"
# elif node_property == "SERVICE":
# description = f"NODE {node_id} FROM SERVICE {service_id}, SET TO {property_action}"
# else:
# return ""
# return description
def transform_action_acl_enum(action):
def transform_action_acl_enum(action: List[Union[int, str]]) -> np.ndarray:
"""
Convert acl action from readable str format, to enumerated format.
TODO: Add params and return in docstring.
TODO: Typehint params and return.
:param action: ACL-based action expressed as a list of human-readable ints and strings
:type action: List[Union[int,str]]
:return: The same action but encoded to contain only integers.
:rtype: np.ndarray
"""
action_decisions = {"NONE": 0, "CREATE": 1, "DELETE": 2}
action_permissions = {"DENY": 0, "ALLOW": 1}
@@ -454,36 +410,17 @@ def transform_action_acl_enum(action):
return new_action
# unused
# def acl_action_description(action):
# """
# Generate string describing an acl-based action.
# TODO: Add params and return in docstring.
# TODO: Typehint params and return.
# """
# if isinstance(action[0], (int, np.int64)):
# # transform action to readable format
# action = transform_action_acl_readable(action)
# if action[0] == "NONE":
# description = "NO ACL RULE APPLIED"
# else:
# description = (
# f"{action[0]} RULE: {action[1]} traffic from IP {action[2]} to IP {action[3]},"
# f" for protocol/service index {action[4]} on port index {action[5]}"
# )
# return description
def get_node_of_ip(ip, node_dict):
"""
Get the node ID of an IP address.
def get_node_of_ip(ip: str, node_dict: Dict[str, NodeUnion]) -> str:
"""Get the node ID of an IP address.
node_dict: dictionary of nodes where key is ID, and value is the node (can be ontained from env.nodes)
TODO: Add params and return in docstring.
TODO: Typehint params and return.
:param ip: The IP address of the node whose ID is required
:type ip: str
:param node_dict: The environment's node registry dictionary
:type node_dict: Dict[str,NodeUnion]
:return: The key from the registry dict that corresponds to the node with the IP adress provided by `ip`
:rtype: str
"""
for node_key, node_value in node_dict.items():
node_ip = node_value.ip_address
@@ -491,101 +428,6 @@ def get_node_of_ip(ip, node_dict):
return node_key
def is_valid_node_action(action):
"""Is the node action an actual valid action.
Only uses information about the action to determine if the action has an effect
Does NOT consider:
- Node ID not valid to perform an operation - e.g. selected node has no service so cannot patch
- Node already being in that state (turning an ON node ON)
TODO: Add params and return in docstring.
TODO: Typehint params and return.
"""
action_r = transform_action_node_readable(action)
node_property = action_r[1]
node_action = action_r[2]
if node_property == "NONE":
return False
if node_action == "NONE":
return False
if node_property == "OPERATING" and node_action == "PATCHING":
# Operating State cannot PATCH
return False
if node_property != "OPERATING" and node_action not in [
"NONE",
"PATCHING",
]:
# Software States can only do Nothing or Patch
return False
return True
def is_valid_acl_action(action: List[int]) -> bool:
"""
Is the ACL action an actual valid action.
Only uses information about the action to determine if the action has an effect
Does NOT consider:
- Trying to create identical rules
- Trying to create a rule which is a subset of another rule (caused by "ANY")
:param action: Action to check
:type action: List[int]
:return: Whether the action is valid
:rtype: bool
"""
action_r = transform_action_acl_readable(action)
action_decision = action_r[0]
action_permission = action_r[1]
action_source_id = action_r[2]
action_destination_id = action_r[3]
if action_decision == "NONE":
return False
if action_source_id == action_destination_id and action_source_id != "ANY" and action_destination_id != "ANY":
# ACL rule towards itself
return False
if action_permission == "DENY":
# DENY is unnecessary, we can create and delete allow rules instead
# No allow rule = blocked/DENY by feault. ALLOW overrides existing DENY.
return False
return True
def is_valid_acl_action_extra(action: List[int]) -> bool:
"""
Harsher version of valid acl actions, does not allow action.
:param action: Input action
:type action: List[int]
:return: Whether the action is a valid ACL action
:rtype: bool
"""
if is_valid_acl_action(action) is False:
return False
action_r = transform_action_acl_readable(action)
action_protocol = action_r[4]
action_port = action_r[5]
# Don't allow protocols or ports to be ANY
# in the future we might want to do the opposite, and only have ANY option for ports and service
if action_protocol == "ANY":
return False
if action_port == "ANY":
return False
return True
def get_new_action(old_action: np.ndarray, action_dict: Dict[int, List]) -> int:
"""
Get new action (e.g. 32) from old action e.g. [1,1,1,0].