#2701 - Remove ip address option from node application install
This commit is contained in:
@@ -228,7 +228,7 @@ class NodeApplicationInstallAction(AbstractAction):
|
||||
super().__init__(manager=manager)
|
||||
self.shape: Dict[str, int] = {"node_id": num_nodes}
|
||||
|
||||
def form_request(self, node_id: int, application_name: str, ip_address: str) -> List[str]:
|
||||
def form_request(self, node_id: int, application_name: str) -> List[str]:
|
||||
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
|
||||
node_name = self.manager.get_node_name_by_idx(node_id)
|
||||
if node_name is None:
|
||||
@@ -241,7 +241,6 @@ class NodeApplicationInstallAction(AbstractAction):
|
||||
"application",
|
||||
"install",
|
||||
application_name,
|
||||
ip_address,
|
||||
]
|
||||
|
||||
|
||||
|
||||
@@ -55,7 +55,6 @@ class TAP001(AbstractScriptedAgent):
|
||||
return "NODE_APPLICATION_INSTALL", {
|
||||
"node_id": self.starting_node_idx,
|
||||
"application_name": "RansomwareScript",
|
||||
"ip_address": self.ip_address,
|
||||
}
|
||||
|
||||
return "NODE_APPLICATION_EXECUTE", {"node_id": self.starting_node_idx, "application_id": 0}
|
||||
|
||||
@@ -6,7 +6,7 @@ import secrets
|
||||
from abc import ABC, abstractmethod
|
||||
from ipaddress import IPv4Address, IPv4Network
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional, Type, TypeVar, Union
|
||||
from typing import Any, Dict, Optional, TypeVar, Union
|
||||
|
||||
from prettytable import MARKDOWN, PrettyTable
|
||||
from pydantic import BaseModel, Field
|
||||
@@ -884,6 +884,54 @@ class Node(SimComponent):
|
||||
|
||||
More information in user guide and docstring for SimComponent._init_request_manager.
|
||||
"""
|
||||
|
||||
def _install_application(request: RequestFormat, context: Dict) -> RequestResponse:
|
||||
"""
|
||||
Allows agents to install applications to the node.
|
||||
|
||||
:param request: list containing the application name as the only element
|
||||
:type application: str
|
||||
"""
|
||||
application_name = request[0]
|
||||
if self.software_manager.software.get(application_name):
|
||||
self.sys_log.warning(f"Can't install {application_name}. It's already installed.")
|
||||
return RequestResponse.from_bool(False)
|
||||
application_class = Application._application_registry[application_name]
|
||||
self.software_manager.install(application_class)
|
||||
application_instance = self.software_manager.software.get(application_name)
|
||||
self.applications[application_instance.uuid] = application_instance
|
||||
_LOGGER.debug(f"Added application {application_instance.name} to node {self.hostname}")
|
||||
self._application_request_manager.add_request(
|
||||
application_name, RequestType(func=application_instance._request_manager)
|
||||
)
|
||||
application_instance.install()
|
||||
if application_name in self.software_manager.software:
|
||||
return RequestResponse.from_bool(True)
|
||||
else:
|
||||
return RequestResponse.from_bool(False)
|
||||
|
||||
def _uninstall_application(request: RequestFormat, context: Dict) -> RequestResponse:
|
||||
"""
|
||||
Uninstall and completely remove application from this node.
|
||||
|
||||
This method is useful for allowing agents to take this action.
|
||||
|
||||
:param application: Application object that is currently associated with this node.
|
||||
:type application: Application
|
||||
:return: True if the application is uninstalled successfully, otherwise False.
|
||||
"""
|
||||
application_name = request[0]
|
||||
if application_name not in self.software_manager.software:
|
||||
self.sys_log.warning(f"Can't uninstall {application_name}. It's not installed.")
|
||||
return RequestResponse.from_bool(False)
|
||||
|
||||
application_instance = self.software_manager.software.get(application_name)
|
||||
self.software_manager.uninstall(application_instance.name)
|
||||
if application_instance.name not in self.software_manager.software:
|
||||
return RequestResponse.from_bool(True)
|
||||
else:
|
||||
return RequestResponse.from_bool(False)
|
||||
|
||||
_node_is_on = Node._NodeIsOnValidator(node=self)
|
||||
|
||||
rm = super()._init_request_manager()
|
||||
@@ -940,25 +988,8 @@ class Node(SimComponent):
|
||||
name="application", request_type=RequestType(func=self._application_manager)
|
||||
)
|
||||
|
||||
self._application_manager.add_request(
|
||||
name="install",
|
||||
request_type=RequestType(
|
||||
func=lambda request, context: RequestResponse.from_bool(
|
||||
self.application_install_action(
|
||||
application=self._read_application_type(request[0]), ip_address=request[1]
|
||||
)
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
self._application_manager.add_request(
|
||||
name="uninstall",
|
||||
request_type=RequestType(
|
||||
func=lambda request, context: RequestResponse.from_bool(
|
||||
self.application_uninstall_action(application=self._read_application_type(request[0]))
|
||||
)
|
||||
),
|
||||
)
|
||||
self._application_manager.add_request(name="install", request_type=RequestType(func=_install_application))
|
||||
self._application_manager.add_request(name="uninstall", request_type=RequestType(func=_uninstall_application))
|
||||
|
||||
return rm
|
||||
|
||||
@@ -966,29 +997,6 @@ class Node(SimComponent):
|
||||
"""Install System Software - software that is usually provided with the OS."""
|
||||
pass
|
||||
|
||||
def _read_application_type(self, application_class_str: str) -> Type[IOSoftwareClass]:
|
||||
"""Wrapper that converts the string from the request manager into the appropriate class for the application."""
|
||||
if application_class_str == "DoSBot":
|
||||
from primaite.simulator.system.applications.red_applications.dos_bot import DoSBot
|
||||
|
||||
return DoSBot
|
||||
elif application_class_str == "DataManipulationBot":
|
||||
from primaite.simulator.system.applications.red_applications.data_manipulation_bot import (
|
||||
DataManipulationBot,
|
||||
)
|
||||
|
||||
return DataManipulationBot
|
||||
elif application_class_str == "WebBrowser":
|
||||
from primaite.simulator.system.applications.web_browser import WebBrowser
|
||||
|
||||
return WebBrowser
|
||||
elif application_class_str == "RansomwareScript":
|
||||
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript
|
||||
|
||||
return RansomwareScript
|
||||
else:
|
||||
return 0
|
||||
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
Produce a dictionary describing the current state of this object.
|
||||
@@ -1417,76 +1425,6 @@ class Node(SimComponent):
|
||||
self.sys_log.info(f"Uninstalled application {application.name}")
|
||||
self._application_request_manager.remove_request(application.name)
|
||||
|
||||
def application_install_action(self, application: Application, ip_address: Optional[str] = None) -> bool:
|
||||
"""
|
||||
Install an application on this node and configure it.
|
||||
|
||||
This method is useful for allowing agents to take this action.
|
||||
|
||||
:param application: Application object that has not been installed on any node yet.
|
||||
:type application: Application
|
||||
:param ip_address: IP address used to configure the application
|
||||
(target IP for the DoSBot or server IP for the DataManipulationBot)
|
||||
:type ip_address: str
|
||||
:return: True if the application is installed successfully, otherwise False.
|
||||
"""
|
||||
if application in self:
|
||||
_LOGGER.warning(
|
||||
f"Can't add application {application.__name__}" + f"to node {self.hostname}. It's already installed."
|
||||
)
|
||||
return True
|
||||
|
||||
self.software_manager.install(application)
|
||||
application_instance = self.software_manager.software.get(str(application.__name__))
|
||||
self.applications[application_instance.uuid] = application_instance
|
||||
_LOGGER.debug(f"Added application {application_instance.name} to node {self.hostname}")
|
||||
self._application_request_manager.add_request(
|
||||
application_instance.name, RequestType(func=application_instance._request_manager)
|
||||
)
|
||||
|
||||
# Configure application if additional parameters are given
|
||||
if ip_address:
|
||||
if application_instance.name == "DoSBot":
|
||||
application_instance.configure(target_ip_address=IPv4Address(ip_address))
|
||||
elif application_instance.name == "DataManipulationBot":
|
||||
application_instance.configure(server_ip_address=IPv4Address(ip_address))
|
||||
elif application_instance.name == "RansomwareScript":
|
||||
application_instance.configure(server_ip_address=IPv4Address(ip_address))
|
||||
else:
|
||||
pass
|
||||
application_instance.install()
|
||||
if application_instance.name in self.software_manager.software:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def application_uninstall_action(self, application: Application) -> bool:
|
||||
"""
|
||||
Uninstall and completely remove application from this node.
|
||||
|
||||
This method is useful for allowing agents to take this action.
|
||||
|
||||
:param application: Application object that is currently associated with this node.
|
||||
:type application: Application
|
||||
:return: True if the application is uninstalled successfully, otherwise False.
|
||||
"""
|
||||
if application.__name__ not in self.software_manager.software:
|
||||
_LOGGER.warning(
|
||||
f"Can't remove application {application.__name__}" + f"from node {self.hostname}. It's not installed."
|
||||
)
|
||||
return True
|
||||
|
||||
application_instance = self.software_manager.software.get(
|
||||
str(application.__name__)
|
||||
) # This works because we can't have two applications with the same name on the same node
|
||||
# self.uninstall_application(application_instance)
|
||||
self.software_manager.uninstall(application_instance.name)
|
||||
|
||||
if application_instance.name not in self.software_manager.software:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def _shut_down_actions(self):
|
||||
"""Actions to perform when the node is shut down."""
|
||||
# Turn off all the services in the node
|
||||
|
||||
Reference in New Issue
Block a user