Merge remote-tracking branch 'origin/dev' into dev-game-layer
This commit is contained in:
@@ -84,6 +84,9 @@ class FileSystemItemABC(SimComponent):
|
||||
previous_hash: Optional[str] = None
|
||||
"Hash of the file contents or the description state"
|
||||
|
||||
revealed_to_red: bool = False
|
||||
"If true, the folder/file has been revealed to the red agent."
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Produce a dictionary describing the current state of this object.
|
||||
@@ -95,6 +98,7 @@ class FileSystemItemABC(SimComponent):
|
||||
state["health_status"] = self.health_status.value
|
||||
state["visible_health_status"] = self.visible_health_status.value
|
||||
state["previous_hash"] = self.previous_hash
|
||||
state["revealed_to_red"] = self.revealed_to_red
|
||||
return state
|
||||
|
||||
def _init_request_manager(self) -> RequestManager:
|
||||
@@ -231,6 +235,32 @@ class FileSystem(SimComponent):
|
||||
state["folders"] = {folder.name: folder.describe_state() for folder in self.folders.values()}
|
||||
return state
|
||||
|
||||
def apply_timestep(self, timestep: int) -> None:
|
||||
"""Apply time step to FileSystem and its child folders and files."""
|
||||
super().apply_timestep(timestep=timestep)
|
||||
|
||||
# apply timestep to folders
|
||||
for folder_id in self.folders:
|
||||
self.folders[folder_id].apply_timestep(timestep=timestep)
|
||||
|
||||
def scan(self, instant_scan: bool = False):
|
||||
"""
|
||||
Scan all the folders (and child files) in the file system.
|
||||
|
||||
:param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False.
|
||||
"""
|
||||
for folder_id in self.folders:
|
||||
self.folders[folder_id].scan(instant_scan=instant_scan)
|
||||
|
||||
def reveal_to_red(self, instant_scan: bool = False):
|
||||
"""
|
||||
Reveals all the folders (and child files) in the file system to the red agent.
|
||||
|
||||
:param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False.
|
||||
"""
|
||||
for folder_id in self.folders:
|
||||
self.folders[folder_id].reveal_to_red(instant_scan=instant_scan)
|
||||
|
||||
def create_folder(self, folder_name: str) -> Folder:
|
||||
"""
|
||||
Creates a Folder and adds it to the list of folders.
|
||||
@@ -444,6 +474,9 @@ class Folder(FileSystemItemABC):
|
||||
scan_duration: int = -1
|
||||
"How many timesteps to complete a scan."
|
||||
|
||||
red_scan_duration: int = -1
|
||||
"How many timesteps to complete reveal to red scan."
|
||||
|
||||
def _init_request_manager(self) -> RequestManager:
|
||||
rm = super()._init_request_manager()
|
||||
rm.add_request(
|
||||
@@ -489,7 +522,7 @@ class Folder(FileSystemItemABC):
|
||||
|
||||
def apply_timestep(self, timestep: int):
|
||||
"""
|
||||
Apply a single timestep of simulation dynamics to this service.
|
||||
Apply a single timestep of simulation dynamics to this folder and its files.
|
||||
|
||||
In this instance, if any multi-timestep processes are currently occurring (such as scanning),
|
||||
then they are brought one step closer to being finished.
|
||||
@@ -500,14 +533,30 @@ class Folder(FileSystemItemABC):
|
||||
super().apply_timestep(timestep=timestep)
|
||||
|
||||
# scan files each timestep
|
||||
if self.scan_duration > -1:
|
||||
# scan one file per timestep
|
||||
file = self.get_file_by_id(file_uuid=list(self.files)[self.scan_duration - 1])
|
||||
file.scan()
|
||||
if file.visible_health_status == FileSystemItemHealthStatus.CORRUPT:
|
||||
self.visible_health_status = FileSystemItemHealthStatus.CORRUPT
|
||||
if self.scan_duration >= 0:
|
||||
self.scan_duration -= 1
|
||||
|
||||
if self.scan_duration == 0:
|
||||
for file_id in self.files:
|
||||
file = self.get_file_by_id(file_uuid=file_id)
|
||||
file.scan()
|
||||
if file.visible_health_status == FileSystemItemHealthStatus.CORRUPT:
|
||||
self.visible_health_status = FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
# red scan file at each step
|
||||
if self.red_scan_duration >= 0:
|
||||
self.red_scan_duration -= 1
|
||||
|
||||
if self.red_scan_duration == 0:
|
||||
self.revealed_to_red = True
|
||||
for file_id in self.files:
|
||||
file = self.get_file_by_id(file_uuid=file_id)
|
||||
file.reveal_to_red()
|
||||
|
||||
# apply timestep to files in folder
|
||||
for file_id in self.files:
|
||||
self.files[file_id].apply_timestep(timestep=timestep)
|
||||
|
||||
def get_file(self, file_name: str) -> Optional[File]:
|
||||
"""
|
||||
Get a file by its name.
|
||||
@@ -602,9 +651,21 @@ class Folder(FileSystemItemABC):
|
||||
"""Returns true if the folder is being quarantined."""
|
||||
pass
|
||||
|
||||
def scan(self) -> None:
|
||||
"""Update Folder visible status."""
|
||||
if self.scan_duration <= -1:
|
||||
def scan(self, instant_scan: bool = False) -> None:
|
||||
"""
|
||||
Update Folder visible status.
|
||||
|
||||
:param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False.
|
||||
"""
|
||||
if instant_scan:
|
||||
for file_id in self.files:
|
||||
file = self.get_file_by_id(file_uuid=file_id)
|
||||
file.scan()
|
||||
if file.visible_health_status == FileSystemItemHealthStatus.CORRUPT:
|
||||
self.visible_health_status = FileSystemItemHealthStatus.CORRUPT
|
||||
return
|
||||
|
||||
if self.scan_duration <= 0:
|
||||
# scan one file per timestep
|
||||
self.scan_duration = len(self.files)
|
||||
self.fs.sys_log.info(f"Scanning folder {self.name} (id: {self.uuid})")
|
||||
@@ -612,6 +673,27 @@ class Folder(FileSystemItemABC):
|
||||
# scan already in progress
|
||||
self.fs.sys_log.info(f"Scan is already in progress {self.name} (id: {self.uuid})")
|
||||
|
||||
def reveal_to_red(self, instant_scan: bool = False):
|
||||
"""
|
||||
Reveals the folders and files to the red agent.
|
||||
|
||||
:param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False.
|
||||
"""
|
||||
if instant_scan:
|
||||
self.revealed_to_red = True
|
||||
for file_id in self.files:
|
||||
file = self.get_file_by_id(file_uuid=file_id)
|
||||
file.reveal_to_red()
|
||||
return
|
||||
|
||||
if self.red_scan_duration <= 0:
|
||||
# scan one file per timestep
|
||||
self.red_scan_duration = len(self.files)
|
||||
self.fs.sys_log.info(f"Folder revealed to red agent: {self.name} (id: {self.uuid})")
|
||||
else:
|
||||
# scan already in progress
|
||||
self.fs.sys_log.info(f"Red Agent Scan is already in progress {self.name} (id: {self.uuid})")
|
||||
|
||||
def check_hash(self) -> bool:
|
||||
"""
|
||||
Runs a :func:`check_hash` on all files in the folder.
|
||||
@@ -779,6 +861,10 @@ class File(FileSystemItemABC):
|
||||
self.folder.fs.sys_log.info(f"Scanning file {self.sim_path if self.sim_path else path}")
|
||||
self.visible_health_status = self.health_status
|
||||
|
||||
def reveal_to_red(self):
|
||||
"""Reveals the folder/file to the red agent."""
|
||||
self.revealed_to_red = True
|
||||
|
||||
def check_hash(self) -> bool:
|
||||
"""
|
||||
Check if the file has been changed.
|
||||
|
||||
@@ -926,6 +926,18 @@ class Node(SimComponent):
|
||||
shut_down_countdown: int = 0
|
||||
"Time steps needed until node is shut down."
|
||||
|
||||
is_resetting: bool = False
|
||||
"If true, the node will try turning itself off then back on again."
|
||||
|
||||
node_scan_duration: int = 10
|
||||
"How many timesteps until the whole node is scanned. Default 10 time steps."
|
||||
|
||||
node_scan_countdown: int = 0
|
||||
"Time steps until scan is complete"
|
||||
|
||||
red_scan_countdown: int = 0
|
||||
"Time steps until reveal to red scan is complete."
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
"""
|
||||
Initialize the Node with various components and managers.
|
||||
@@ -979,14 +991,18 @@ class Node(SimComponent):
|
||||
self._application_request_manager = RequestManager()
|
||||
rm.add_request("application", RequestType(func=self._application_request_manager))
|
||||
|
||||
rm.add_request("scan", RequestType(func=lambda request, context: ...)) # TODO implement OS scan
|
||||
rm.add_request("scan", RequestType(func=lambda request, context: self.reveal_to_red()))
|
||||
|
||||
rm.add_request("shutdown", RequestType(func=lambda request, context: self.power_off()))
|
||||
rm.add_request("startup", RequestType(func=lambda request, context: self.power_on()))
|
||||
rm.add_request("reset", RequestType(func=lambda request, context: ...)) # TODO implement node reset
|
||||
rm.add_request("reset", RequestType(func=lambda request, context: self.reset())) # TODO implement node reset
|
||||
rm.add_request("logon", RequestType(func=lambda request, context: ...)) # TODO implement logon request
|
||||
rm.add_request("logoff", RequestType(func=lambda request, context: ...)) # TODO implement logoff request
|
||||
|
||||
self._os_request_manager = RequestManager()
|
||||
self._os_request_manager.add_request("scan", RequestType(func=lambda request, context: self.scan()))
|
||||
rm.add_request("os", RequestType(func=self._os_request_manager))
|
||||
|
||||
return rm
|
||||
|
||||
def _install_system_software(self):
|
||||
@@ -1057,7 +1073,7 @@ class Node(SimComponent):
|
||||
|
||||
def apply_timestep(self, timestep: int):
|
||||
"""
|
||||
Apply a single timestep of simulation dynamics to this service.
|
||||
Apply a single timestep of simulation dynamics to this node.
|
||||
|
||||
In this instance, if any multi-timestep processes are currently occurring
|
||||
(such as starting up or shutting down), then they are brought one step closer to
|
||||
@@ -1087,6 +1103,93 @@ class Node(SimComponent):
|
||||
self.operating_state = NodeOperatingState.OFF
|
||||
self.sys_log.info("Turned off")
|
||||
|
||||
# if resetting turn back on
|
||||
if self.is_resetting:
|
||||
self.is_resetting = False
|
||||
self.power_on()
|
||||
|
||||
# time steps which require the node to be on
|
||||
if self.operating_state == NodeOperatingState.ON:
|
||||
# node scanning
|
||||
if self.node_scan_countdown > 0:
|
||||
self.node_scan_countdown -= 1
|
||||
|
||||
if self.node_scan_countdown == 0:
|
||||
# scan everything!
|
||||
for process_id in self.processes:
|
||||
self.processes[process_id].scan()
|
||||
|
||||
# scan services
|
||||
for service_id in self.services:
|
||||
self.services[service_id].scan()
|
||||
|
||||
# scan applications
|
||||
for application_id in self.applications:
|
||||
self.applications[application_id].scan()
|
||||
|
||||
# scan file system
|
||||
self.file_system.scan(instant_scan=True)
|
||||
|
||||
if self.red_scan_countdown > 0:
|
||||
self.red_scan_countdown -= 1
|
||||
|
||||
if self.red_scan_countdown == 0:
|
||||
# scan processes
|
||||
for process_id in self.processes:
|
||||
self.processes[process_id].reveal_to_red()
|
||||
|
||||
# scan services
|
||||
for service_id in self.services:
|
||||
self.services[service_id].reveal_to_red()
|
||||
|
||||
# scan applications
|
||||
for application_id in self.applications:
|
||||
self.applications[application_id].reveal_to_red()
|
||||
|
||||
# scan file system
|
||||
self.file_system.reveal_to_red(instant_scan=True)
|
||||
|
||||
for process_id in self.processes:
|
||||
self.processes[process_id].apply_timestep(timestep=timestep)
|
||||
|
||||
for service_id in self.services:
|
||||
self.services[service_id].apply_timestep(timestep=timestep)
|
||||
|
||||
for application_id in self.applications:
|
||||
self.applications[application_id].apply_timestep(timestep=timestep)
|
||||
|
||||
self.file_system.apply_timestep(timestep=timestep)
|
||||
|
||||
def scan(self) -> None:
|
||||
"""
|
||||
Scan the node and all the items within it.
|
||||
|
||||
Scans the:
|
||||
- Processes
|
||||
- Services
|
||||
- Applications
|
||||
- Folders
|
||||
- Files
|
||||
|
||||
to the red agent.
|
||||
"""
|
||||
self.node_scan_countdown = self.node_scan_duration
|
||||
|
||||
def reveal_to_red(self) -> None:
|
||||
"""
|
||||
Reveals the node and all the items within it to the red agent.
|
||||
|
||||
Set all the:
|
||||
- Processes
|
||||
- Services
|
||||
- Applications
|
||||
- Folders
|
||||
- Files
|
||||
|
||||
`revealed_to_red` to `True`.
|
||||
"""
|
||||
self.red_scan_countdown = self.node_scan_duration
|
||||
|
||||
def power_on(self):
|
||||
"""Power on the Node, enabling its NICs if it is in the OFF state."""
|
||||
if self.operating_state == NodeOperatingState.OFF:
|
||||
@@ -1112,6 +1215,20 @@ class Node(SimComponent):
|
||||
self.operating_state = NodeOperatingState.OFF
|
||||
self.sys_log.info("Turned off")
|
||||
|
||||
def reset(self):
|
||||
"""
|
||||
Resets the node.
|
||||
|
||||
Powers off the node and sets is_resetting to True.
|
||||
Applying more timesteps will eventually turn the node back on.
|
||||
"""
|
||||
if not self.operating_state.ON:
|
||||
self.sys_log.error(f"Cannot reset {self.hostname} - node is not turned on.")
|
||||
else:
|
||||
self.is_resetting = True
|
||||
self.sys_log.info(f"Resetting {self.hostname}...")
|
||||
self.power_off()
|
||||
|
||||
def connect_nic(self, nic: NIC):
|
||||
"""
|
||||
Connect a NIC (Network Interface Card) to the node.
|
||||
|
||||
@@ -2,7 +2,7 @@ from abc import abstractmethod
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, Set
|
||||
|
||||
from primaite.simulator.system.software import IOSoftware
|
||||
from primaite.simulator.system.software import IOSoftware, SoftwareHealthState
|
||||
|
||||
|
||||
class ApplicationOperatingState(Enum):
|
||||
@@ -32,6 +32,12 @@ class Application(IOSoftware):
|
||||
groups: Set[str] = set()
|
||||
"The set of groups to which the application belongs."
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self.health_state_visible = SoftwareHealthState.UNUSED
|
||||
self.health_state_actual = SoftwareHealthState.UNUSED
|
||||
|
||||
@abstractmethod
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
|
||||
@@ -82,11 +82,6 @@ class Service(IOSoftware):
|
||||
"""
|
||||
pass
|
||||
|
||||
def scan(self) -> None:
|
||||
"""Update the service visible states."""
|
||||
# update the visible operating state
|
||||
self.health_state_visible = self.health_state_actual
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stop the service."""
|
||||
if self.operating_state in [ServiceOperatingState.RUNNING, ServiceOperatingState.PAUSED]:
|
||||
|
||||
@@ -177,6 +177,10 @@ class Software(SimComponent):
|
||||
"""Update the observed health status to match the actual health status."""
|
||||
self.health_state_visible = self.health_state_actual
|
||||
|
||||
def reveal_to_red(self) -> None:
|
||||
"""Reveals the software to the red agent."""
|
||||
self.revealed_to_red = True
|
||||
|
||||
|
||||
class IOSoftware(Software):
|
||||
"""
|
||||
|
||||
@@ -4,9 +4,10 @@ import shutil
|
||||
import tempfile
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Union
|
||||
from typing import Any, Dict, Union
|
||||
from unittest.mock import patch
|
||||
|
||||
import nodeenv
|
||||
import pytest
|
||||
|
||||
from primaite import getLogger
|
||||
@@ -16,6 +17,7 @@ from primaite import getLogger
|
||||
from primaite.simulator.network.container import Network
|
||||
from primaite.simulator.network.networks import arcd_uc2_network
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
from primaite.simulator.system.applications.application import Application
|
||||
from primaite.simulator.system.core.sys_log import SysLog
|
||||
from primaite.simulator.system.services.service import Service
|
||||
from tests.mock_and_patch.get_session_path_mock import get_temp_session_path
|
||||
@@ -37,6 +39,13 @@ class TestService(Service):
|
||||
pass
|
||||
|
||||
|
||||
class TestApplication(Application):
|
||||
"""Test Application class"""
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
pass
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def uc2_network() -> Network:
|
||||
return arcd_uc2_network()
|
||||
@@ -49,6 +58,13 @@ def service(file_system) -> TestService:
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def application(file_system) -> TestApplication:
|
||||
return TestApplication(
|
||||
name="TestApplication", port=Port.ARP, file_system=file_system, sys_log=SysLog(hostname="test_application")
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def file_system() -> FileSystem:
|
||||
return Node(hostname="fs_node").file_system
|
||||
|
||||
@@ -215,8 +215,8 @@ def test_folder_scan(file_system):
|
||||
folder.apply_timestep(timestep=0)
|
||||
|
||||
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
|
||||
folder.apply_timestep(timestep=1)
|
||||
@@ -226,12 +226,33 @@ def test_folder_scan(file_system):
|
||||
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
folder.apply_timestep(timestep=2)
|
||||
|
||||
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
def test_folder_reveal_to_red_scan(file_system):
|
||||
"""Test the ability to reveal files to red."""
|
||||
folder: Folder = file_system.create_folder(folder_name="test_folder")
|
||||
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
|
||||
file_system.create_file(file_name="test_file2.txt", folder_name="test_folder")
|
||||
|
||||
file1: File = folder.get_file_by_id(file_uuid=list(folder.files)[1])
|
||||
file2: File = folder.get_file_by_id(file_uuid=list(folder.files)[0])
|
||||
|
||||
assert folder.revealed_to_red is False
|
||||
assert file1.revealed_to_red is False
|
||||
assert file2.revealed_to_red is False
|
||||
|
||||
folder.reveal_to_red()
|
||||
|
||||
folder.apply_timestep(timestep=0)
|
||||
|
||||
assert folder.revealed_to_red is False
|
||||
assert file1.revealed_to_red is False
|
||||
assert file2.revealed_to_red is False
|
||||
|
||||
folder.apply_timestep(timestep=1)
|
||||
|
||||
assert folder.revealed_to_red is True
|
||||
assert file1.revealed_to_red is True
|
||||
assert file2.revealed_to_red is True
|
||||
|
||||
|
||||
def test_simulated_file_check_hash(file_system):
|
||||
|
||||
@@ -47,8 +47,8 @@ def test_folder_scan_request(populated_file_system):
|
||||
folder.apply_timestep(timestep=0)
|
||||
|
||||
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
|
||||
folder.apply_timestep(timestep=1)
|
||||
@@ -58,13 +58,6 @@ def test_folder_scan_request(populated_file_system):
|
||||
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
folder.apply_timestep(timestep=2)
|
||||
|
||||
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
|
||||
def test_file_checkhash_request(populated_file_system):
|
||||
"""Test that an agent can request a file hash check."""
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
import pytest
|
||||
|
||||
from primaite.simulator.file_system.file_system import File, FileSystemItemHealthStatus, Folder
|
||||
from primaite.simulator.network.hardware.base import Node, NodeOperatingState
|
||||
from primaite.simulator.system.applications.application import Application
|
||||
from primaite.simulator.system.processes.process import Process
|
||||
from primaite.simulator.system.services.service import Service
|
||||
from primaite.simulator.system.software import SoftwareHealthState
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -39,3 +44,113 @@ def test_node_shutdown(node):
|
||||
idx += 1
|
||||
|
||||
assert node.operating_state == NodeOperatingState.OFF
|
||||
|
||||
|
||||
def test_node_os_scan(node, service, application):
|
||||
"""Test OS Scanning."""
|
||||
node.operating_state = NodeOperatingState.ON
|
||||
|
||||
# add process to node
|
||||
# TODO implement processes
|
||||
|
||||
# add services to node
|
||||
service.health_state_actual = SoftwareHealthState.COMPROMISED
|
||||
node.install_service(service=service)
|
||||
assert service.health_state_visible == SoftwareHealthState.UNUSED
|
||||
|
||||
# add application to node
|
||||
application.health_state_actual = SoftwareHealthState.COMPROMISED
|
||||
node.install_application(application=application)
|
||||
assert application.health_state_visible == SoftwareHealthState.UNUSED
|
||||
|
||||
# add folder and file to node
|
||||
folder: Folder = node.file_system.create_folder(folder_name="test_folder")
|
||||
folder.corrupt()
|
||||
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
|
||||
file: File = node.file_system.create_file(folder_name="test_folder", file_name="file.txt")
|
||||
file2: File = node.file_system.create_file(folder_name="test_folder", file_name="file2.txt")
|
||||
file.corrupt()
|
||||
file2.corrupt()
|
||||
assert file.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
|
||||
# run os scan
|
||||
node.apply_request(["os", "scan"])
|
||||
|
||||
# apply time steps
|
||||
for i in range(10):
|
||||
node.apply_timestep(timestep=i)
|
||||
|
||||
# should update the state of all items
|
||||
# TODO assert process.health_state_visible == SoftwareHealthState.COMPROMISED
|
||||
assert service.health_state_visible == SoftwareHealthState.COMPROMISED
|
||||
assert application.health_state_visible == SoftwareHealthState.COMPROMISED
|
||||
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
|
||||
def test_node_red_scan(node, service, application):
|
||||
"""Test revealing to red"""
|
||||
node.operating_state = NodeOperatingState.ON
|
||||
|
||||
# add process to node
|
||||
# TODO implement processes
|
||||
|
||||
# add services to node
|
||||
node.install_service(service=service)
|
||||
assert service.revealed_to_red is False
|
||||
|
||||
# add application to node
|
||||
application.health_state_actual = SoftwareHealthState.COMPROMISED
|
||||
node.install_application(application=application)
|
||||
assert application.revealed_to_red is False
|
||||
|
||||
# add folder and file to node
|
||||
folder: Folder = node.file_system.create_folder(folder_name="test_folder")
|
||||
assert folder.revealed_to_red is False
|
||||
|
||||
file: File = node.file_system.create_file(folder_name="test_folder", file_name="file.txt")
|
||||
file2: File = node.file_system.create_file(folder_name="test_folder", file_name="file2.txt")
|
||||
assert file.revealed_to_red is False
|
||||
assert file2.revealed_to_red is False
|
||||
|
||||
# run os scan
|
||||
node.apply_request(["scan"])
|
||||
|
||||
# apply time steps
|
||||
for i in range(10):
|
||||
node.apply_timestep(timestep=i)
|
||||
|
||||
# should update the state of all items
|
||||
# TODO assert process.revealed_to_red is True
|
||||
assert service.revealed_to_red is True
|
||||
assert application.revealed_to_red is True
|
||||
assert folder.revealed_to_red is True
|
||||
assert file.revealed_to_red is True
|
||||
assert file2.revealed_to_red is True
|
||||
|
||||
|
||||
def test_reset_node(node):
|
||||
"""Test that a node can be reset."""
|
||||
node.operating_state = NodeOperatingState.ON
|
||||
|
||||
node.apply_request(["reset"])
|
||||
assert node.operating_state == NodeOperatingState.SHUTTING_DOWN
|
||||
|
||||
"""
|
||||
3 steps to shut down
|
||||
2 steps to set up the turning of it back on
|
||||
3 steps to turn back on
|
||||
|
||||
3 + 2 + 3 = 8
|
||||
kwik mafs
|
||||
"""
|
||||
|
||||
for i in range(8):
|
||||
node.apply_timestep(timestep=i)
|
||||
|
||||
if i == 3:
|
||||
assert node.operating_state == NodeOperatingState.BOOTING
|
||||
|
||||
assert node.operating_state == NodeOperatingState.ON
|
||||
|
||||
Reference in New Issue
Block a user