Merged PR 54: #1356 - added if statements to set class methods for file system state, os st...

#1356 - added if statements to set class methods for file system state, os state and service states. Refactored file enums.py
- Added unit tests

Related work items: #1356
This commit is contained in:
Christopher McCarthy
2023-05-25 12:33:16 +00:00
4 changed files with 320 additions and 64 deletions

View File

@@ -1,8 +1,13 @@
# Crown Copyright (C) Dstl 2022. DEFCON 703. Shared in confidence.
"""An Active Node (i.e. not an actuator)."""
from primaite.common.enums import FILE_SYSTEM_STATE, SOFTWARE_STATE
import logging
from typing import Final
from primaite.common.enums import FILE_SYSTEM_STATE, HARDWARE_STATE, SOFTWARE_STATE
from primaite.nodes.node import Node
_LOGGER: Final[logging.Logger] = logging.getLogger(__name__)
class ActiveNode(Node):
"""Active Node class."""
@@ -70,9 +75,18 @@ class ActiveNode(Node):
Args:
_os_state: Operating system state
"""
if self.operating_state != HARDWARE_STATE.OFF:
self.os_state = _os_state
if _os_state == SOFTWARE_STATE.PATCHING:
self.patching_count = self.config_values.os_patching_duration
else:
_LOGGER.info(
f"The Nodes operating state is OFF so OS State cannot be "
f"changed. "
f"Node:{self.id}, "
f"Node Operating State:{self.operating_state}, "
f"Node Operating System State:{self.os_state}"
)
def set_os_state_if_not_compromised(self, _os_state):
"""
@@ -81,10 +95,18 @@ class ActiveNode(Node):
Args:
_os_state: Operating system state
"""
if self.operating_state != HARDWARE_STATE.OFF:
if self.os_state != SOFTWARE_STATE.COMPROMISED:
self.os_state = _os_state
if _os_state == SOFTWARE_STATE.PATCHING:
self.patching_count = self.config_values.os_patching_duration
else:
_LOGGER.info(
f"The Nodes operating state is OFF so OS State cannot be changed."
f"Node:{self.id}, "
f"Node Operating State:{self.operating_state}, "
f"Node Operating System State:{self.os_state}",
)
def get_os_state(self):
"""
@@ -109,6 +131,7 @@ class ActiveNode(Node):
Args:
_file_system_state: File system state
"""
if self.operating_state != HARDWARE_STATE.OFF:
self.file_system_state_actual = _file_system_state
if _file_system_state == FILE_SYSTEM_STATE.REPAIRING:
@@ -123,6 +146,14 @@ class ActiveNode(Node):
self.file_system_state_observed = FILE_SYSTEM_STATE.RESTORING
elif _file_system_state == FILE_SYSTEM_STATE.GOOD:
self.file_system_state_observed = FILE_SYSTEM_STATE.GOOD
else:
_LOGGER.info(
f"The Nodes operating state is OFF so File System State "
f"cannot be changed. "
f"Node:{self.id}, "
f"Node Operating State:{self.operating_state}, "
f"Node File System State:{self.file_system_state_actual}",
)
def set_file_system_state_if_not_compromised(self, _file_system_state):
"""
@@ -133,6 +164,7 @@ class ActiveNode(Node):
Args:
_file_system_state: File system state
"""
if self.operating_state != HARDWARE_STATE.OFF:
if (
self.file_system_state_actual != FILE_SYSTEM_STATE.CORRUPT
and self.file_system_state_actual != FILE_SYSTEM_STATE.DESTROYED
@@ -151,6 +183,14 @@ class ActiveNode(Node):
self.file_system_state_observed = FILE_SYSTEM_STATE.RESTORING
elif _file_system_state == FILE_SYSTEM_STATE.GOOD:
self.file_system_state_observed = FILE_SYSTEM_STATE.GOOD
else:
_LOGGER.info(
f"The Nodes operating state is OFF so File System State (if not "
f"compromised) cannot be changed. "
f"Node:{self.id}, "
f"Node Operating State:{self.operating_state}, "
f"Node File System State:{self.file_system_state_actual}",
)
def get_file_system_state_actual(self):
"""
@@ -185,7 +225,7 @@ class ActiveNode(Node):
return self.file_system_scanning
def update_file_system_state(self):
"""Updates file system status based on scanning / restore / repair cycle."""
"""Updates file system status based on scanning/restore/repair cycle."""
# Deprecate both the action count (for restoring or reparing) and the scanning count
self.file_system_action_count -= 1
self.file_system_scanning_count -= 1

