#2405 add application install and remove actions

This commit is contained in:
Cristian-VM2
2024-03-27 17:07:12 +00:00
parent de2e5d0b7d
commit 8bb7f8a177
5 changed files with 203 additions and 4 deletions

View File

@@ -219,6 +219,50 @@ class NodeApplicationFixAction(NodeApplicationAbstractAction):
self.verb: str = "fix"
class NodeApplicationInstallAction(AbstractAction):
"""Action which installs an application."""
def __init__(
self, manager: "ActionManager", num_nodes: int, application_name: str, ip_address: str, **kwargs
) -> None:
super().__init__(manager=manager)
self.shape: Dict[str, int] = {"node_id": num_nodes}
self.application_name = application_name
self.ip_address = ip_address
def form_request(self, node_id: int) -> List[str]:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
if node_name is None:
return ["do_nothing"]
return [
"network",
"node",
node_name,
"software_manager",
"application",
"install",
self.application_name,
self.ip_address,
]
class NodeApplicationRemoveAction(AbstractAction):
"""Action which removes/uninstalls an application."""
def __init__(self, manager: "ActionManager", num_nodes: int, application_name: str, **kwargs) -> None:
super().__init__(manager=manager)
self.shape: Dict[str, int] = {"node_id": num_nodes}
self.application_name = application_name
def form_request(self, node_id: int) -> List[str]:
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
node_name = self.manager.get_node_name_by_idx(node_id)
if node_name is None:
return ["do_nothing"]
return ["network", "node", node_name, "software_manager", "application", "uninstall", self.application_name]
class NodeFolderAbstractAction(AbstractAction):
"""
Base class for folder actions.
@@ -658,6 +702,8 @@ class ActionManager:
"NODE_APPLICATION_SCAN": NodeApplicationScanAction,
"NODE_APPLICATION_CLOSE": NodeApplicationCloseAction,
"NODE_APPLICATION_FIX": NodeApplicationFixAction,
"NODE_APPLICATION_INSTALL": NodeApplicationInstallAction,
"NODE_APPLICATION_REMOVE": NodeApplicationRemoveAction,
"NODE_FILE_SCAN": NodeFileScanAction,
"NODE_FILE_CHECKHASH": NodeFileCheckhashAction,
"NODE_FILE_DELETE": NodeFileDeleteAction,

View File

@@ -5,7 +5,7 @@ import secrets
from abc import ABC, abstractmethod
from ipaddress import IPv4Address, IPv4Network
from pathlib import Path
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, Optional, Type, TypeVar, Union
from prettytable import MARKDOWN, PrettyTable
from pydantic import BaseModel, Field
@@ -35,8 +35,11 @@ from primaite.simulator.system.core.software_manager import SoftwareManager
from primaite.simulator.system.core.sys_log import SysLog
from primaite.simulator.system.processes.process import Process
from primaite.simulator.system.services.service import Service
from primaite.simulator.system.software import IOSoftware
from primaite.utils.validators import IPV4Address
IOSoftwareClass = TypeVar("IOSoftwareClass", bound=IOSoftware)
_LOGGER = getLogger(__name__)
@@ -843,12 +846,56 @@ class Node(SimComponent):
)
rm.add_request("os", RequestType(func=self._os_request_manager, validator=_node_is_on))
self._software_manager = RequestManager()
rm.add_request("software_manager", RequestType(func=self._software_manager, validator=_node_is_on))
self._application_manager = RequestManager()
self._software_manager.add_request(name="application", request_type=RequestType(func=self._application_manager))
self._application_manager.add_request(
name="install",
request_type=RequestType(
func=lambda request, context: RequestResponse.from_bool(
self.application_install_action(
application=self._read_application_type(request[0]), ip_address=request[1]
)
)
),
)
self._application_manager.add_request(
name="uninstall",
request_type=RequestType(
func=lambda request, context: RequestResponse.from_bool(
self.application_uninstall_action(application=self._read_application_type(request[0]))
)
),
)
return rm
def _install_system_software(self):
"""Install System Software - software that is usually provided with the OS."""
pass
def _read_application_type(self, application_class_str: str) -> Type[IOSoftwareClass]:
"""Wrapper that converts the string from the request manager into the appropriate class for the application."""
if application_class_str.lower() == "DoSBot".lower():
from primaite.simulator.system.applications.red_applications.dos_bot import DoSBot
return DoSBot
elif application_class_str.lower() == "DataManipulationBot".lower():
from primaite.simulator.system.applications.red_applications.data_manipulation_bot import (
DataManipulationBot,
)
return DataManipulationBot
elif application_class_str.lower() == "WebBrowser".lower():
from primaite.simulator.system.applications.web_browser import WebBrowser
return WebBrowser
else:
return 0
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
@@ -1257,6 +1304,78 @@ class Node(SimComponent):
_LOGGER.info(f"Removed application {application.name} from node {self.hostname}")
self._application_request_manager.remove_request(application.name)
def application_install_action(self, application: Application, ip_address: Optional[str] = None) -> bool:
"""
Install an application on this node and configure it.
This method is useful for allowing agents to take this action.
:param application: Application instance that has not been installed on any node yet.
:type application: Application
:parm
"""
if application in self:
_LOGGER.warning(
f"Can't add application {application.__name__}" + f"to node {self.hostname}. It's already installed."
)
self.software_manager.install(application)
application_instance = self.software_manager.software.get(str(application.__name__))
self.applications[application_instance.uuid] = application_instance
application.parent = self
self.sys_log.info(f"Installed application {application.__name__}")
_LOGGER.debug(f"Added application {application.__name__} to node {self.hostname}")
self._application_request_manager.add_request(
application_instance.name, RequestType(func=application_instance._request_manager)
)
# Configure application if additional parameters are given
if ip_address:
from primaite.simulator.system.applications.red_applications.data_manipulation_bot import (
DataManipulationBot,
)
from primaite.simulator.system.applications.red_applications.dos_bot import DoSBot
if application == DoSBot:
application_instance.configure(target_ip_address=IPv4Address(ip_address))
elif application == DataManipulationBot:
application_instance.configure(server_ip_address=IPv4Address(ip_address))
else:
pass
if application in self:
return True
else:
return False
def application_uninstall_action(self, application: Application) -> bool:
"""
Uninstall and completely remove application from this node.
This method is useful for allowing agents to take this action.
:param application: Application object that is currently associated with this node.
:type application: Application
"""
if application.__name__ not in self.software_manager.software:
_LOGGER.warning(
f"Can't remove application {application.__name__}" + f"from node {self.hostname}. It's not installed."
)
return True
application_instance = self.software_manager.software.get(
str(application.__name__)
) # This works because we can't have two applications with the same name on the same node
self.applications.pop(application_instance.uuid)
application.parent = None
self.sys_log.info(f"Uninstalled application {application.__name__}")
_LOGGER.info(f"Removed application {application.__name__} from node {self.hostname}")
self._application_request_manager.remove_request(application_instance.name)
self.software_manager.uninstall(application_instance.name)
if application_instance.name not in self.software_manager.software:
return True
else:
return False
def _shut_down_actions(self):
"""Actions to perform when the node is shut down."""
# Turn off all the services in the node

View File

@@ -88,7 +88,7 @@ class Software(SimComponent):
"The count of times the software has been scanned, defaults to 0."
revealed_to_red: bool = False
"Indicates if the software has been revealed to red agent, defaults is False."
software_manager: "SoftwareManager" = None
software_manager: Optional["SoftwareManager"] = None
"An instance of Software Manager that is used by the parent node."
sys_log: SysLog = None
"An instance of SysLog that is used by the parent node."

View File

@@ -480,6 +480,8 @@ def game_and_agent():
{"type": "NODE_APPLICATION_SCAN"},
{"type": "NODE_APPLICATION_CLOSE"},
{"type": "NODE_APPLICATION_FIX"},
{"type": "NODE_APPLICATION_INSTALL", "options": {"application_name": "DoSBot", "ip_address": "192.168.1.14"}},
{"type": "NODE_APPLICATION_REMOVE", "options": {"application_name": "DoSBot"}},
{"type": "NODE_FILE_SCAN"},
{"type": "NODE_FILE_CHECKHASH"},
{"type": "NODE_FILE_DELETE"},
@@ -507,10 +509,16 @@ def game_and_agent():
nodes=[
{
"node_name": "client_1",
"applications": [{"application_name": "WebBrowser"}],
"applications": [
{"application_name": "WebBrowser"},
{"application_name": "DoSBot"},
],
"folders": [{"folder_name": "downloads", "files": [{"file_name": "cat.png"}]}],
},
{"node_name": "server_1", "services": [{"service_name": "DNSServer"}]},
{
"node_name": "server_1",
"services": [{"service_name": "DNSServer"}],
},
{"node_name": "server_2", "services": [{"service_name": "WebServer"}]},
{"node_name": "router"},
],

View File

@@ -10,6 +10,7 @@
# 4. Check that the simulation has changed in the way that I expect.
# 5. Repeat for all actions.
from ipaddress import IPv4Address
from typing import Tuple
import pytest
@@ -455,3 +456,28 @@ def test_node_application_close_integration(game_and_agent: Tuple[PrimaiteGame,
game.step()
assert browser.operating_state == ApplicationOperatingState.CLOSED
def test_node_application_install_and_uninstall_integration(game_and_agent: Tuple[PrimaiteGame, ProxyAgent]):
"""Test that the NodeApplicationInstallAction and NodeApplicationRemoveAction can form a request and that
it is accepted by the simulation.
When you initiate a install action, the Application will be installed and configured on the node.
The remove action will uninstall the application from the node."""
game, agent = game_and_agent
client_1 = game.simulation.network.get_node_by_hostname("client_1")
assert client_1.software_manager.software.get("DoSBot") is None
action = ("NODE_APPLICATION_INSTALL", {"node_id": 0})
agent.store_action(action)
game.step()
assert client_1.software_manager.software.get("DoSBot") is not None
action = ("NODE_APPLICATION_REMOVE", {"node_id": 0})
agent.store_action(action)
game.step()
assert client_1.software_manager.software.get("DoSBot") is None