#2404 add application scan, close, and fix actions, fix and enable service scan test

This commit is contained in:
Cristian-VM2
2024-03-25 16:58:27 +00:00
parent 8b8940f465
commit 600dc3f016
4 changed files with 129 additions and 4 deletions

View File

@@ -195,6 +195,30 @@ class NodeApplicationExecuteAction(NodeApplicationAbstractAction):
self.verb: str = "execute"
class NodeApplicationScanAction(NodeApplicationAbstractAction):
"""Action which scans an application."""
def __init__(self, manager: "ActionManager", num_nodes: int, num_applications: int, **kwargs) -> None:
super().__init__(manager=manager, num_nodes=num_nodes, num_applications=num_applications)
self.verb: str = "scan"
class NodeApplicationCloseAction(NodeApplicationAbstractAction):
"""Action which closes an application."""
def __init__(self, manager: "ActionManager", num_nodes: int, num_applications: int, **kwargs) -> None:
super().__init__(manager=manager, num_nodes=num_nodes, num_applications=num_applications)
self.verb: str = "close"
class NodeApplicationFixAction(NodeApplicationAbstractAction):
"""Action which fixes an application."""
def __init__(self, manager: "ActionManager", num_nodes: int, num_applications: int, **kwargs) -> None:
super().__init__(manager=manager, num_nodes=num_nodes, num_applications=num_applications)
self.verb: str = "patch"
class NodeFolderAbstractAction(AbstractAction):
"""
Base class for folder actions.
@@ -631,6 +655,9 @@ class ActionManager:
"NODE_SERVICE_ENABLE": NodeServiceEnableAction,
"NODE_SERVICE_PATCH": NodeServicePatchAction,
"NODE_APPLICATION_EXECUTE": NodeApplicationExecuteAction,
"NODE_APPLICATION_SCAN": NodeApplicationScanAction,
"NODE_APPLICATION_CLOSE": NodeApplicationCloseAction,
"NODE_APPLICATION_FIX": NodeApplicationFixAction,
"NODE_FILE_SCAN": NodeFileScanAction,
"NODE_FILE_CHECKHASH": NodeFileCheckhashAction,
"NODE_FILE_DELETE": NodeFileDeleteAction,

View File

@@ -3,6 +3,8 @@ from enum import Enum
from typing import Any, Dict, Set
from primaite import getLogger
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.system.software import IOSoftware, SoftwareHealthState
_LOGGER = getLogger(__name__)
@@ -38,6 +40,17 @@ class Application(IOSoftware):
def __init__(self, **kwargs):
super().__init__(**kwargs)
def _init_request_manager(self) -> RequestManager:
"""
Initialise the request manager.
More information in user guide and docstring for SimComponent._init_request_manager.
"""
rm = super()._init_request_manager()
rm.add_request("close", RequestType(func=lambda request, context: RequestResponse.from_bool(self.close())))
return rm
@abstractmethod
def describe_state(self) -> Dict:
"""
@@ -109,6 +122,7 @@ class Application(IOSoftware):
if self.operating_state == ApplicationOperatingState.RUNNING:
self.sys_log.info(f"Closed Application{self.name}")
self.operating_state = ApplicationOperatingState.CLOSED
return True
def install(self) -> None:
"""Install Application."""

View File

@@ -477,6 +477,9 @@ def game_and_agent():
{"type": "NODE_SERVICE_ENABLE"},
{"type": "NODE_SERVICE_PATCH"},
{"type": "NODE_APPLICATION_EXECUTE"},
{"type": "NODE_APPLICATION_SCAN"},
{"type": "NODE_APPLICATION_CLOSE"},
{"type": "NODE_APPLICATION_FIX"},
{"type": "NODE_FILE_SCAN"},
{"type": "NODE_FILE_CHECKHASH"},
{"type": "NODE_FILE_DELETE"},

View File