View File

@@ -1,8 +1,13 @@
# Crown Copyright (C) Dstl 2022. DEFCON 703. Shared in confidence.
"""A Service Node (i.e. not an actuator)."""
from primaite.common.enums import SOFTWARE_STATE
import logging
from typing import Final
from primaite.common.enums import HARDWARE_STATE, SOFTWARE_STATE
from primaite.nodes.active_node import ActiveNode
_LOGGER: Final[logging.Logger] = logging.getLogger(__name__)
class ServiceNode(ActiveNode):
"""ServiceNode class."""
@@ -119,6 +124,7 @@ class ServiceNode(ActiveNode):
_protocol: The service (protocol)
_state: The state value
"""
if self.operating_state != HARDWARE_STATE.OFF:
for service_key, service_value in self.services.items():
if service_key == _protocol:
# Can't set to compromised if you're in a patching state
@@ -137,6 +143,15 @@ class ServiceNode(ActiveNode):
else:
# Do nothing
pass
else:
_LOGGER.info(
f"The Nodes operating state is OFF so the state of a service "
f"cannot be changed. "
f"Node:{self.id}, "
f"Node Operating State:{self.operating_state}, "
f"Node Service Protocol:{_protocol}, "
f"Node Service State: {_state}"
)
def set_service_state_if_not_compromised(self, _protocol, _state):
"""
@@ -146,6 +161,7 @@ class ServiceNode(ActiveNode):
_protocol: The service (protocol)
_state: The state value
"""
if self.operating_state != HARDWARE_STATE.OFF:
for service_key, service_value in self.services.items():
if service_key == _protocol:
if service_value.get_state() != SOFTWARE_STATE.COMPROMISED:
@@ -154,6 +170,15 @@ class ServiceNode(ActiveNode):
service_value.patching_count = (
self.config_values.service_patching_duration
)
else:
_LOGGER.info(
f"The Nodes operating state is OFF so the state of a service "
f"cannot be changed. "
f"Node:{self.id}, "
f"Node Operating State:{self.operating_state}, "
f"Node Service Protocol:{_protocol}, "
f"Node Service State:{_state}"
)
def get_service_state(self, _protocol):
"""

121
tests/test_active_node.py Normal file
View File

