#1355 - Carried out full renaming in node.py, active_node.py, passive_node.py, and service_node.py to make params and variable names explicit.

- Made the same renaming in the yaml laydown config files.
- Added Type hints wherever I've been.
- Added a custom NodeType in custom_typing.py to encompass the Union of ActiveNode, PassiveNode, ServiceNode.
This commit is contained in:
Chris McCarthy
2023-05-25 21:03:11 +01:00
parent 00e9d1f88d
commit 32a4d9e459
31 changed files with 965 additions and 1081 deletions

View File

@@ -3,7 +3,14 @@
import logging
from typing import Final
from primaite.common.enums import FILE_SYSTEM_STATE, HARDWARE_STATE, SOFTWARE_STATE
from primaite.common.config_values_main import ConfigValuesMain
from primaite.common.enums import (
FileSystemState,
HardwareState,
NodeType,
Priority,
SoftwareState,
)
from primaite.nodes.node import Node
_LOGGER: Final[logging.Logger] = logging.getLogger(__name__)
@@ -14,216 +21,174 @@ class ActiveNode(Node):
def __init__(
self,
_id,
_name,
_type,
_priority,
_state,
_ip_address,
_os_state,
_file_system_state,
_config_values,
node_id: str,
name: str,
node_type: NodeType,
priority: Priority,
hardware_state: HardwareState,
ip_address: str,
software_state: SoftwareState,
file_system_state: FileSystemState,
config_values: ConfigValuesMain,
):
"""
Init.
Args:
_id: The node ID
_name: The node name
_type: The node type (enum)
_priority: The node priority (enum)
_state: The node state (enum)
_ip_address: The node IP address
_os_state: The node Operating System state
_file_system_state: The node file system state
_config_values: The config values
:param node_id: The node ID
:param name: The node name
:param node_type: The node type (enum)
:param priority: The node priority (enum)
:param hardware_state: The node Hardware State
:param ip_address: The node IP address
:param software_state: The node Software State
:param file_system_state: The node file system state
:param config_values: The config values
"""
super().__init__(_id, _name, _type, _priority, _state, _config_values)
self.ip_address = _ip_address
# Related to O/S
self.os_state = _os_state
self.patching_count = 0
super().__init__(
node_id, name, node_type, priority, hardware_state, config_values
)
self.ip_address: str = ip_address
# Related to Software
self._software_state: SoftwareState = software_state
self.patching_count: int = 0
# Related to File System
self.file_system_state_actual = _file_system_state
self.file_system_state_observed = _file_system_state
self.file_system_scanning = False
self.file_system_scanning_count = 0
self.file_system_action_count = 0
self.file_system_state_actual: FileSystemState = file_system_state
self.file_system_state_observed: FileSystemState = file_system_state
self.file_system_scanning: bool = False
self.file_system_scanning_count: int = 0
self.file_system_action_count: int = 0
def set_ip_address(self, _ip_address):
@property
def software_state(self) -> SoftwareState:
"""
Sets IP address.
Get the software_state.
Args:
_ip_address: IP address
:return: The software_state.
"""
self.ip_address = _ip_address
return self._software_state
def get_ip_address(self):
@software_state.setter
def software_state(self, software_state: SoftwareState):
"""
Gets IP address.
Get the software_state.
Returns:
IP address
:param software_state: Software State.
"""
return self.ip_address
def set_os_state(self, _os_state):
"""
Sets operating system state.
Args:
_os_state: Operating system state
"""
if self.operating_state != HARDWARE_STATE.OFF:
self.os_state = _os_state
if _os_state == SOFTWARE_STATE.PATCHING:
if self.hardware_state != HardwareState.OFF:
self._software_state = software_state
if software_state == SoftwareState.PATCHING:
self.patching_count = self.config_values.os_patching_duration
else:
_LOGGER.info(
f"The Nodes operating state is OFF so OS State cannot be "
f"The Nodes hardware state is OFF so OS State cannot be "
f"changed. "
f"Node:{self.id}, "
f"Node Operating State:{self.operating_state}, "
f"Node Operating System State:{self.os_state}"
f"Node.node_id:{self.node_id}, "
f"Node.hardware_state:{self.hardware_state}, "
f"Node.software_state:{self._software_state}"
)
def set_os_state_if_not_compromised(self, _os_state):
def set_software_state_if_not_compromised(self, software_state: SoftwareState):
"""
Sets operating system state if the node is not compromised.
Sets Software State if the node is not compromised.
Args:
_os_state: Operating system state
software_state: Software State
"""
if self.operating_state != HARDWARE_STATE.OFF:
if self.os_state != SOFTWARE_STATE.COMPROMISED:
self.os_state = _os_state
if _os_state == SOFTWARE_STATE.PATCHING:
if self.hardware_state != HardwareState.OFF:
if self._software_state != SoftwareState.COMPROMISED:
self._software_state = software_state
if software_state == SoftwareState.PATCHING:
self.patching_count = self.config_values.os_patching_duration
else:
_LOGGER.info(
f"The Nodes operating state is OFF so OS State cannot be changed."
f"Node:{self.id}, "
f"Node Operating State:{self.operating_state}, "
f"Node Operating System State:{self.os_state}",
f"The Nodes hardware state is OFF so OS State cannot be changed."
f"Node.node_id:{self.node_id}, "
f"Node.hardware_state:{self.hardware_state}, "
f"Node.software_state:{self._software_state}"
)
def get_os_state(self):
"""
Gets operating system state.
Returns:
Operating system state
"""
return self.os_state
def update_os_patching_status(self):
"""Updates operating system status based on patching cycle."""
self.patching_count -= 1
if self.patching_count <= 0:
self.patching_count = 0
self.os_state = SOFTWARE_STATE.GOOD
self._software_state = SoftwareState.GOOD
def set_file_system_state(self, _file_system_state):
def set_file_system_state(self, file_system_state: FileSystemState):
"""
Sets the file system state (actual and observed).
Args:
_file_system_state: File system state
file_system_state: File system state
"""
if self.operating_state != HARDWARE_STATE.OFF:
self.file_system_state_actual = _file_system_state
if self.hardware_state != HardwareState.OFF:
self.file_system_state_actual = file_system_state
if _file_system_state == FILE_SYSTEM_STATE.REPAIRING:
if file_system_state == FileSystemState.REPAIRING:
self.file_system_action_count = (
self.config_values.file_system_repairing_limit
)
self.file_system_state_observed = FILE_SYSTEM_STATE.REPAIRING
elif _file_system_state == FILE_SYSTEM_STATE.RESTORING:
self.file_system_state_observed = FileSystemState.REPAIRING
elif file_system_state == FileSystemState.RESTORING:
self.file_system_action_count = (
self.config_values.file_system_restoring_limit
)
self.file_system_state_observed = FILE_SYSTEM_STATE.RESTORING
elif _file_system_state == FILE_SYSTEM_STATE.GOOD:
self.file_system_state_observed = FILE_SYSTEM_STATE.GOOD
self.file_system_state_observed = FileSystemState.RESTORING
elif file_system_state == FileSystemState.GOOD:
self.file_system_state_observed = FileSystemState.GOOD
else:
_LOGGER.info(
f"The Nodes operating state is OFF so File System State "
f"The Nodes hardware state is OFF so File System State "
f"cannot be changed. "
f"Node:{self.id}, "
f"Node Operating State:{self.operating_state}, "
f"Node File System State:{self.file_system_state_actual}",
f"Node.node_id:{self.node_id}, "
f"Node.hardware_state:{self.hardware_state}, "
f"Node.file_system_state.actual:{self.file_system_state_actual}"
)
def set_file_system_state_if_not_compromised(self, _file_system_state):
def set_file_system_state_if_not_compromised(
self, file_system_state: FileSystemState
):
"""
Sets the file system state (actual and observed) if not in a compromised state.
Use for green PoL to prevent it overturning a compromised state
Args:
_file_system_state: File system state
file_system_state: File system state
"""
if self.operating_state != HARDWARE_STATE.OFF:
if self.hardware_state != HardwareState.OFF:
if (
self.file_system_state_actual != FILE_SYSTEM_STATE.CORRUPT
and self.file_system_state_actual != FILE_SYSTEM_STATE.DESTROYED
self.file_system_state_actual != FileSystemState.CORRUPT
and self.file_system_state_actual != FileSystemState.DESTROYED
):
self.file_system_state_actual = _file_system_state
self.file_system_state_actual = file_system_state
if _file_system_state == FILE_SYSTEM_STATE.REPAIRING:
if file_system_state == FileSystemState.REPAIRING:
self.file_system_action_count = (
self.config_values.file_system_repairing_limit
)
self.file_system_state_observed = FILE_SYSTEM_STATE.REPAIRING
elif _file_system_state == FILE_SYSTEM_STATE.RESTORING:
self.file_system_state_observed = FileSystemState.REPAIRING
elif file_system_state == FileSystemState.RESTORING:
self.file_system_action_count = (
self.config_values.file_system_restoring_limit
)
self.file_system_state_observed = FILE_SYSTEM_STATE.RESTORING
elif _file_system_state == FILE_SYSTEM_STATE.GOOD:
self.file_system_state_observed = FILE_SYSTEM_STATE.GOOD
self.file_system_state_observed = FileSystemState.RESTORING
elif file_system_state == FileSystemState.GOOD:
self.file_system_state_observed = FileSystemState.GOOD
else:
_LOGGER.info(
f"The Nodes operating state is OFF so File System State (if not "
f"The Nodes hardware state is OFF so File System State (if not "
f"compromised) cannot be changed. "
f"Node:{self.id}, "
f"Node Operating State:{self.operating_state}, "
f"Node File System State:{self.file_system_state_actual}",
f"Node.node_id:{self.node_id}, "
f"Node.hardware_state:{self.hardware_state}, "
f"Node.file_system_state.actual:{self.file_system_state_actual}"
)
def get_file_system_state_actual(self):
"""
Gets file system state (actual).
Returns:
File system state (actual)
"""
return self.file_system_state_actual
def get_file_system_state_observed(self):
"""
Gets file system state (observed).
Returns:
File system state (observed)
"""
return self.file_system_state_observed
def start_file_system_scan(self):
"""Starts a file system scan."""
self.file_system_scanning = True
self.file_system_scanning_count = self.config_values.file_system_scanning_limit
def is_scanning_file_system(self):
"""
Gets true/false on whether file system is being scanned.
Returns:
True if file system is being scanned
"""
return self.file_system_scanning
def update_file_system_state(self):
"""Updates file system status based on scanning/restore/repair cycle."""
# Deprecate both the action count (for restoring or reparing) and the scanning count
@@ -234,11 +199,11 @@ class ActiveNode(Node):
if self.file_system_action_count <= 0:
self.file_system_action_count = 0
if (
self.file_system_state_actual == FILE_SYSTEM_STATE.REPAIRING
or self.file_system_state_actual == FILE_SYSTEM_STATE.RESTORING
self.file_system_state_actual == FileSystemState.REPAIRING
or self.file_system_state_actual == FileSystemState.RESTORING
):
self.file_system_state_actual = FILE_SYSTEM_STATE.GOOD
self.file_system_state_observed = FILE_SYSTEM_STATE.GOOD
self.file_system_state_actual = FileSystemState.GOOD
self.file_system_state_observed = FileSystemState.GOOD
# Scanning updates
if self.file_system_scanning == True and self.file_system_scanning_count < 0: