#1947: folder/file scan now take multiple time steps to complete

This commit is contained in:
Czar.Echavez
2023-10-23 15:58:37 +01:00
parent ffc4711afb
commit 724beb1a29
9 changed files with 184 additions and 101 deletions

View File

@@ -47,7 +47,7 @@ def convert_size(size_bytes: int) -> str:
return f"{s} {size_name[i]}"
class FileSystemItemStatus(Enum):
class FileSystemItemHealthStatus(Enum):
"""Status of the FileSystemItem."""
GOOD = 0
@@ -73,10 +73,10 @@ class FileSystemItemABC(SimComponent):
name: str
"The name of the FileSystemItemABC."
status: FileSystemItemStatus = FileSystemItemStatus.GOOD
health_status: FileSystemItemHealthStatus = FileSystemItemHealthStatus.GOOD
"Actual status of the current FileSystemItem"
visible_status: FileSystemItemStatus = FileSystemItemStatus.GOOD
visible_health_status: FileSystemItemHealthStatus = FileSystemItemHealthStatus.GOOD
"Visible status of the current FileSystemItem"
previous_hash: Optional[str] = None
@@ -90,8 +90,8 @@ class FileSystemItemABC(SimComponent):
"""
state = super().describe_state()
state["name"] = self.name
state["status"] = self.status.name
state["visible_status"] = self.visible_status.name
state["status"] = self.health_status.name
state["visible_status"] = self.visible_health_status.name
state["previous_hash"] = self.previous_hash
return state
@@ -123,7 +123,7 @@ class FileSystemItemABC(SimComponent):
"""Update the FileSystemItem states."""
super().scan()
self.visible_status = self.status
self.visible_health_status = self.health_status
@abstractmethod
def check_hash(self) -> bool:
@@ -135,7 +135,7 @@ class FileSystemItemABC(SimComponent):
Return False if corruption is detected, otherwise True
"""
# cannot check hash if deleted
if self.status == FileSystemItemStatus.DELETED:
if self.health_status == FileSystemItemHealthStatus.DELETED:
return False
@abstractmethod
@@ -146,7 +146,7 @@ class FileSystemItemABC(SimComponent):
True if successfully repaired. False otherwise.
"""
# cannot repair if deleted
if self.status == FileSystemItemStatus.DELETED:
if self.health_status == FileSystemItemHealthStatus.DELETED:
return False
@abstractmethod
@@ -157,7 +157,7 @@ class FileSystemItemABC(SimComponent):
True if successfully corrupted. False otherwise.
"""
# cannot corrupt if deleted
if self.status == FileSystemItemStatus.DELETED:
if self.health_status == FileSystemItemHealthStatus.DELETED:
return False
def delete(self) -> None:
@@ -166,7 +166,7 @@ class FileSystemItemABC(SimComponent):
True if successfully deleted. False otherwise.
"""
self.status = FileSystemItemStatus.DELETED
self.health_status = FileSystemItemHealthStatus.DELETED
def restore(self) -> None:
"""Restore the file/folder to the state before it got ruined."""
@@ -456,6 +456,10 @@ class Folder(FileSystemItemABC):
"Files by their name as <file name>.<file type>."
deleted_files: Dict[str, File] = {}
"List of files that have been deleted."
scan_duration: int = -1
"How many timesteps to complete a scan."
def describe_state(self) -> Dict:
"""
@@ -465,6 +469,7 @@ class Folder(FileSystemItemABC):
"""
state = super().describe_state()
state["files"] = {file.name: file.describe_state() for uuid, file in self.files.items()}
state["deleted_files"] = {file.name: file.describe_state() for uuid, file in self.deleted_files.items()}
return state
def show(self, markdown: bool = False):
@@ -492,6 +497,21 @@ class Folder(FileSystemItemABC):
"""
return sum(file.size for file in self.files.values() if file.size is not None)
def apply_timestep(self, timestep: int):
"""
Used to run the actions that last over multiple timesteps.
:param: timestep: the current timestep.
"""
super().apply_timestep(timestep=timestep)
# scan files each timestep
if self.scan_duration > -1:
# scan one file per timestep
file = self.get_file_by_id(file_uuid=list(self.files)[self.scan_duration - 1])
file.scan()
self.scan_duration -= 1
def get_file(self, file_name: str) -> Optional[File]:
"""
Get a file by its name.
@@ -573,30 +593,31 @@ class Folder(FileSystemItemABC):
def quarantine(self):
"""Quarantines the File System Folder."""
if self.status != FileSystemItemStatus.QUARANTINED:
self.status = FileSystemItemStatus.QUARANTINED
if self.health_status != FileSystemItemHealthStatus.QUARANTINED:
self.health_status = FileSystemItemHealthStatus.QUARANTINED
self.fs.sys_log.info(f"Quarantined folder ./{self.name}")
def unquarantine(self):
"""Unquarantine of the File System Folder."""
if self.status == FileSystemItemStatus.QUARANTINED:
self.status = FileSystemItemStatus.GOOD
if self.health_status == FileSystemItemHealthStatus.QUARANTINED:
self.health_status = FileSystemItemHealthStatus.GOOD
self.fs.sys_log.info(f"Quarantined folder ./{self.name}")
def quarantine_status(self) -> bool:
"""Returns true if the folder is being quarantined."""
return self.status == FileSystemItemStatus.QUARANTINED
return self.health_status == FileSystemItemHealthStatus.QUARANTINED
def scan(self) -> None:
"""Update Folder visible status."""
super().scan()
# update the status of files in folder
for file_id in self.files:
file = self.get_file_by_id(file_uuid=file_id)
file.scan()
self.fs.sys_log.info(f"Scanning folder {self.name} (id: {self.uuid})")
if self.scan_duration <= -1:
# scan one file per timestep
self.scan_duration = len(self.files)
self.fs.sys_log.info(f"Scanning folder {self.name} (id: {self.uuid})")
else:
# scan already in progress
self.fs.sys_log.info(f"Scan is already in progress {self.name} (id: {self.uuid})")
def check_hash(self) -> bool:
"""
@@ -638,8 +659,8 @@ class Folder(FileSystemItemABC):
repaired = file.repair()
# set file status to good if corrupt
if self.status == FileSystemItemStatus.CORRUPTED:
self.status = FileSystemItemStatus.GOOD
if self.health_status == FileSystemItemHealthStatus.CORRUPTED:
self.health_status = FileSystemItemHealthStatus.GOOD
repaired = True
self.fs.sys_log.info(f"Repaired folder {self.name} (id: {self.uuid})")
@@ -674,8 +695,8 @@ class Folder(FileSystemItemABC):
corrupted = file.corrupt()
# set file status to corrupt if good
if self.status == FileSystemItemStatus.GOOD:
self.status = FileSystemItemStatus.CORRUPTED
if self.health_status == FileSystemItemHealthStatus.GOOD:
self.health_status = FileSystemItemHealthStatus.CORRUPTED
corrupted = True
self.fs.sys_log.info(f"Corrupted folder {self.name} (id: {self.uuid})")
@@ -824,8 +845,8 @@ class File(FileSystemItemABC):
super().repair()
# set file status to good if corrupt
if self.status == FileSystemItemStatus.CORRUPTED:
self.status = FileSystemItemStatus.GOOD
if self.health_status == FileSystemItemHealthStatus.CORRUPTED:
self.health_status = FileSystemItemHealthStatus.GOOD
path = self.folder.name + "/" + self.name
self.folder.fs.sys_log.info(f"Repaired file {self.sim_path if self.sim_path else path}")
@@ -842,8 +863,8 @@ class File(FileSystemItemABC):
corrupted = False
# set file status to good if corrupt
if self.status == FileSystemItemStatus.GOOD:
self.status = FileSystemItemStatus.CORRUPTED
if self.health_status == FileSystemItemHealthStatus.GOOD:
self.health_status = FileSystemItemHealthStatus.CORRUPTED
corrupted = True
path = self.folder.name + "/" + self.name

View File

@@ -1000,6 +1000,7 @@ class Node(SimComponent):
"applications": {uuid: app.describe_state() for uuid, app in self.applications.items()},
"services": {uuid: svc.describe_state() for uuid, svc in self.services.items()},
"process": {uuid: proc.describe_state() for uuid, proc in self.processes.items()},
"revealed_to_red": self.revealed_to_red,
}
)
return state

View File

@@ -3,7 +3,7 @@ from typing import Dict, Optional
from primaite import getLogger
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.system.software import IOSoftware
from primaite.simulator.system.software import IOSoftware, SoftwareHealthState
_LOGGER = getLogger(__name__)
@@ -35,14 +35,17 @@ class Service(IOSoftware):
operating_state: ServiceOperatingState = ServiceOperatingState.STOPPED
"The current operating state of the Service."
visible_operating_state: ServiceOperatingState = ServiceOperatingState.STOPPED
"The visible operating state of the service."
restart_duration: int = 5
"How many timesteps does it take to restart this service."
restart_countdown: Optional[int] = None
"If currently restarting, how many timesteps remain until the restart is finished."
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.health_state_visible = SoftwareHealthState.UNUSED
self.health_state_actual = SoftwareHealthState.UNUSED
def _init_request_manager(self) -> RequestManager:
rm = super()._init_request_manager()
rm.add_request("scan", RequestType(func=lambda request, context: self.scan()))
@@ -65,9 +68,9 @@ class Service(IOSoftware):
:rtype: Dict
"""
state = super().describe_state()
state.update(
{"operating_state": self.operating_state.name, "visible_operating_state": self.visible_operating_state.name}
)
state["operating_state"] = self.operating_state.name
state["health_state_actual"] = self.health_state_actual
state["health_state_visible"] = self.health_state_visible
return state
def reset_component_for_episode(self, episode: int):
@@ -85,49 +88,56 @@ class Service(IOSoftware):
super().scan()
# update the visible operating state
self.visible_operating_state = self.operating_state
self.health_state_visible = self.health_state_actual
def stop(self) -> None:
"""Stop the service."""
if self.operating_state in [ServiceOperatingState.RUNNING, ServiceOperatingState.PAUSED]:
self.sys_log.info(f"Stopping service {self.name}")
self.operating_state = ServiceOperatingState.STOPPED
self.health_state_actual = SoftwareHealthState.UNUSED
def start(self, **kwargs) -> None:
"""Start the service."""
if self.operating_state == ServiceOperatingState.STOPPED:
self.sys_log.info(f"Starting service {self.name}")
self.operating_state = ServiceOperatingState.RUNNING
self.health_state_actual = SoftwareHealthState.GOOD
def pause(self) -> None:
"""Pause the service."""
if self.operating_state == ServiceOperatingState.RUNNING:
self.sys_log.info(f"Pausing service {self.name}")
self.operating_state = ServiceOperatingState.PAUSED
self.health_state_actual = SoftwareHealthState.OVERWHELMED
def resume(self) -> None:
"""Resume paused service."""
if self.operating_state == ServiceOperatingState.PAUSED:
self.sys_log.info(f"Resuming service {self.name}")
self.operating_state = ServiceOperatingState.RUNNING
self.health_state_actual = SoftwareHealthState.GOOD
def restart(self) -> None:
"""Restart running service."""
if self.operating_state in [ServiceOperatingState.RUNNING, ServiceOperatingState.PAUSED]:
self.sys_log.info(f"Pausing service {self.name}")
self.operating_state = ServiceOperatingState.RESTARTING
self.health_state_actual = SoftwareHealthState.OVERWHELMED
self.restart_countdown = self.restart_duration
def disable(self) -> None:
"""Disable the service."""
self.sys_log.info(f"Disabling Application {self.name}")
self.operating_state = ServiceOperatingState.DISABLED
self.health_state_actual = SoftwareHealthState.OVERWHELMED
def enable(self) -> None:
"""Enable the disabled service."""
if self.operating_state == ServiceOperatingState.DISABLED:
self.sys_log.info(f"Enabling Application {self.name}")
self.operating_state = ServiceOperatingState.STOPPED
self.health_state_actual = SoftwareHealthState.OVERWHELMED
def apply_timestep(self, timestep: int) -> None:
"""
@@ -144,4 +154,5 @@ class Service(IOSoftware):
if self.restart_countdown <= 0:
_LOGGER.debug(f"Restarting finished for service {self.name}")
self.operating_state = ServiceOperatingState.RUNNING
self.health_state_actual = SoftwareHealthState.GOOD
self.restart_countdown -= 1

View File

@@ -31,6 +31,8 @@ class SoftwareType(Enum):
class SoftwareHealthState(Enum):
"""Enumeration of the Software Health States."""
UNUSED = 0
"Unused state."
GOOD = 1
"The software is in a good and healthy condition."
COMPROMISED = 2

View File

@@ -1,6 +1,5 @@
from ipaddress import IPv4Address
from primaite.simulator.file_system.file_system import FileSystemItemStatus
from primaite.simulator.network.hardware.nodes.server import Server
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.services.database.database_service import DatabaseService

View File

@@ -1,6 +1,6 @@
import pytest
from primaite.simulator.file_system.file_system import File, FileSystem, FileSystemItemStatus, Folder
from primaite.simulator.file_system.file_system import File, FileSystem, FileSystemItemHealthStatus, Folder
from primaite.simulator.file_system.file_type import FileType
@@ -146,13 +146,13 @@ def test_file_corrupt_repair(file_system):
file.corrupt()
assert folder.status == FileSystemItemStatus.GOOD
assert file.status == FileSystemItemStatus.CORRUPTED
assert folder.health_status == FileSystemItemHealthStatus.GOOD
assert file.health_status == FileSystemItemHealthStatus.CORRUPTED
file.repair()
assert folder.status == FileSystemItemStatus.GOOD
assert file.status == FileSystemItemStatus.GOOD
assert folder.health_status == FileSystemItemHealthStatus.GOOD
assert file.health_status == FileSystemItemHealthStatus.GOOD
def test_folder_corrupt_repair(file_system):
@@ -163,14 +163,14 @@ def test_folder_corrupt_repair(file_system):
folder.corrupt()
file = folder.get_file(file_name="test_file.txt")
assert folder.status == FileSystemItemStatus.CORRUPTED
assert file.status == FileSystemItemStatus.CORRUPTED
assert folder.health_status == FileSystemItemHealthStatus.CORRUPTED
assert file.health_status == FileSystemItemHealthStatus.CORRUPTED
folder.repair()
file = folder.get_file(file_name="test_file.txt")
assert folder.status == FileSystemItemStatus.GOOD
assert file.status == FileSystemItemStatus.GOOD
assert folder.health_status == FileSystemItemHealthStatus.GOOD
assert file.health_status == FileSystemItemHealthStatus.GOOD
def test_file_scan(file_system):
@@ -178,43 +178,63 @@ def test_file_scan(file_system):
folder: Folder = file_system.create_folder(folder_name="test_folder")
file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
assert file.status == FileSystemItemStatus.GOOD
assert file.visible_status == FileSystemItemStatus.GOOD
assert file.health_status == FileSystemItemHealthStatus.GOOD
assert file.visible_health_status == FileSystemItemHealthStatus.GOOD
file.corrupt()
assert file.status == FileSystemItemStatus.CORRUPTED
assert file.visible_status == FileSystemItemStatus.GOOD
assert file.health_status == FileSystemItemHealthStatus.CORRUPTED
assert file.visible_health_status == FileSystemItemHealthStatus.GOOD
file.scan()
assert file.status == FileSystemItemStatus.CORRUPTED
assert file.visible_status == FileSystemItemStatus.CORRUPTED
assert file.health_status == FileSystemItemHealthStatus.CORRUPTED
assert file.visible_health_status == FileSystemItemHealthStatus.CORRUPTED
def test_folder_scan(file_system):
"""Test the ability to update visible status."""
folder: Folder = file_system.create_folder(folder_name="test_folder")
file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
file_system.create_file(file_name="test_file.txt", folder_name="test_folder")
file_system.create_file(file_name="test_file2.txt", folder_name="test_folder")
assert folder.status == FileSystemItemStatus.GOOD
assert folder.visible_status == FileSystemItemStatus.GOOD
assert file.status == FileSystemItemStatus.GOOD
assert file.visible_status == FileSystemItemStatus.GOOD
file1: File = folder.get_file_by_id(file_uuid=list(folder.files)[1])
file2: File = folder.get_file_by_id(file_uuid=list(folder.files)[0])
assert folder.health_status == FileSystemItemHealthStatus.GOOD
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
folder.corrupt()
assert folder.status == FileSystemItemStatus.CORRUPTED
assert folder.visible_status == FileSystemItemStatus.GOOD
assert file.status == FileSystemItemStatus.CORRUPTED
assert file.visible_status == FileSystemItemStatus.GOOD
assert folder.health_status == FileSystemItemHealthStatus.CORRUPTED
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
folder.scan()
assert folder.status == FileSystemItemStatus.CORRUPTED
assert folder.visible_status == FileSystemItemStatus.CORRUPTED
assert file.status == FileSystemItemStatus.CORRUPTED
assert file.visible_status == FileSystemItemStatus.CORRUPTED
folder.apply_timestep(timestep=0)
assert folder.health_status == FileSystemItemHealthStatus.CORRUPTED
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPTED
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPTED
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
folder.apply_timestep(timestep=1)
assert folder.health_status == FileSystemItemHealthStatus.CORRUPTED
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPTED
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPTED
assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPTED
folder.apply_timestep(timestep=2)
assert folder.health_status == FileSystemItemHealthStatus.CORRUPTED
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPTED
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPTED
assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPTED
def test_simulated_file_check_hash(file_system):
@@ -225,7 +245,7 @@ def test_simulated_file_check_hash(file_system):
# change simulated file size
file.sim_size = 0
assert file.check_hash() is False
assert file.status == FileSystemItemStatus.CORRUPTED
assert file.health_status == FileSystemItemHealthStatus.CORRUPTED
def test_real_file_check_hash(file_system):
@@ -238,7 +258,7 @@ def test_real_file_check_hash(file_system):
f.write("get hacked scrub lol xD\n")
assert file.check_hash() is False
assert file.status == FileSystemItemStatus.CORRUPTED
assert file.health_status == FileSystemItemHealthStatus.CORRUPTED
def test_simulated_folder_check_hash(file_system):
@@ -251,7 +271,7 @@ def test_simulated_folder_check_hash(file_system):
file = folder.get_file(file_name="test_file.txt")
file.sim_size = 0
assert folder.check_hash() is False
assert folder.status == FileSystemItemStatus.CORRUPTED
assert folder.health_status == FileSystemItemHealthStatus.CORRUPTED
def test_real_folder_check_hash(file_system):
@@ -268,7 +288,7 @@ def test_real_folder_check_hash(file_system):
f.write("get hacked scrub lol xD\n")
assert folder.check_hash() is False
assert folder.status == FileSystemItemStatus.CORRUPTED
assert folder.health_status == FileSystemItemHealthStatus.CORRUPTED
@pytest.mark.skip(reason="Skipping until we tackle serialisation")

