Merged PR 200: Implement node scan and reset
## Summary Implementing the ability for the simulation to scan all node components as well as the ability for the red agent to reveal the node and its components. Implementing the ability for the simulation to reset a node ## Test process Unit tests added ## Checklist - [X] PR is linked to a **work item** - [X] **acceptance criteria** of linked ticket are met - [X] performed **self-review** of the code - [X] written **tests** for any new functionality added with this PR - [ ] updated the **documentation** if this PR changes or adds functionality - [ ] written/updated **design docs** if this PR implements new functionality - [ ] updated the **change log** - [X] ran **pre-commit** checks for code style - [ ] attended to any **TO-DOs** left in the code Related work items: #1961
This commit is contained in:
@@ -84,6 +84,9 @@ class FileSystemItemABC(SimComponent):
|
||||
previous_hash: Optional[str] = None
|
||||
"Hash of the file contents or the description state"
|
||||
|
||||
revealed_to_red: bool = False
|
||||
"If true, the folder/file has been revealed to the red agent."
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Produce a dictionary describing the current state of this object.
|
||||
@@ -95,6 +98,7 @@ class FileSystemItemABC(SimComponent):
|
||||
state["status"] = self.health_status.name
|
||||
state["visible_status"] = self.visible_health_status.name
|
||||
state["previous_hash"] = self.previous_hash
|
||||
state["revealed_to_red"] = self.revealed_to_red
|
||||
return state
|
||||
|
||||
def _init_request_manager(self) -> RequestManager:
|
||||
@@ -231,6 +235,32 @@ class FileSystem(SimComponent):
|
||||
state["folders"] = {folder.name: folder.describe_state() for folder in self.folders.values()}
|
||||
return state
|
||||
|
||||
def apply_timestep(self, timestep: int) -> None:
|
||||
"""Apply time step to FileSystem and its child folders and files."""
|
||||
super().apply_timestep(timestep=timestep)
|
||||
|
||||
# apply timestep to folders
|
||||
for folder_id in self.folders:
|
||||
self.folders[folder_id].apply_timestep(timestep=timestep)
|
||||
|
||||
def scan(self, instant_scan: bool = False):
|
||||
"""
|
||||
Scan all the folders (and child files) in the file system.
|
||||
|
||||
:param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False.
|
||||
"""
|
||||
for folder_id in self.folders:
|
||||
self.folders[folder_id].scan(instant_scan=instant_scan)
|
||||
|
||||
def reveal_to_red(self, instant_scan: bool = False):
|
||||
"""
|
||||
Reveals all the folders (and child files) in the file system to the red agent.
|
||||
|
||||
:param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False.
|
||||
"""
|
||||
for folder_id in self.folders:
|
||||
self.folders[folder_id].reveal_to_red(instant_scan=instant_scan)
|
||||
|
||||
def create_folder(self, folder_name: str) -> Folder:
|
||||
"""
|
||||
Creates a Folder and adds it to the list of folders.
|
||||
@@ -444,6 +474,9 @@ class Folder(FileSystemItemABC):
|
||||
scan_duration: int = -1
|
||||
"How many timesteps to complete a scan."
|
||||
|
||||
red_scan_duration: int = -1
|
||||
"How many timesteps to complete reveal to red scan."
|
||||
|
||||
def _init_request_manager(self) -> RequestManager:
|
||||
rm = super()._init_request_manager()
|
||||
rm.add_request(
|
||||
@@ -489,7 +522,7 @@ class Folder(FileSystemItemABC):
|
||||
|
||||
def apply_timestep(self, timestep: int):
|
||||
"""
|
||||
Apply a single timestep of simulation dynamics to this service.
|
||||
Apply a single timestep of simulation dynamics to this folder and its files.
|
||||
|
||||
In this instance, if any multi-timestep processes are currently occurring (such as scanning),
|
||||
then they are brought one step closer to being finished.
|
||||
@@ -500,14 +533,30 @@ class Folder(FileSystemItemABC):
|
||||
super().apply_timestep(timestep=timestep)
|
||||
|
||||
# scan files each timestep
|
||||
if self.scan_duration > -1:
|
||||
# scan one file per timestep
|
||||
file = self.get_file_by_id(file_uuid=list(self.files)[self.scan_duration - 1])
|
||||
file.scan()
|
||||
if file.visible_health_status == FileSystemItemHealthStatus.CORRUPT:
|
||||
self.visible_health_status = FileSystemItemHealthStatus.CORRUPT
|
||||
if self.scan_duration >= 0:
|
||||
self.scan_duration -= 1
|
||||
|
||||
if self.scan_duration == 0:
|
||||
for file_id in self.files:
|
||||
file = self.get_file_by_id(file_uuid=file_id)
|
||||
file.scan()
|
||||
if file.visible_health_status == FileSystemItemHealthStatus.CORRUPT:
|
||||
self.visible_health_status = FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
# red scan file at each step
|
||||
if self.red_scan_duration >= 0:
|
||||
self.red_scan_duration -= 1
|
||||
|
||||
if self.red_scan_duration == 0:
|
||||
self.revealed_to_red = True
|
||||
for file_id in self.files:
|
||||
file = self.get_file_by_id(file_uuid=file_id)
|
||||
file.reveal_to_red()
|
||||
|
||||
# apply timestep to files in folder
|
||||
for file_id in self.files:
|
||||
self.files[file_id].apply_timestep(timestep=timestep)
|
||||
|
||||
def get_file(self, file_name: str) -> Optional[File]:
|
||||
"""
|
||||
Get a file by its name.
|
||||
@@ -602,9 +651,21 @@ class Folder(FileSystemItemABC):
|
||||
"""Returns true if the folder is being quarantined."""
|
||||
pass
|
||||
|
||||
def scan(self) -> None:
|
||||
"""Update Folder visible status."""
|
||||
if self.scan_duration <= -1:
|
||||
def scan(self, instant_scan: bool = False) -> None:
|
||||
"""
|
||||
Update Folder visible status.
|
||||
|
||||
:param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False.
|
||||
"""
|
||||
if instant_scan:
|
||||
for file_id in self.files:
|
||||
file = self.get_file_by_id(file_uuid=file_id)
|
||||
file.scan()
|
||||
if file.visible_health_status == FileSystemItemHealthStatus.CORRUPT:
|
||||
self.visible_health_status = FileSystemItemHealthStatus.CORRUPT
|
||||
return
|
||||
|
||||
if self.scan_duration <= 0:
|
||||
# scan one file per timestep
|
||||
self.scan_duration = len(self.files)
|
||||
self.fs.sys_log.info(f"Scanning folder {self.name} (id: {self.uuid})")
|
||||
@@ -612,6 +673,27 @@ class Folder(FileSystemItemABC):
|
||||
# scan already in progress
|
||||
self.fs.sys_log.info(f"Scan is already in progress {self.name} (id: {self.uuid})")
|
||||
|
||||
def reveal_to_red(self, instant_scan: bool = False):
|
||||
"""
|
||||
Reveals the folders and files to the red agent.
|
||||
|
||||
:param: instant_scan: If True, the scan is completed instantly and ignores scan duration. Default False.
|
||||
"""
|
||||
if instant_scan:
|
||||
self.revealed_to_red = True
|
||||
for file_id in self.files:
|
||||
file = self.get_file_by_id(file_uuid=file_id)
|
||||
file.reveal_to_red()
|
||||
return
|
||||
|
||||
if self.red_scan_duration <= 0:
|
||||
# scan one file per timestep
|
||||
self.red_scan_duration = len(self.files)
|
||||
self.fs.sys_log.info(f"Folder revealed to red agent: {self.name} (id: {self.uuid})")
|
||||
else:
|
||||
# scan already in progress
|
||||
self.fs.sys_log.info(f"Red Agent Scan is already in progress {self.name} (id: {self.uuid})")
|
||||
|
||||
def check_hash(self) -> bool:
|
||||
"""
|
||||
Runs a :func:`check_hash` on all files in the folder.
|
||||
@@ -779,6 +861,10 @@ class File(FileSystemItemABC):
|
||||
self.folder.fs.sys_log.info(f"Scanning file {self.sim_path if self.sim_path else path}")
|
||||
self.visible_health_status = self.health_status
|
||||
|
||||
def reveal_to_red(self):
|
||||
"""Reveals the folder/file to the red agent."""
|
||||
self.revealed_to_red = True
|
||||
|
||||
def check_hash(self) -> bool:
|
||||
"""
|
||||
Check if the file has been changed.
|
||||
|
||||
@@ -926,6 +926,18 @@ class Node(SimComponent):
|
||||
shut_down_countdown: int = 0
|
||||
"Time steps needed until node is shut down."
|
||||
|
||||
is_resetting: bool = False
|
||||
"If true, the node will try turning itself off then back on again."
|
||||
|
||||
node_scan_duration: int = 10
|
||||
"How many timesteps until the whole node is scanned. Default 10 time steps."
|
||||
|
||||
node_scan_countdown: int = 0
|
||||
"Time steps until scan is complete"
|
||||
|
||||
red_scan_countdown: int = 0
|
||||
"Time steps until reveal to red scan is complete."
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
"""
|
||||
Initialize the Node with various components and managers.
|
||||
@@ -978,14 +990,18 @@ class Node(SimComponent):
|
||||
self._application_request_manager = RequestManager()
|
||||
rm.add_request("application", RequestType(func=self._application_request_manager))
|
||||
|
||||
rm.add_request("scan", RequestType(func=lambda request, context: ...)) # TODO implement OS scan
|
||||
rm.add_request("scan", RequestType(func=lambda request, context: self.reveal_to_red()))
|
||||
|
||||
rm.add_request("shutdown", RequestType(func=lambda request, context: self.power_off()))
|
||||
rm.add_request("startup", RequestType(func=lambda request, context: self.power_on()))
|
||||
rm.add_request("reset", RequestType(func=lambda request, context: ...)) # TODO implement node reset
|
||||
rm.add_request("reset", RequestType(func=lambda request, context: self.reset())) # TODO implement node reset
|
||||
rm.add_request("logon", RequestType(func=lambda request, context: ...)) # TODO implement logon request
|
||||
rm.add_request("logoff", RequestType(func=lambda request, context: ...)) # TODO implement logoff request
|
||||
|
||||
self._os_request_manager = RequestManager()
|
||||
self._os_request_manager.add_request("scan", RequestType(func=lambda request, context: self.scan()))
|
||||
rm.add_request("os", RequestType(func=self._os_request_manager))
|
||||
|
||||
return rm
|
||||
|
||||
def _install_system_software(self):
|
||||
@@ -1056,7 +1072,7 @@ class Node(SimComponent):
|
||||
|
||||
def apply_timestep(self, timestep: int):
|
||||
"""
|
||||
Apply a single timestep of simulation dynamics to this service.
|
||||
Apply a single timestep of simulation dynamics to this node.
|
||||
|
||||
In this instance, if any multi-timestep processes are currently occurring
|
||||
(such as starting up or shutting down), then they are brought one step closer to
|
||||
@@ -1086,6 +1102,93 @@ class Node(SimComponent):
|
||||
self.operating_state = NodeOperatingState.OFF
|
||||
self.sys_log.info("Turned off")
|
||||
|
||||
# if resetting turn back on
|
||||
if self.is_resetting:
|
||||
self.is_resetting = False
|
||||
self.power_on()
|
||||
|
||||
# time steps which require the node to be on
|
||||
if self.operating_state == NodeOperatingState.ON:
|
||||
# node scanning
|
||||
if self.node_scan_countdown > 0:
|
||||
self.node_scan_countdown -= 1
|
||||
|
||||
if self.node_scan_countdown == 0:
|
||||
# scan everything!
|
||||
for process_id in self.processes:
|
||||
self.processes[process_id].scan()
|
||||
|
||||
# scan services
|
||||
for service_id in self.services:
|
||||
self.services[service_id].scan()
|
||||
|
||||
# scan applications
|
||||
for application_id in self.applications:
|
||||
self.applications[application_id].scan()
|
||||
|
||||
# scan file system
|
||||
self.file_system.scan(instant_scan=True)
|
||||
|
||||
if self.red_scan_countdown > 0:
|
||||
self.red_scan_countdown -= 1
|
||||
|
||||
if self.red_scan_countdown == 0:
|
||||
# scan processes
|
||||
for process_id in self.processes:
|
||||
self.processes[process_id].reveal_to_red()
|
||||
|
||||
# scan services
|
||||
for service_id in self.services:
|
||||
self.services[service_id].reveal_to_red()
|
||||
|
||||
# scan applications
|
||||
for application_id in self.applications:
|
||||
self.applications[application_id].reveal_to_red()
|
||||
|
||||
# scan file system
|
||||
self.file_system.reveal_to_red(instant_scan=True)
|
||||
|
||||
for process_id in self.processes:
|
||||
self.processes[process_id].apply_timestep(timestep=timestep)
|
||||
|
||||
for service_id in self.services:
|
||||
self.services[service_id].apply_timestep(timestep=timestep)
|
||||
|
||||
for application_id in self.applications:
|
||||
self.applications[application_id].apply_timestep(timestep=timestep)
|
||||
|
||||
self.file_system.apply_timestep(timestep=timestep)
|
||||
|
||||
def scan(self) -> None:
|
||||
"""
|
||||
Scan the node and all the items within it.
|
||||
|
||||
Scans the:
|
||||
- Processes
|
||||
- Services
|
||||
- Applications
|
||||
- Folders
|
||||
- Files
|
||||
|
||||
to the red agent.
|
||||
"""
|
||||
self.node_scan_countdown = self.node_scan_duration
|
||||
|
||||
def reveal_to_red(self) -> None:
|
||||
"""
|
||||
Reveals the node and all the items within it to the red agent.
|
||||
|
||||
Set all the:
|
||||
- Processes
|
||||
- Services
|
||||
- Applications
|
||||
- Folders
|
||||
- Files
|
||||
|
||||
`revealed_to_red` to `True`.
|
||||
"""
|
||||
self.red_scan_countdown = self.node_scan_duration
|
||||
|
||||
def power_on(self):
|
||||
"""Power on the Node, enabling its NICs if it is in the OFF state."""
|
||||
if self.operating_state == NodeOperatingState.OFF:
|
||||
@@ -1111,6 +1214,20 @@ class Node(SimComponent):
|
||||
self.operating_state = NodeOperatingState.OFF
|
||||
self.sys_log.info("Turned off")
|
||||
|
||||
def reset(self):
|
||||
"""
|
||||
Resets the node.
|
||||
|
||||
Powers off the node and sets is_resetting to True.
|
||||
Applying more timesteps will eventually turn the node back on.
|
||||
"""
|
||||
if not self.operating_state.ON:
|
||||
self.sys_log.error(f"Cannot reset {self.hostname} - node is not turned on.")
|
||||
else:
|
||||
self.is_resetting = True
|
||||
self.sys_log.info(f"Resetting {self.hostname}...")
|
||||
self.power_off()
|
||||
|
||||
def connect_nic(self, nic: NIC):
|
||||
"""
|
||||
Connect a NIC (Network Interface Card) to the node.
|
||||
@@ -1267,6 +1384,38 @@ class Node(SimComponent):
|
||||
_LOGGER.info(f"Removed service {service.uuid} from node {self.uuid}")
|
||||
self._service_request_manager.remove_request(service.uuid)
|
||||
|
||||
def install_application(self, application: Application) -> None:
|
||||
"""
|
||||
Install an application on this node.
|
||||
|
||||
:param application: Application instance that has not been installed on any node yet.
|
||||
:type application: Application
|
||||
"""
|
||||
if application in self:
|
||||
_LOGGER.warning(f"Can't add application {application.uuid} to node {self.uuid}. It's already installed.")
|
||||
return
|
||||
self.applications[application.uuid] = application
|
||||
application.parent = self
|
||||
self.sys_log.info(f"Installed application {application.name}")
|
||||
_LOGGER.info(f"Added application {application.uuid} to node {self.uuid}")
|
||||
self._application_request_manager.add_request(application.uuid, RequestType(func=application._request_manager))
|
||||
|
||||
def uninstall_application(self, application: Application) -> None:
|
||||
"""
|
||||
Uninstall and completely remove application from this node.
|
||||
|
||||
:param application: Application object that is currently associated with this node.
|
||||
:type application: Application
|
||||
"""
|
||||
if application not in self:
|
||||
_LOGGER.warning(f"Can't remove application {application.uuid} from node {self.uuid}. It's not installed.")
|
||||
return
|
||||
self.applications.pop(application.uuid)
|
||||
application.parent = None
|
||||
self.sys_log.info(f"Uninstalled application {application.name}")
|
||||
_LOGGER.info(f"Removed application {application.uuid} from node {self.uuid}")
|
||||
self._application_request_manager.remove_request(application.uuid)
|
||||
|
||||
def __contains__(self, item: Any) -> bool:
|
||||
if isinstance(item, Service):
|
||||
return item.uuid in self.services
|
||||
|
||||
@@ -2,7 +2,7 @@ from abc import abstractmethod
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, Set
|
||||
|
||||
from primaite.simulator.system.software import IOSoftware
|
||||
from primaite.simulator.system.software import IOSoftware, SoftwareHealthState
|
||||
|
||||
|
||||
class ApplicationOperatingState(Enum):
|
||||
@@ -32,6 +32,12 @@ class Application(IOSoftware):
|
||||
groups: Set[str] = set()
|
||||
"The set of groups to which the application belongs."
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self.health_state_visible = SoftwareHealthState.UNUSED
|
||||
self.health_state_actual = SoftwareHealthState.UNUSED
|
||||
|
||||
@abstractmethod
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
|
||||
@@ -82,11 +82,6 @@ class Service(IOSoftware):
|
||||
"""
|
||||
pass
|
||||
|
||||
def scan(self) -> None:
|
||||
"""Update the service visible states."""
|
||||
# update the visible operating state
|
||||
self.health_state_visible = self.health_state_actual
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stop the service."""
|
||||
if self.operating_state in [ServiceOperatingState.RUNNING, ServiceOperatingState.PAUSED]:
|
||||
|
||||
@@ -177,6 +177,10 @@ class Software(SimComponent):
|
||||
"""Update the observed health status to match the actual health status."""
|
||||
self.health_state_visible = self.health_state_actual
|
||||
|
||||
def reveal_to_red(self) -> None:
|
||||
"""Reveals the software to the red agent."""
|
||||
self.revealed_to_red = True
|
||||
|
||||
|
||||
class IOSoftware(Software):
|
||||
"""
|
||||
|
||||
@@ -4,9 +4,10 @@ import shutil
|
||||
import tempfile
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Union
|
||||
from typing import Any, Dict, Union
|
||||
from unittest.mock import patch
|
||||
|
||||
import nodeenv
|
||||
import pytest
|
||||
|
||||
from primaite import getLogger
|
||||
@@ -15,6 +16,7 @@ from primaite.primaite_session import PrimaiteSession
|
||||
from primaite.simulator.network.container import Network
|
||||
from primaite.simulator.network.networks import arcd_uc2_network
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
from primaite.simulator.system.applications.application import Application
|
||||
from primaite.simulator.system.core.sys_log import SysLog
|
||||
from primaite.simulator.system.services.service import Service
|
||||
from tests.mock_and_patch.get_session_path_mock import get_temp_session_path
|
||||
@@ -36,6 +38,13 @@ class TestService(Service):
|
||||
pass
|
||||
|
||||
|
||||
class TestApplication(Application):
|
||||
"""Test Application class"""
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
pass
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def uc2_network() -> Network:
|
||||
return arcd_uc2_network()
|
||||
@@ -48,6 +57,13 @@ def service(file_system) -> TestService:
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def application(file_system) -> TestApplication:
|
||||
return TestApplication(
|
||||
name="TestApplication", port=Port.ARP, file_system=file_system, sys_log=SysLog(hostname="test_application")
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def file_system() -> FileSystem:
|
||||
return Node(hostname="fs_node").file_system
|
||||
|
||||
@@ -215,8 +215,8 @@ def test_folder_scan(file_system):
|
||||
folder.apply_timestep(timestep=0)
|
||||
|
||||
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
|
||||
folder.apply_timestep(timestep=1)
|
||||
@@ -226,12 +226,33 @@ def test_folder_scan(file_system):
|
||||
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
folder.apply_timestep(timestep=2)
|
||||
|
||||
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
def test_folder_reveal_to_red_scan(file_system):
|
||||
"""Test the ability to reveal files to red."""
|
||||
folder: Folder = file_system.create_folder(folder_name="test_folder")
|
||||
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
|
||||
file_system.create_file(file_name="test_file2.txt", folder_name="test_folder")
|
||||
|
||||
file1: File = folder.get_file_by_id(file_uuid=list(folder.files)[1])
|
||||
file2: File = folder.get_file_by_id(file_uuid=list(folder.files)[0])
|
||||
|
||||
assert folder.revealed_to_red is False
|
||||
assert file1.revealed_to_red is False
|
||||
assert file2.revealed_to_red is False
|
||||
|
||||
folder.reveal_to_red()
|
||||
|
||||
folder.apply_timestep(timestep=0)
|
||||
|
||||
assert folder.revealed_to_red is False
|
||||
assert file1.revealed_to_red is False
|
||||
assert file2.revealed_to_red is False
|
||||
|
||||
folder.apply_timestep(timestep=1)
|
||||
|
||||
assert folder.revealed_to_red is True
|
||||
assert file1.revealed_to_red is True
|
||||
assert file2.revealed_to_red is True
|
||||
|
||||
|
||||
def test_simulated_file_check_hash(file_system):
|
||||
|
||||
@@ -47,8 +47,8 @@ def test_folder_scan_request(populated_file_system):
|
||||
folder.apply_timestep(timestep=0)
|
||||
|
||||
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
|
||||
folder.apply_timestep(timestep=1)
|
||||
@@ -58,13 +58,6 @@ def test_folder_scan_request(populated_file_system):
|
||||
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
folder.apply_timestep(timestep=2)
|
||||
|
||||
assert folder.health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
|
||||
def test_file_checkhash_request(populated_file_system):
|
||||
"""Test that an agent can request a file hash check."""
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
import pytest
|
||||
|
||||
from primaite.simulator.file_system.file_system import File, FileSystemItemHealthStatus, Folder
|
||||
from primaite.simulator.network.hardware.base import Node, NodeOperatingState
|
||||
from primaite.simulator.system.applications.application import Application
|
||||
from primaite.simulator.system.processes.process import Process
|
||||
from primaite.simulator.system.services.service import Service
|
||||
from primaite.simulator.system.software import SoftwareHealthState
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -39,3 +44,113 @@ def test_node_shutdown(node):
|
||||
idx += 1
|
||||
|
||||
assert node.operating_state == NodeOperatingState.OFF
|
||||
|
||||
|
||||
def test_node_os_scan(node, service, application):
|
||||
"""Test OS Scanning."""
|
||||
node.operating_state = NodeOperatingState.ON
|
||||
|
||||
# add process to node
|
||||
# TODO implement processes
|
||||
|
||||
# add services to node
|
||||
service.health_state_actual = SoftwareHealthState.COMPROMISED
|
||||
node.install_service(service=service)
|
||||
assert service.health_state_visible == SoftwareHealthState.UNUSED
|
||||
|
||||
# add application to node
|
||||
application.health_state_actual = SoftwareHealthState.COMPROMISED
|
||||
node.install_application(application=application)
|
||||
assert application.health_state_visible == SoftwareHealthState.UNUSED
|
||||
|
||||
# add folder and file to node
|
||||
folder: Folder = node.file_system.create_folder(folder_name="test_folder")
|
||||
folder.corrupt()
|
||||
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
|
||||
file: File = node.file_system.create_file(folder_name="test_folder", file_name="file.txt")
|
||||
file2: File = node.file_system.create_file(folder_name="test_folder", file_name="file2.txt")
|
||||
file.corrupt()
|
||||
file2.corrupt()
|
||||
assert file.visible_health_status == FileSystemItemHealthStatus.GOOD
|
||||
|
||||
# run os scan
|
||||
node.apply_request(["os", "scan"])
|
||||
|
||||
# apply time steps
|
||||
for i in range(10):
|
||||
node.apply_timestep(timestep=i)
|
||||
|
||||
# should update the state of all items
|
||||
# TODO assert process.health_state_visible == SoftwareHealthState.COMPROMISED
|
||||
assert service.health_state_visible == SoftwareHealthState.COMPROMISED
|
||||
assert application.health_state_visible == SoftwareHealthState.COMPROMISED
|
||||
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
|
||||
def test_node_red_scan(node, service, application):
|
||||
"""Test revealing to red"""
|
||||
node.operating_state = NodeOperatingState.ON
|
||||
|
||||
# add process to node
|
||||
# TODO implement processes
|
||||
|
||||
# add services to node
|
||||
node.install_service(service=service)
|
||||
assert service.revealed_to_red is False
|
||||
|
||||
# add application to node
|
||||
application.health_state_actual = SoftwareHealthState.COMPROMISED
|
||||
node.install_application(application=application)
|
||||
assert application.revealed_to_red is False
|
||||
|
||||
# add folder and file to node
|
||||
folder: Folder = node.file_system.create_folder(folder_name="test_folder")
|
||||
assert folder.revealed_to_red is False
|
||||
|
||||
file: File = node.file_system.create_file(folder_name="test_folder", file_name="file.txt")
|
||||
file2: File = node.file_system.create_file(folder_name="test_folder", file_name="file2.txt")
|
||||
assert file.revealed_to_red is False
|
||||
assert file2.revealed_to_red is False
|
||||
|
||||
# run os scan
|
||||
node.apply_request(["scan"])
|
||||
|
||||
# apply time steps
|
||||
for i in range(10):
|
||||
node.apply_timestep(timestep=i)
|
||||
|
||||
# should update the state of all items
|
||||
# TODO assert process.revealed_to_red is True
|
||||
assert service.revealed_to_red is True
|
||||
assert application.revealed_to_red is True
|
||||
assert folder.revealed_to_red is True
|
||||
assert file.revealed_to_red is True
|
||||
assert file2.revealed_to_red is True
|
||||
|
||||
|
||||
def test_reset_node(node):
|
||||
"""Test that a node can be reset."""
|
||||
node.operating_state = NodeOperatingState.ON
|
||||
|
||||
node.apply_request(["reset"])
|
||||
assert node.operating_state == NodeOperatingState.SHUTTING_DOWN
|
||||
|
||||
"""
|
||||
3 steps to shut down
|
||||
2 steps to set up the turning of it back on
|
||||
3 steps to turn back on
|
||||
|
||||
3 + 2 + 3 = 8
|
||||
kwik mafs
|
||||
"""
|
||||
|
||||
for i in range(8):
|
||||
node.apply_timestep(timestep=i)
|
||||
|
||||
if i == 3:
|
||||
assert node.operating_state == NodeOperatingState.BOOTING
|
||||
|
||||
assert node.operating_state == NodeOperatingState.ON
|
||||
|
||||
Reference in New Issue
Block a user