@@ -0,0 +1,121 @@
"""Used to test Active Node functions."""
import pytest
from primaite.common.enums import FILE_SYSTEM_STATE, HARDWARE_STATE, SOFTWARE_STATE
from primaite.nodes.active_node import ActiveNode
@pytest.mark.parametrize(
"operating_state, expected_state",
[
(HARDWARE_STATE.OFF, SOFTWARE_STATE.GOOD),
(HARDWARE_STATE.ON, SOFTWARE_STATE.OVERWHELMED),
],
)
def test_os_state_change(operating_state, expected_state):
"""
Test that a node cannot change its operating system state.
When its operating state is OFF.
"""
active_node = ActiveNode(
0,
"node",
"COMPUTER",
"1",
operating_state,
"192.168.0.1",
SOFTWARE_STATE.GOOD,
"GOOD",
1,
)
active_node.set_os_state(SOFTWARE_STATE.OVERWHELMED)
assert active_node.get_os_state() == expected_state
@pytest.mark.parametrize(
"operating_state, expected_state",
[
(HARDWARE_STATE.OFF, SOFTWARE_STATE.GOOD),
(HARDWARE_STATE.ON, SOFTWARE_STATE.OVERWHELMED),
],
)
def test_os_state_change_if_not_compromised(operating_state, expected_state):
"""
Test that a node cannot change its operating system state.
If not compromised) when its operating state is OFF.
"""
active_node = ActiveNode(
0,
"node",
"COMPUTER",
"1",
operating_state,
"192.168.0.1",
SOFTWARE_STATE.GOOD,
"GOOD",
1,
)
active_node.set_os_state_if_not_compromised(SOFTWARE_STATE.OVERWHELMED)
assert active_node.get_os_state() == expected_state
@pytest.mark.parametrize(
"operating_state, expected_state",
[
(HARDWARE_STATE.OFF, FILE_SYSTEM_STATE.GOOD),
(HARDWARE_STATE.ON, FILE_SYSTEM_STATE.CORRUPT),
],
)
def test_file_system_change(operating_state, expected_state):
"""Test that a node cannot change its file system state when its operating state is ON."""
active_node = ActiveNode(
0,
"node",
"COMPUTER",
"1",
operating_state,
"192.168.0.1",
"COMPROMISED",
FILE_SYSTEM_STATE.GOOD,
1,
)
active_node.set_file_system_state(FILE_SYSTEM_STATE.CORRUPT)
assert active_node.get_file_system_state_actual() == expected_state
@pytest.mark.parametrize(
"operating_state, expected_state",
[
(HARDWARE_STATE.OFF, FILE_SYSTEM_STATE.GOOD),
(HARDWARE_STATE.ON, FILE_SYSTEM_STATE.CORRUPT),
],
)
def test_file_system_change_if_not_compromised(operating_state, expected_state):
"""
Test that a node cannot change its file system state.
If not compromised) when its operating state is OFF.
"""
active_node = ActiveNode(
0,
"node",
"COMPUTER",
"1",
operating_state,
"192.168.0.1",
"GOOD",
FILE_SYSTEM_STATE.GOOD,
1,
)
active_node.set_file_system_state_if_not_compromised(FILE_SYSTEM_STATE.CORRUPT)
assert active_node.get_file_system_state_actual() == expected_state

View File

@@ -0,0 +1,70 @@
"""Used to test Service Node functions."""
import pytest
from primaite.common.enums import HARDWARE_STATE, SOFTWARE_STATE
from primaite.common.service import Service
from primaite.nodes.service_node import ServiceNode
@pytest.mark.parametrize(
"operating_state, expected_state",
[
(HARDWARE_STATE.OFF, SOFTWARE_STATE.GOOD),
(HARDWARE_STATE.ON, SOFTWARE_STATE.OVERWHELMED),
],
)
def test_service_state_change(operating_state, expected_state):
"""
Test that a node cannot change the state of a running service.
When its operating state is OFF.
"""
service_node = ServiceNode(
0,
"node",
"COMPUTER",
"1",
operating_state,
"192.168.0.1",
"COMPROMISED",
"RESTORING",
1,
)
service = Service("TCP", 80, SOFTWARE_STATE.GOOD)
service_node.add_service(service)
service_node.set_service_state("TCP", SOFTWARE_STATE.OVERWHELMED)
assert service_node.get_service_state("TCP") == expected_state
@pytest.mark.parametrize(
"operating_state, expected_state",
[
(HARDWARE_STATE.OFF, SOFTWARE_STATE.GOOD),
(HARDWARE_STATE.ON, SOFTWARE_STATE.OVERWHELMED),
],
)
def test_service_state_change_if_not_comprised(operating_state, expected_state):
"""
Test that a node cannot change the state of a running service.
If not compromised when its operating state is ON.
"""
service_node = ServiceNode(
0,
"node",
"COMPUTER",
"1",
operating_state,
"192.168.0.1",
"GOOD",
"RESTORING",
1,
)
service = Service("TCP", 80, SOFTWARE_STATE.GOOD)
service_node.add_service(service)
service_node.set_service_state_if_not_compromised("TCP", SOFTWARE_STATE.OVERWHELMED)
assert service_node.get_service_state("TCP") == expected_state