View File

@@ -2,7 +2,7 @@ from typing import Tuple
import pytest
from primaite.simulator.file_system.file_system import File, FileSystem, FileSystemItemStatus, Folder
from primaite.simulator.file_system.file_system import File, FileSystem, FileSystemItemHealthStatus, Folder
@pytest.fixture(scope="function")
@@ -19,31 +19,51 @@ def test_file_scan_request(populated_file_system):
fs, folder, file = populated_file_system
file.corrupt()
assert file.status == FileSystemItemStatus.CORRUPTED
assert file.visible_status == FileSystemItemStatus.GOOD
assert file.health_status == FileSystemItemHealthStatus.CORRUPTED
assert file.visible_health_status == FileSystemItemHealthStatus.GOOD
fs.apply_request(request=["file", file.uuid, "scan"])
assert file.status == FileSystemItemStatus.CORRUPTED
assert file.visible_status == FileSystemItemStatus.CORRUPTED
assert file.health_status == FileSystemItemHealthStatus.CORRUPTED
assert file.visible_health_status == FileSystemItemHealthStatus.CORRUPTED
def test_folder_scan_request(populated_file_system):
"""Test that an agent can request a folder scan."""
fs, folder, file = populated_file_system
fs.create_file(file_name="test_file2.txt", folder_name="test_folder")
file1: File = folder.get_file_by_id(file_uuid=list(folder.files)[1])
file2: File = folder.get_file_by_id(file_uuid=list(folder.files)[0])
folder.corrupt()
assert folder.status == FileSystemItemStatus.CORRUPTED
assert file.status == FileSystemItemStatus.CORRUPTED
assert folder.visible_status == FileSystemItemStatus.GOOD
assert file.visible_status == FileSystemItemStatus.GOOD
assert folder.health_status == FileSystemItemHealthStatus.CORRUPTED
assert folder.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file1.visible_health_status == FileSystemItemHealthStatus.GOOD
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
fs.apply_request(request=["folder", folder.uuid, "scan"])
assert folder.status == FileSystemItemStatus.CORRUPTED
assert file.status == FileSystemItemStatus.CORRUPTED
assert folder.visible_status == FileSystemItemStatus.CORRUPTED
assert file.visible_status == FileSystemItemStatus.CORRUPTED
folder.apply_timestep(timestep=0)
assert folder.health_status == FileSystemItemHealthStatus.CORRUPTED
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPTED
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPTED
assert file2.visible_health_status == FileSystemItemHealthStatus.GOOD
folder.apply_timestep(timestep=1)
assert folder.health_status == FileSystemItemHealthStatus.CORRUPTED
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPTED
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPTED
assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPTED
folder.apply_timestep(timestep=2)
assert folder.health_status == FileSystemItemHealthStatus.CORRUPTED
assert folder.visible_health_status == FileSystemItemHealthStatus.CORRUPTED
assert file1.visible_health_status == FileSystemItemHealthStatus.CORRUPTED
assert file2.visible_health_status == FileSystemItemHealthStatus.CORRUPTED
def test_file_checkhash_request(populated_file_system):
@@ -52,12 +72,12 @@ def test_file_checkhash_request(populated_file_system):
fs.apply_request(request=["file", file.uuid, "checkhash"])
assert file.status == FileSystemItemStatus.GOOD
assert file.health_status == FileSystemItemHealthStatus.GOOD
file.sim_size = 0
fs.apply_request(request=["file", file.uuid, "checkhash"])
assert file.status == FileSystemItemStatus.CORRUPTED
assert file.health_status == FileSystemItemHealthStatus.CORRUPTED
def test_folder_checkhash_request(populated_file_system):
@@ -66,11 +86,11 @@ def test_folder_checkhash_request(populated_file_system):
fs.apply_request(request=["folder", folder.uuid, "checkhash"])
assert folder.status == FileSystemItemStatus.GOOD
assert folder.health_status == FileSystemItemHealthStatus.GOOD
file.sim_size = 0
fs.apply_request(request=["folder", folder.uuid, "checkhash"])
assert folder.status == FileSystemItemStatus.CORRUPTED
assert folder.health_status == FileSystemItemHealthStatus.CORRUPTED
def test_file_repair_request(populated_file_system):
@@ -78,10 +98,10 @@ def test_file_repair_request(populated_file_system):
fs, folder, file = populated_file_system
file.corrupt()
assert file.status == FileSystemItemStatus.CORRUPTED
assert file.health_status == FileSystemItemHealthStatus.CORRUPTED
fs.apply_request(request=["file", file.uuid, "repair"])
assert file.status == FileSystemItemStatus.GOOD
assert file.health_status == FileSystemItemHealthStatus.GOOD
def test_folder_repair_request(populated_file_system):
@@ -89,12 +109,12 @@ def test_folder_repair_request(populated_file_system):
fs, folder, file = populated_file_system
folder.corrupt()
assert file.status == FileSystemItemStatus.CORRUPTED
assert folder.status == FileSystemItemStatus.CORRUPTED
assert file.health_status == FileSystemItemHealthStatus.CORRUPTED
assert folder.health_status == FileSystemItemHealthStatus.CORRUPTED
fs.apply_request(request=["folder", folder.uuid, "repair"])
assert file.status == FileSystemItemStatus.GOOD
assert folder.status == FileSystemItemStatus.GOOD
assert file.health_status == FileSystemItemHealthStatus.GOOD
assert folder.health_status == FileSystemItemHealthStatus.GOOD
def test_file_restore_request(populated_file_system):
@@ -109,15 +129,15 @@ def test_file_corrupt_request(populated_file_system):
"""Test that an agent can request a file corruption."""
fs, folder, file = populated_file_system
fs.apply_request(request=["file", file.uuid, "corrupt"])
assert file.status == FileSystemItemStatus.CORRUPTED
assert file.health_status == FileSystemItemHealthStatus.CORRUPTED
def test_folder_corrupt_request(populated_file_system):
"""Test that an agent can request a folder corruption."""
fs, folder, file = populated_file_system
fs.apply_request(request=["folder", folder.uuid, "corrupt"])
assert file.status == FileSystemItemStatus.CORRUPTED
assert folder.status == FileSystemItemStatus.CORRUPTED
assert file.health_status == FileSystemItemHealthStatus.CORRUPTED
assert folder.health_status == FileSystemItemHealthStatus.CORRUPTED
def test_file_delete_request(populated_file_system):

