#1356 - added if statements to set class methods for file system state, os state and service states. Refactored file enums.py
- Added unit tests
This commit is contained in:
@@ -1,8 +1,13 @@
|
|||||||
# Crown Copyright (C) Dstl 2022. DEFCON 703. Shared in confidence.
|
# Crown Copyright (C) Dstl 2022. DEFCON 703. Shared in confidence.
|
||||||
"""An Active Node (i.e. not an actuator)."""
|
"""An Active Node (i.e. not an actuator)."""
|
||||||
from primaite.common.enums import FILE_SYSTEM_STATE, SOFTWARE_STATE
|
import logging
|
||||||
|
from typing import Final
|
||||||
|
|
||||||
|
from primaite.common.enums import FILE_SYSTEM_STATE, HARDWARE_STATE, SOFTWARE_STATE
|
||||||
from primaite.nodes.node import Node
|
from primaite.nodes.node import Node
|
||||||
|
|
||||||
|
_LOGGER: Final[logging.Logger] = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ActiveNode(Node):
|
class ActiveNode(Node):
|
||||||
"""Active Node class."""
|
"""Active Node class."""
|
||||||
@@ -70,9 +75,18 @@ class ActiveNode(Node):
|
|||||||
Args:
|
Args:
|
||||||
_os_state: Operating system state
|
_os_state: Operating system state
|
||||||
"""
|
"""
|
||||||
self.os_state = _os_state
|
if self.operating_state != HARDWARE_STATE.OFF:
|
||||||
if _os_state == SOFTWARE_STATE.PATCHING:
|
self.os_state = _os_state
|
||||||
self.patching_count = self.config_values.os_patching_duration
|
if _os_state == SOFTWARE_STATE.PATCHING:
|
||||||
|
self.patching_count = self.config_values.os_patching_duration
|
||||||
|
else:
|
||||||
|
_LOGGER.info(
|
||||||
|
f"The Nodes operating state is OFF so OS State cannot be "
|
||||||
|
f"changed. "
|
||||||
|
f"Node:{self.id}, "
|
||||||
|
f"Node Operating State:{self.operating_state}, "
|
||||||
|
f"Node Operating System State:{self.os_state}"
|
||||||
|
)
|
||||||
|
|
||||||
def set_os_state_if_not_compromised(self, _os_state):
|
def set_os_state_if_not_compromised(self, _os_state):
|
||||||
"""
|
"""
|
||||||
@@ -81,10 +95,18 @@ class ActiveNode(Node):
|
|||||||
Args:
|
Args:
|
||||||
_os_state: Operating system state
|
_os_state: Operating system state
|
||||||
"""
|
"""
|
||||||
if self.os_state != SOFTWARE_STATE.COMPROMISED:
|
if self.operating_state != HARDWARE_STATE.OFF:
|
||||||
self.os_state = _os_state
|
if self.os_state != SOFTWARE_STATE.COMPROMISED:
|
||||||
if _os_state == SOFTWARE_STATE.PATCHING:
|
self.os_state = _os_state
|
||||||
self.patching_count = self.config_values.os_patching_duration
|
if _os_state == SOFTWARE_STATE.PATCHING:
|
||||||
|
self.patching_count = self.config_values.os_patching_duration
|
||||||
|
else:
|
||||||
|
_LOGGER.info(
|
||||||
|
f"The Nodes operating state is OFF so OS State cannot be changed."
|
||||||
|
f"Node:{self.id}, "
|
||||||
|
f"Node Operating State:{self.operating_state}, "
|
||||||
|
f"Node Operating System State:{self.os_state}",
|
||||||
|
)
|
||||||
|
|
||||||
def get_os_state(self):
|
def get_os_state(self):
|
||||||
"""
|
"""
|
||||||
@@ -109,34 +131,7 @@ class ActiveNode(Node):
|
|||||||
Args:
|
Args:
|
||||||
_file_system_state: File system state
|
_file_system_state: File system state
|
||||||
"""
|
"""
|
||||||
self.file_system_state_actual = _file_system_state
|
if self.operating_state != HARDWARE_STATE.OFF:
|
||||||
|
|
||||||
if _file_system_state == FILE_SYSTEM_STATE.REPAIRING:
|
|
||||||
self.file_system_action_count = (
|
|
||||||
self.config_values.file_system_repairing_limit
|
|
||||||
)
|
|
||||||
self.file_system_state_observed = FILE_SYSTEM_STATE.REPAIRING
|
|
||||||
elif _file_system_state == FILE_SYSTEM_STATE.RESTORING:
|
|
||||||
self.file_system_action_count = (
|
|
||||||
self.config_values.file_system_restoring_limit
|
|
||||||
)
|
|
||||||
self.file_system_state_observed = FILE_SYSTEM_STATE.RESTORING
|
|
||||||
elif _file_system_state == FILE_SYSTEM_STATE.GOOD:
|
|
||||||
self.file_system_state_observed = FILE_SYSTEM_STATE.GOOD
|
|
||||||
|
|
||||||
def set_file_system_state_if_not_compromised(self, _file_system_state):
|
|
||||||
"""
|
|
||||||
Sets the file system state (actual and observed) if not in a compromised state.
|
|
||||||
|
|
||||||
Use for green PoL to prevent it overturning a compromised state
|
|
||||||
|
|
||||||
Args:
|
|
||||||
_file_system_state: File system state
|
|
||||||
"""
|
|
||||||
if (
|
|
||||||
self.file_system_state_actual != FILE_SYSTEM_STATE.CORRUPT
|
|
||||||
and self.file_system_state_actual != FILE_SYSTEM_STATE.DESTROYED
|
|
||||||
):
|
|
||||||
self.file_system_state_actual = _file_system_state
|
self.file_system_state_actual = _file_system_state
|
||||||
|
|
||||||
if _file_system_state == FILE_SYSTEM_STATE.REPAIRING:
|
if _file_system_state == FILE_SYSTEM_STATE.REPAIRING:
|
||||||
@@ -151,6 +146,51 @@ class ActiveNode(Node):
|
|||||||
self.file_system_state_observed = FILE_SYSTEM_STATE.RESTORING
|
self.file_system_state_observed = FILE_SYSTEM_STATE.RESTORING
|
||||||
elif _file_system_state == FILE_SYSTEM_STATE.GOOD:
|
elif _file_system_state == FILE_SYSTEM_STATE.GOOD:
|
||||||
self.file_system_state_observed = FILE_SYSTEM_STATE.GOOD
|
self.file_system_state_observed = FILE_SYSTEM_STATE.GOOD
|
||||||
|
else:
|
||||||
|
_LOGGER.info(
|
||||||
|
f"The Nodes operating state is OFF so File System State "
|
||||||
|
f"cannot be changed. "
|
||||||
|
f"Node:{self.id}, "
|
||||||
|
f"Node Operating State:{self.operating_state}, "
|
||||||
|
f"Node File System State:{self.file_system_state_actual}",
|
||||||
|
)
|
||||||
|
|
||||||
|
def set_file_system_state_if_not_compromised(self, _file_system_state):
|
||||||
|
"""
|
||||||
|
Sets the file system state (actual and observed) if not in a compromised state.
|
||||||
|
|
||||||
|
Use for green PoL to prevent it overturning a compromised state
|
||||||
|
|
||||||
|
Args:
|
||||||
|
_file_system_state: File system state
|
||||||
|
"""
|
||||||
|
if self.operating_state != HARDWARE_STATE.OFF:
|
||||||
|
if (
|
||||||
|
self.file_system_state_actual != FILE_SYSTEM_STATE.CORRUPT
|
||||||
|
and self.file_system_state_actual != FILE_SYSTEM_STATE.DESTROYED
|
||||||
|
):
|
||||||
|
self.file_system_state_actual = _file_system_state
|
||||||
|
|
||||||
|
if _file_system_state == FILE_SYSTEM_STATE.REPAIRING:
|
||||||
|
self.file_system_action_count = (
|
||||||
|
self.config_values.file_system_repairing_limit
|
||||||
|
)
|
||||||
|
self.file_system_state_observed = FILE_SYSTEM_STATE.REPAIRING
|
||||||
|
elif _file_system_state == FILE_SYSTEM_STATE.RESTORING:
|
||||||
|
self.file_system_action_count = (
|
||||||
|
self.config_values.file_system_restoring_limit
|
||||||
|
)
|
||||||
|
self.file_system_state_observed = FILE_SYSTEM_STATE.RESTORING
|
||||||
|
elif _file_system_state == FILE_SYSTEM_STATE.GOOD:
|
||||||
|
self.file_system_state_observed = FILE_SYSTEM_STATE.GOOD
|
||||||
|
else:
|
||||||
|
_LOGGER.info(
|
||||||
|
f"The Nodes operating state is OFF so File System State (if not "
|
||||||
|
f"compromised) cannot be changed. "
|
||||||
|
f"Node:{self.id}, "
|
||||||
|
f"Node Operating State:{self.operating_state}, "
|
||||||
|
f"Node File System State:{self.file_system_state_actual}",
|
||||||
|
)
|
||||||
|
|
||||||
def get_file_system_state_actual(self):
|
def get_file_system_state_actual(self):
|
||||||
"""
|
"""
|
||||||
@@ -185,7 +225,7 @@ class ActiveNode(Node):
|
|||||||
return self.file_system_scanning
|
return self.file_system_scanning
|
||||||
|
|
||||||
def update_file_system_state(self):
|
def update_file_system_state(self):
|
||||||
"""Updates file system status based on scanning / restore / repair cycle."""
|
"""Updates file system status based on scanning/restore/repair cycle."""
|
||||||
# Deprecate both the action count (for restoring or reparing) and the scanning count
|
# Deprecate both the action count (for restoring or reparing) and the scanning count
|
||||||
self.file_system_action_count -= 1
|
self.file_system_action_count -= 1
|
||||||
self.file_system_scanning_count -= 1
|
self.file_system_scanning_count -= 1
|
||||||
|
|||||||
@@ -1,8 +1,13 @@
|
|||||||
# Crown Copyright (C) Dstl 2022. DEFCON 703. Shared in confidence.
|
# Crown Copyright (C) Dstl 2022. DEFCON 703. Shared in confidence.
|
||||||
"""A Service Node (i.e. not an actuator)."""
|
"""A Service Node (i.e. not an actuator)."""
|
||||||
from primaite.common.enums import SOFTWARE_STATE
|
import logging
|
||||||
|
from typing import Final
|
||||||
|
|
||||||
|
from primaite.common.enums import HARDWARE_STATE, SOFTWARE_STATE
|
||||||
from primaite.nodes.active_node import ActiveNode
|
from primaite.nodes.active_node import ActiveNode
|
||||||
|
|
||||||
|
_LOGGER: Final[logging.Logger] = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ServiceNode(ActiveNode):
|
class ServiceNode(ActiveNode):
|
||||||
"""ServiceNode class."""
|
"""ServiceNode class."""
|
||||||
@@ -119,24 +124,34 @@ class ServiceNode(ActiveNode):
|
|||||||
_protocol: The service (protocol)
|
_protocol: The service (protocol)
|
||||||
_state: The state value
|
_state: The state value
|
||||||
"""
|
"""
|
||||||
for service_key, service_value in self.services.items():
|
if self.operating_state != HARDWARE_STATE.OFF:
|
||||||
if service_key == _protocol:
|
for service_key, service_value in self.services.items():
|
||||||
# Can't set to compromised if you're in a patching state
|
if service_key == _protocol:
|
||||||
if (
|
# Can't set to compromised if you're in a patching state
|
||||||
_state == SOFTWARE_STATE.COMPROMISED
|
if (
|
||||||
and service_value.get_state() != SOFTWARE_STATE.PATCHING
|
_state == SOFTWARE_STATE.COMPROMISED
|
||||||
) or _state != SOFTWARE_STATE.COMPROMISED:
|
and service_value.get_state() != SOFTWARE_STATE.PATCHING
|
||||||
service_value.set_state(_state)
|
) or _state != SOFTWARE_STATE.COMPROMISED:
|
||||||
else:
|
service_value.set_state(_state)
|
||||||
# Do nothing
|
else:
|
||||||
pass
|
# Do nothing
|
||||||
if _state == SOFTWARE_STATE.PATCHING:
|
pass
|
||||||
service_value.patching_count = (
|
if _state == SOFTWARE_STATE.PATCHING:
|
||||||
self.config_values.service_patching_duration
|
service_value.patching_count = (
|
||||||
)
|
self.config_values.service_patching_duration
|
||||||
else:
|
)
|
||||||
# Do nothing
|
else:
|
||||||
pass
|
# Do nothing
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
_LOGGER.info(
|
||||||
|
f"The Nodes operating state is OFF so the state of a service "
|
||||||
|
f"cannot be changed. "
|
||||||
|
f"Node:{self.id}, "
|
||||||
|
f"Node Operating State:{self.operating_state}, "
|
||||||
|
f"Node Service Protocol:{_protocol}, "
|
||||||
|
f"Node Service State: {_state}"
|
||||||
|
)
|
||||||
|
|
||||||
def set_service_state_if_not_compromised(self, _protocol, _state):
|
def set_service_state_if_not_compromised(self, _protocol, _state):
|
||||||
"""
|
"""
|
||||||
@@ -146,14 +161,24 @@ class ServiceNode(ActiveNode):
|
|||||||
_protocol: The service (protocol)
|
_protocol: The service (protocol)
|
||||||
_state: The state value
|
_state: The state value
|
||||||
"""
|
"""
|
||||||
for service_key, service_value in self.services.items():
|
if self.operating_state != HARDWARE_STATE.OFF:
|
||||||
if service_key == _protocol:
|
for service_key, service_value in self.services.items():
|
||||||
if service_value.get_state() != SOFTWARE_STATE.COMPROMISED:
|
if service_key == _protocol:
|
||||||
service_value.set_state(_state)
|
if service_value.get_state() != SOFTWARE_STATE.COMPROMISED:
|
||||||
if _state == SOFTWARE_STATE.PATCHING:
|
service_value.set_state(_state)
|
||||||
service_value.patching_count = (
|
if _state == SOFTWARE_STATE.PATCHING:
|
||||||
self.config_values.service_patching_duration
|
service_value.patching_count = (
|
||||||
)
|
self.config_values.service_patching_duration
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
_LOGGER.info(
|
||||||
|
f"The Nodes operating state is OFF so the state of a service "
|
||||||
|
f"cannot be changed. "
|
||||||
|
f"Node:{self.id}, "
|
||||||
|
f"Node Operating State:{self.operating_state}, "
|
||||||
|
f"Node Service Protocol:{_protocol}, "
|
||||||
|
f"Node Service State:{_state}"
|
||||||
|
)
|
||||||
|
|
||||||
def get_service_state(self, _protocol):
|
def get_service_state(self, _protocol):
|
||||||
"""
|
"""
|
||||||
|
|||||||
121
tests/test_active_node.py
Normal file
121
tests/test_active_node.py
Normal file
@@ -0,0 +1,121 @@
|
|||||||
|
"""Used to test Active Node functions."""
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from primaite.common.enums import FILE_SYSTEM_STATE, HARDWARE_STATE, SOFTWARE_STATE
|
||||||
|
from primaite.nodes.active_node import ActiveNode
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"operating_state, expected_state",
|
||||||
|
[
|
||||||
|
(HARDWARE_STATE.OFF, SOFTWARE_STATE.GOOD),
|
||||||
|
(HARDWARE_STATE.ON, SOFTWARE_STATE.OVERWHELMED),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_os_state_change(operating_state, expected_state):
|
||||||
|
"""
|
||||||
|
Test that a node cannot change its operating system state.
|
||||||
|
|
||||||
|
When its operating state is OFF.
|
||||||
|
"""
|
||||||
|
active_node = ActiveNode(
|
||||||
|
0,
|
||||||
|
"node",
|
||||||
|
"COMPUTER",
|
||||||
|
"1",
|
||||||
|
operating_state,
|
||||||
|
"192.168.0.1",
|
||||||
|
SOFTWARE_STATE.GOOD,
|
||||||
|
"GOOD",
|
||||||
|
1,
|
||||||
|
)
|
||||||
|
|
||||||
|
active_node.set_os_state(SOFTWARE_STATE.OVERWHELMED)
|
||||||
|
|
||||||
|
assert active_node.get_os_state() == expected_state
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"operating_state, expected_state",
|
||||||
|
[
|
||||||
|
(HARDWARE_STATE.OFF, SOFTWARE_STATE.GOOD),
|
||||||
|
(HARDWARE_STATE.ON, SOFTWARE_STATE.OVERWHELMED),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_os_state_change_if_not_compromised(operating_state, expected_state):
|
||||||
|
"""
|
||||||
|
Test that a node cannot change its operating system state.
|
||||||
|
|
||||||
|
If not compromised) when its operating state is OFF.
|
||||||
|
"""
|
||||||
|
active_node = ActiveNode(
|
||||||
|
0,
|
||||||
|
"node",
|
||||||
|
"COMPUTER",
|
||||||
|
"1",
|
||||||
|
operating_state,
|
||||||
|
"192.168.0.1",
|
||||||
|
SOFTWARE_STATE.GOOD,
|
||||||
|
"GOOD",
|
||||||
|
1,
|
||||||
|
)
|
||||||
|
|
||||||
|
active_node.set_os_state_if_not_compromised(SOFTWARE_STATE.OVERWHELMED)
|
||||||
|
|
||||||
|
assert active_node.get_os_state() == expected_state
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"operating_state, expected_state",
|
||||||
|
[
|
||||||
|
(HARDWARE_STATE.OFF, FILE_SYSTEM_STATE.GOOD),
|
||||||
|
(HARDWARE_STATE.ON, FILE_SYSTEM_STATE.CORRUPT),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_file_system_change(operating_state, expected_state):
|
||||||
|
"""Test that a node cannot change its file system state when its operating state is ON."""
|
||||||
|
active_node = ActiveNode(
|
||||||
|
0,
|
||||||
|
"node",
|
||||||
|
"COMPUTER",
|
||||||
|
"1",
|
||||||
|
operating_state,
|
||||||
|
"192.168.0.1",
|
||||||
|
"COMPROMISED",
|
||||||
|
FILE_SYSTEM_STATE.GOOD,
|
||||||
|
1,
|
||||||
|
)
|
||||||
|
|
||||||
|
active_node.set_file_system_state(FILE_SYSTEM_STATE.CORRUPT)
|
||||||
|
|
||||||
|
assert active_node.get_file_system_state_actual() == expected_state
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"operating_state, expected_state",
|
||||||
|
[
|
||||||
|
(HARDWARE_STATE.OFF, FILE_SYSTEM_STATE.GOOD),
|
||||||
|
(HARDWARE_STATE.ON, FILE_SYSTEM_STATE.CORRUPT),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_file_system_change_if_not_compromised(operating_state, expected_state):
|
||||||
|
"""
|
||||||
|
Test that a node cannot change its file system state.
|
||||||
|
|
||||||
|
If not compromised) when its operating state is OFF.
|
||||||
|
"""
|
||||||
|
active_node = ActiveNode(
|
||||||
|
0,
|
||||||
|
"node",
|
||||||
|
"COMPUTER",
|
||||||
|
"1",
|
||||||
|
operating_state,
|
||||||
|
"192.168.0.1",
|
||||||
|
"GOOD",
|
||||||
|
FILE_SYSTEM_STATE.GOOD,
|
||||||
|
1,
|
||||||
|
)
|
||||||
|
|
||||||
|
active_node.set_file_system_state_if_not_compromised(FILE_SYSTEM_STATE.CORRUPT)
|
||||||
|
|
||||||
|
assert active_node.get_file_system_state_actual() == expected_state
|
||||||
70
tests/test_service_node.py
Normal file
70
tests/test_service_node.py
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
"""Used to test Service Node functions."""
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from primaite.common.enums import HARDWARE_STATE, SOFTWARE_STATE
|
||||||
|
from primaite.common.service import Service
|
||||||
|
from primaite.nodes.service_node import ServiceNode
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"operating_state, expected_state",
|
||||||
|
[
|
||||||
|
(HARDWARE_STATE.OFF, SOFTWARE_STATE.GOOD),
|
||||||
|
(HARDWARE_STATE.ON, SOFTWARE_STATE.OVERWHELMED),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_service_state_change(operating_state, expected_state):
|
||||||
|
"""
|
||||||
|
Test that a node cannot change the state of a running service.
|
||||||
|
|
||||||
|
When its operating state is OFF.
|
||||||
|
"""
|
||||||
|
service_node = ServiceNode(
|
||||||
|
0,
|
||||||
|
"node",
|
||||||
|
"COMPUTER",
|
||||||
|
"1",
|
||||||
|
operating_state,
|
||||||
|
"192.168.0.1",
|
||||||
|
"COMPROMISED",
|
||||||
|
"RESTORING",
|
||||||
|
1,
|
||||||
|
)
|
||||||
|
service = Service("TCP", 80, SOFTWARE_STATE.GOOD)
|
||||||
|
service_node.add_service(service)
|
||||||
|
|
||||||
|
service_node.set_service_state("TCP", SOFTWARE_STATE.OVERWHELMED)
|
||||||
|
|
||||||
|
assert service_node.get_service_state("TCP") == expected_state
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"operating_state, expected_state",
|
||||||
|
[
|
||||||
|
(HARDWARE_STATE.OFF, SOFTWARE_STATE.GOOD),
|
||||||
|
(HARDWARE_STATE.ON, SOFTWARE_STATE.OVERWHELMED),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_service_state_change_if_not_comprised(operating_state, expected_state):
|
||||||
|
"""
|
||||||
|
Test that a node cannot change the state of a running service.
|
||||||
|
|
||||||
|
If not compromised when its operating state is ON.
|
||||||
|
"""
|
||||||
|
service_node = ServiceNode(
|
||||||
|
0,
|
||||||
|
"node",
|
||||||
|
"COMPUTER",
|
||||||
|
"1",
|
||||||
|
operating_state,
|
||||||
|
"192.168.0.1",
|
||||||
|
"GOOD",
|
||||||
|
"RESTORING",
|
||||||
|
1,
|
||||||
|
)
|
||||||
|
service = Service("TCP", 80, SOFTWARE_STATE.GOOD)
|
||||||
|
service_node.add_service(service)
|
||||||
|
|
||||||
|
service_node.set_service_state_if_not_compromised("TCP", SOFTWARE_STATE.OVERWHELMED)
|
||||||
|
|
||||||
|
assert service_node.get_service_state("TCP") == expected_state
|
||||||
Reference in New Issue
Block a user