diff --git a/src/primaite/game/agent/actions.py b/src/primaite/game/agent/actions.py index 1de5276c..60ff19e5 100644 --- a/src/primaite/game/agent/actions.py +++ b/src/primaite/game/agent/actions.py @@ -229,7 +229,7 @@ class NodeApplicationInstallAction(AbstractAction): super().__init__(manager=manager) self.shape: Dict[str, int] = {"node_id": num_nodes} - def form_request(self, node_id: int, application_name: str, ip_address: str) -> List[str]: + def form_request(self, node_id: int, application_name: str) -> List[str]: """Return the action formatted as a request which can be ingested by the PrimAITE simulation.""" node_name = self.manager.get_node_name_by_idx(node_id) if node_name is None: @@ -242,7 +242,6 @@ class NodeApplicationInstallAction(AbstractAction): "application", "install", application_name, - ip_address, ] diff --git a/src/primaite/game/agent/scripted_agents/tap001.py b/src/primaite/game/agent/scripted_agents/tap001.py index b1a378ef..c4f6062a 100644 --- a/src/primaite/game/agent/scripted_agents/tap001.py +++ b/src/primaite/game/agent/scripted_agents/tap001.py @@ -55,7 +55,6 @@ class TAP001(AbstractScriptedAgent): return "NODE_APPLICATION_INSTALL", { "node_id": self.starting_node_idx, "application_name": "RansomwareScript", - "ip_address": self.ip_address, } return "NODE_APPLICATION_EXECUTE", {"node_id": self.starting_node_idx, "application_id": 0} diff --git a/src/primaite/simulator/network/hardware/base.py b/src/primaite/simulator/network/hardware/base.py index 01745215..1982b08f 100644 --- a/src/primaite/simulator/network/hardware/base.py +++ b/src/primaite/simulator/network/hardware/base.py @@ -6,7 +6,7 @@ import secrets from abc import ABC, abstractmethod from ipaddress import IPv4Address, IPv4Network from pathlib import Path -from typing import Any, Dict, Optional, Type, TypeVar, Union +from typing import Any, Dict, Optional, TypeVar, Union from prettytable import MARKDOWN, PrettyTable from pydantic import BaseModel, Field @@ -884,6 +884,54 @@ class Node(SimComponent): More information in user guide and docstring for SimComponent._init_request_manager. """ + + def _install_application(request: RequestFormat, context: Dict) -> RequestResponse: + """ + Allows agents to install applications to the node. + + :param request: list containing the application name as the only element + :type application: str + """ + application_name = request[0] + if self.software_manager.software.get(application_name): + self.sys_log.warning(f"Can't install {application_name}. It's already installed.") + return RequestResponse.from_bool(False) + application_class = Application._application_registry[application_name] + self.software_manager.install(application_class) + application_instance = self.software_manager.software.get(application_name) + self.applications[application_instance.uuid] = application_instance + _LOGGER.debug(f"Added application {application_instance.name} to node {self.hostname}") + self._application_request_manager.add_request( + application_name, RequestType(func=application_instance._request_manager) + ) + application_instance.install() + if application_name in self.software_manager.software: + return RequestResponse.from_bool(True) + else: + return RequestResponse.from_bool(False) + + def _uninstall_application(request: RequestFormat, context: Dict) -> RequestResponse: + """ + Uninstall and completely remove application from this node. + + This method is useful for allowing agents to take this action. + + :param application: Application object that is currently associated with this node. + :type application: Application + :return: True if the application is uninstalled successfully, otherwise False. + """ + application_name = request[0] + if application_name not in self.software_manager.software: + self.sys_log.warning(f"Can't uninstall {application_name}. It's not installed.") + return RequestResponse.from_bool(False) + + application_instance = self.software_manager.software.get(application_name) + self.software_manager.uninstall(application_instance.name) + if application_instance.name not in self.software_manager.software: + return RequestResponse.from_bool(True) + else: + return RequestResponse.from_bool(False) + _node_is_on = Node._NodeIsOnValidator(node=self) rm = super()._init_request_manager() @@ -940,25 +988,8 @@ class Node(SimComponent): name="application", request_type=RequestType(func=self._application_manager) ) - self._application_manager.add_request( - name="install", - request_type=RequestType( - func=lambda request, context: RequestResponse.from_bool( - self.application_install_action( - application=self._read_application_type(request[0]), ip_address=request[1] - ) - ) - ), - ) - - self._application_manager.add_request( - name="uninstall", - request_type=RequestType( - func=lambda request, context: RequestResponse.from_bool( - self.application_uninstall_action(application=self._read_application_type(request[0])) - ) - ), - ) + self._application_manager.add_request(name="install", request_type=RequestType(func=_install_application)) + self._application_manager.add_request(name="uninstall", request_type=RequestType(func=_uninstall_application)) return rm @@ -966,29 +997,6 @@ class Node(SimComponent): """Install System Software - software that is usually provided with the OS.""" pass - def _read_application_type(self, application_class_str: str) -> Type[IOSoftwareClass]: - """Wrapper that converts the string from the request manager into the appropriate class for the application.""" - if application_class_str == "DoSBot": - from primaite.simulator.system.applications.red_applications.dos_bot import DoSBot - - return DoSBot - elif application_class_str == "DataManipulationBot": - from primaite.simulator.system.applications.red_applications.data_manipulation_bot import ( - DataManipulationBot, - ) - - return DataManipulationBot - elif application_class_str == "WebBrowser": - from primaite.simulator.system.applications.web_browser import WebBrowser - - return WebBrowser - elif application_class_str == "RansomwareScript": - from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript - - return RansomwareScript - else: - return 0 - def describe_state(self) -> Dict: """ Produce a dictionary describing the current state of this object. @@ -1417,76 +1425,6 @@ class Node(SimComponent): self.sys_log.info(f"Uninstalled application {application.name}") self._application_request_manager.remove_request(application.name) - def application_install_action(self, application: Application, ip_address: Optional[str] = None) -> bool: - """ - Install an application on this node and configure it. - - This method is useful for allowing agents to take this action. - - :param application: Application object that has not been installed on any node yet. - :type application: Application - :param ip_address: IP address used to configure the application - (target IP for the DoSBot or server IP for the DataManipulationBot) - :type ip_address: str - :return: True if the application is installed successfully, otherwise False. - """ - if application in self: - _LOGGER.warning( - f"Can't add application {application.__name__}" + f"to node {self.hostname}. It's already installed." - ) - return True - - self.software_manager.install(application) - application_instance = self.software_manager.software.get(str(application.__name__)) - self.applications[application_instance.uuid] = application_instance - _LOGGER.debug(f"Added application {application_instance.name} to node {self.hostname}") - self._application_request_manager.add_request( - application_instance.name, RequestType(func=application_instance._request_manager) - ) - - # Configure application if additional parameters are given - if ip_address: - if application_instance.name == "DoSBot": - application_instance.configure(target_ip_address=IPv4Address(ip_address)) - elif application_instance.name == "DataManipulationBot": - application_instance.configure(server_ip_address=IPv4Address(ip_address)) - elif application_instance.name == "RansomwareScript": - application_instance.configure(server_ip_address=IPv4Address(ip_address)) - else: - pass - application_instance.install() - if application_instance.name in self.software_manager.software: - return True - else: - return False - - def application_uninstall_action(self, application: Application) -> bool: - """ - Uninstall and completely remove application from this node. - - This method is useful for allowing agents to take this action. - - :param application: Application object that is currently associated with this node. - :type application: Application - :return: True if the application is uninstalled successfully, otherwise False. - """ - if application.__name__ not in self.software_manager.software: - _LOGGER.warning( - f"Can't remove application {application.__name__}" + f"from node {self.hostname}. It's not installed." - ) - return True - - application_instance = self.software_manager.software.get( - str(application.__name__) - ) # This works because we can't have two applications with the same name on the same node - # self.uninstall_application(application_instance) - self.software_manager.uninstall(application_instance.name) - - if application_instance.name not in self.software_manager.software: - return True - else: - return False - def _shut_down_actions(self): """Actions to perform when the node is shut down.""" # Turn off all the services in the node diff --git a/tests/assets/configs/test_application_install.yaml b/tests/assets/configs/test_application_install.yaml index 87402f73..a4e898ae 100644 --- a/tests/assets/configs/test_application_install.yaml +++ b/tests/assets/configs/test_application_install.yaml @@ -683,7 +683,6 @@ agents: options: node_id: 0 application_name: DoSBot - ip_address: 192.168.1.14 79: action: NODE_APPLICATION_REMOVE options: diff --git a/tests/integration_tests/game_layer/test_actions.py b/tests/integration_tests/game_layer/test_actions.py index 0dcf125d..a1005f34 100644 --- a/tests/integration_tests/game_layer/test_actions.py +++ b/tests/integration_tests/game_layer/test_actions.py @@ -557,7 +557,7 @@ def test_node_application_install_and_uninstall_integration(game_and_agent: Tupl assert client_1.software_manager.software.get("DoSBot") is None - action = ("NODE_APPLICATION_INSTALL", {"node_id": 0, "application_name": "DoSBot", "ip_address": "192.168.1.14"}) + action = ("NODE_APPLICATION_INSTALL", {"node_id": 0, "application_name": "DoSBot"}) agent.store_action(action) game.step()