@@ -17,6 +17,7 @@ import pytest
from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.web_browser import WebBrowser
from primaite.simulator.system.software import SoftwareHealthState
@@ -30,7 +31,6 @@ def test_do_nothing_integration(game_and_agent: Tuple[PrimaiteGame, ProxyAgent])
game.step()
@pytest.mark.skip(reason="Waiting to merge ticket 2166")
def test_node_service_scan_integration(game_and_agent: Tuple[PrimaiteGame, ProxyAgent]):
"""
Test that the NodeServiceScanAction can form a request and that it is accepted by the simulation.
@@ -42,12 +42,12 @@ def test_node_service_scan_integration(game_and_agent: Tuple[PrimaiteGame, Proxy
game, agent = game_and_agent
# 1: Check that the service starts off in a good state, and that visible state is hidden until first scan
svc = game.simulation.network.get_node_by_hostname("client_1").software_manager.software.get("DNSClient")
svc = game.simulation.network.get_node_by_hostname("server_1").software_manager.software.get("DNSServer")
assert svc.health_state_actual == SoftwareHealthState.GOOD
assert svc.health_state_visible == SoftwareHealthState.UNUSED
# 2: Scan and check that the visible state is now correct
action = ("NODE_SERVICE_SCAN", {"node_id": 0, "service_id": 0})
action = ("NODE_SERVICE_SCAN", {"node_id": 1, "service_id": 0})
agent.store_action(action)
game.step()
assert svc.health_state_actual == SoftwareHealthState.GOOD
@@ -58,7 +58,7 @@ def test_node_service_scan_integration(game_and_agent: Tuple[PrimaiteGame, Proxy
assert svc.health_state_visible == SoftwareHealthState.GOOD
# 4: Scan and check that the visible state is now correct
action = ("NODE_SERVICE_SCAN", {"node_id": 0, "service_id": 0})
action = ("NODE_SERVICE_SCAN", {"node_id": 1, "service_id": 0})
agent.store_action(action)
game.step()
assert svc.health_state_actual == SoftwareHealthState.COMPROMISED
@@ -374,3 +374,84 @@ def test_network_router_port_enable_integration(game_and_agent: Tuple[PrimaiteGa
# 3: Check that the Port is enabled, and that client 1 can ping again
assert router.network_interface[1].enabled == True
assert client_1.ping("10.0.2.3")
def test_node_application_scan_integration(game_and_agent: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the NodeApplicationScanAction updates the application status as expected."""
game, agent = game_and_agent
# 1: Check that http traffic is going across the network nicely.
client_1 = game.simulation.network.get_node_by_hostname("client_1")
browser: WebBrowser = client_1.software_manager.software.get("WebBrowser")
browser.run()
browser.target_url = "http://www.example.com"
assert browser.get_webpage() # check that the browser can access example.com
assert browser.health_state_actual == SoftwareHealthState.GOOD
assert browser.health_state_visible == SoftwareHealthState.UNUSED
# 2: Scan and check that the visible state is now correct
action = ("NODE_APPLICATION_SCAN", {"node_id": 0, "application_id": 0})
agent.store_action(action)
game.step()
assert browser.health_state_actual == SoftwareHealthState.GOOD
assert browser.health_state_visible == SoftwareHealthState.GOOD
# 3: Corrupt the service and check that the visible state is still good
browser.health_state_actual = SoftwareHealthState.COMPROMISED
assert browser.health_state_visible == SoftwareHealthState.GOOD
# 4: Scan and check that the visible state is now correct
action = ("NODE_APPLICATION_SCAN", {"node_id": 0, "application_id": 0})
agent.store_action(action)
game.step()
assert browser.health_state_actual == SoftwareHealthState.COMPROMISED
assert browser.health_state_visible == SoftwareHealthState.COMPROMISED
def test_node_application_fix_integration(game_and_agent: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the NodeApplicationFixAction can form a request and that it is accepted by the simulation.
When you initiate a fix action, the software health state turns to PATCHING, then after a few steps, it goes
to GOOD."""
game, agent = game_and_agent
# 1: Check that http traffic is going across the network nicely.
client_1 = game.simulation.network.get_node_by_hostname("client_1")
browser: WebBrowser = client_1.software_manager.software.get("WebBrowser")
browser.health_state_actual = SoftwareHealthState.COMPROMISED
# 2: Apply a fix action
action = ("NODE_APPLICATION_FIX", {"node_id": 0, "application_id": 0})
agent.store_action(action)
game.step()
# 3: Check that the application is now in the patching state
assert browser.health_state_actual == SoftwareHealthState.PATCHING
# 4: perform a few do-nothing steps and check that the application is now in the good state
action = ("DONOTHING", {})
agent.store_action(action)
game.step()
assert browser.health_state_actual == SoftwareHealthState.GOOD
def test_node_application_close_integration(game_and_agent: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the NodeApplicationCloseAction can form a request and that it is accepted by the simulation.
When you initiate a close action, the Application Operating State changes for CLOSED."""
game, agent = game_and_agent
client_1 = game.simulation.network.get_node_by_hostname("client_1")
browser: WebBrowser = client_1.software_manager.software.get("WebBrowser")
browser.run()
assert browser.operating_state == ApplicationOperatingState.RUNNING
# 2: Apply a close action
action = ("NODE_APPLICATION_CLOSE", {"node_id": 0, "application_id": 0})
agent.store_action(action)
game.step()
assert browser.operating_state == ApplicationOperatingState.CLOSED