View File

@@ -1,15 +1,16 @@
from primaite.simulator.system.services.service import ServiceOperatingState
from primaite.simulator.system.software import SoftwareHealthState
def test_service_scan(service):
"""Test that an agent can request a service scan."""
service.start()
assert service.operating_state == ServiceOperatingState.RUNNING
assert service.visible_operating_state == ServiceOperatingState.STOPPED
assert service.health_state_visible == SoftwareHealthState.UNUSED
service.apply_request(["scan"])
assert service.operating_state == ServiceOperatingState.RUNNING
assert service.visible_operating_state == ServiceOperatingState.RUNNING
assert service.health_state_visible == SoftwareHealthState.GOOD
def test_service_stop(service):

View File

@@ -1,17 +1,18 @@
from primaite.simulator.system.services.service import ServiceOperatingState
from primaite.simulator.system.software import SoftwareHealthState
def test_scan(service):
assert service.operating_state == ServiceOperatingState.STOPPED
assert service.visible_operating_state == ServiceOperatingState.STOPPED
assert service.health_state_visible == SoftwareHealthState.UNUSED
service.start()
assert service.operating_state == ServiceOperatingState.RUNNING
assert service.visible_operating_state == ServiceOperatingState.STOPPED
assert service.health_state_visible == SoftwareHealthState.UNUSED
service.scan()
assert service.operating_state == ServiceOperatingState.RUNNING
assert service.visible_operating_state == ServiceOperatingState.RUNNING
assert service.health_state_visible == SoftwareHealthState.GOOD
def test_start_service(service):
@@ -51,6 +52,13 @@ def test_restart(service):
service.restart()
assert service.operating_state == ServiceOperatingState.RESTARTING
timestep = 0
while service.operating_state == ServiceOperatingState.RESTARTING:
service.apply_timestep(timestep)
timestep += 1
assert service.operating_state == ServiceOperatingState.RUNNING
def test_enable_disable(service):
service.disable()