diff --git a/src/primaite/game/game.py b/src/primaite/game/game.py index 772ab5aa..c1ac80e3 100644 --- a/src/primaite/game/game.py +++ b/src/primaite/game/game.py @@ -360,11 +360,6 @@ class PrimaiteGame: server_ip_address=IPv4Address(opt.get("server_ip")), server_password=opt.get("server_password"), payload=opt.get("payload", "ENCRYPT"), - c2_beacon_p_of_success=float(opt.get("c2_beacon_p_of_success", "0.5")), - target_scan_p_of_success=float(opt.get("target_scan_p_of_success", "0.1")), - ransomware_encrypt_p_of_success=float( - opt.get("ransomware_encrypt_p_of_success", "0.1") - ), ) elif application_type == "DatabaseClient": if "options" in application_cfg: diff --git a/src/primaite/simulator/system/applications/red_applications/ransomware_script.py b/src/primaite/simulator/system/applications/red_applications/ransomware_script.py index 8acc07b4..a2cf7ba5 100644 --- a/src/primaite/simulator/system/applications/red_applications/ransomware_script.py +++ b/src/primaite/simulator/system/applications/red_applications/ransomware_script.py @@ -1,8 +1,6 @@ -from enum import IntEnum from ipaddress import IPv4Address from typing import Dict, Optional -from primaite.game.science import simulate_trial from primaite.interface.request import RequestResponse from primaite.simulator.core import RequestManager, RequestType from primaite.simulator.network.transmission.network_layer import IPProtocol @@ -11,43 +9,10 @@ from primaite.simulator.system.applications.application import Application from primaite.simulator.system.applications.database_client import DatabaseClient, DatabaseClientConnection -class RansomwareAttackStage(IntEnum): - """ - Enumeration representing different attack stages of the ransomware script. - - This enumeration defines the various stages a data manipulation attack can be in during its lifecycle - in the simulation. - Each stage represents a specific phase in the attack process. - """ - - NOT_STARTED = 0 - "Indicates that the attack has not started yet." - DOWNLOAD = 1 - "Installing the Encryption Script - Testing" - INSTALL = 2 - "The stage where logon procedures are simulated." - ACTIVATE = 3 - "Operating Status Changes" - PROPAGATE = 4 - "Represents the stage of performing a horizontal port scan on the target." - COMMAND_AND_CONTROL = 5 - "Represents the stage of setting up a rely C2 Beacon (Not Implemented)" - PAYLOAD = 6 - "Stage of actively attacking the target." - SUCCEEDED = 7 - "Indicates the attack has been successfully completed." - FAILED = 8 - "Signifies that the attack has failed." - - class RansomwareScript(Application): """Ransomware Kill Chain - Designed to be used by the TAP001 Agent on the example layout Network. - :ivar payload: The attack stage query payload. (Default Corrupt) - :ivar target_scan_p_of_success: The probability of success for the target scan stage. - :ivar c2_beacon_p_of_success: The probability of success for the c2_beacon stage - :ivar ransomware_encrypt_p_of_success: The probability of success for the ransomware 'attack' (encrypt) stage. - :ivar repeat: Whether to repeat attacking once finished. + :ivar payload: The attack stage query payload. (Default ENCRYPT) """ server_ip_address: Optional[IPv4Address] = None @@ -56,16 +21,6 @@ class RansomwareScript(Application): """Password required to access the database.""" payload: Optional[str] = "ENCRYPT" "Payload String for the payload stage" - target_scan_p_of_success: float = 0.9 - "Probability of the target scan succeeding: Default 0.9" - c2_beacon_p_of_success: float = 0.9 - "Probability of the c2 beacon setup stage succeeding: Default 0.9" - ransomware_encrypt_p_of_success: float = 0.9 - "Probability of the ransomware attack succeeding: Default 0.9" - repeat: bool = False - "If true, the Denial of Service bot will keep performing the attack." - attack_stage: RansomwareAttackStage = RansomwareAttackStage.NOT_STARTED - "The ransomware attack stage. See RansomwareAttackStage Class" def __init__(self, **kwargs): kwargs["name"] = "RansomwareScript" @@ -90,7 +45,7 @@ class RansomwareScript(Application): @property def _host_db_client(self) -> DatabaseClient: """Return the database client that is installed on the same machine as the Ransomware Script.""" - db_client = self.software_manager.software.get("DatabaseClient") + db_client: DatabaseClient = self.software_manager.software.get("DatabaseClient") if db_client is None: self.sys_log.warning(f"{self.__class__.__name__} cannot find a database client on its host.") return db_client @@ -108,16 +63,6 @@ class RansomwareScript(Application): ) return rm - def _activate(self): - """ - Simulate the install process as the initial stage of the attack. - - Advances the attack stage to 'ACTIVATE' attack state. - """ - if self.attack_stage == RansomwareAttackStage.INSTALL: - self.sys_log.info(f"{self.name}: Activated!") - self.attack_stage = RansomwareAttackStage.ACTIVATE - def run(self) -> bool: """Calls the parent classes execute method before starting the application loop.""" super().run() @@ -133,20 +78,9 @@ class RansomwareScript(Application): return False if self.server_ip_address and self.payload: self.sys_log.info(f"{self.name}: Running") - self.attack_stage = RansomwareAttackStage.NOT_STARTED - self._local_download() - self._install() - self._activate() - self._perform_target_scan() - self._setup_beacon() - self._perform_ransomware_encrypt() - - if self.repeat and self.attack_stage in ( - RansomwareAttackStage.SUCCEEDED, - RansomwareAttackStage.FAILED, - ): - self.attack_stage = RansomwareAttackStage.NOT_STARTED - return True + if self._perform_ransomware_encrypt(): + return True + return False else: self.sys_log.warning(f"{self.name}: Failed to start as it requires both a target_ip_address and payload.") return False @@ -156,10 +90,6 @@ class RansomwareScript(Application): server_ip_address: IPv4Address, server_password: Optional[str] = None, payload: Optional[str] = None, - target_scan_p_of_success: Optional[float] = None, - c2_beacon_p_of_success: Optional[float] = None, - ransomware_encrypt_p_of_success: Optional[float] = None, - repeat: bool = True, ): """ Configure the Ransomware Script to communicate with a DatabaseService. @@ -167,10 +97,6 @@ class RansomwareScript(Application): :param server_ip_address: The IP address of the Node the DatabaseService is on. :param server_password: The password on the DatabaseService. :param payload: The attack stage query (Encrypt / Delete) - :param target_scan_p_of_success: The probability of success for the target scan stage. - :param c2_beacon_p_of_success: The probability of success for the c2_beacon stage - :param ransomware_encrypt_p_of_success: The probability of success for the ransomware 'attack' (encrypt) stage. - :param repeat: Whether to repeat attacking once finished. """ if server_ip_address: self.server_ip_address = server_ip_address @@ -178,74 +104,15 @@ class RansomwareScript(Application): self.server_password = server_password if payload: self.payload = payload - if target_scan_p_of_success: - self.target_scan_p_of_success = target_scan_p_of_success - if c2_beacon_p_of_success: - self.c2_beacon_p_of_success = c2_beacon_p_of_success - if ransomware_encrypt_p_of_success: - self.ransomware_encrypt_p_of_success = ransomware_encrypt_p_of_success - if repeat: - self.repeat = repeat self.sys_log.info( - f"{self.name}: Configured the {self.name} with {server_ip_address=}, {payload=}, {server_password=}, " - f"{repeat=}." + f"{self.name}: Configured the {self.name} with {server_ip_address=}, {payload=}, {server_password=}." ) - def _install(self): - """ - Simulate the install stage in the kill-chain. - - Advances the attack stage to 'ACTIVATE' if successful. - - From this attack stage onwards. - the ransomware application is now visible from this point onwardin the observation space. - """ - if self.attack_stage == RansomwareAttackStage.DOWNLOAD: - self.sys_log.info(f"{self.name}: Malware installed on the local file system") - downloads_folder = self.file_system.get_folder(folder_name="downloads") - ransomware_file = downloads_folder.get_file(file_name="ransom_script.pdf") - ransomware_file.num_access += 1 - self.attack_stage = RansomwareAttackStage.INSTALL - - def _setup_beacon(self): - """ - Simulates setting up a c2 beacon; currently a pseudo step for increasing red variance. - - Advances the attack stage to 'COMMAND AND CONTROL` if successful. - - :param p_of_sucess: Probability of a successful c2 setup (Advancing this step), - by default the success rate is 0.5 - """ - if self.attack_stage == RansomwareAttackStage.PROPAGATE: - self.sys_log.info(f"{self.name} Attempting to set up C&C Beacon - Scan 1/2") - if simulate_trial(self.c2_beacon_p_of_success): - self.sys_log.info(f"{self.name} C&C Successful setup - Scan 2/2") - c2c_setup = True # TODO Implement the c2c step via an FTP Application/Service - if c2c_setup: - self.attack_stage = RansomwareAttackStage.COMMAND_AND_CONTROL - - def _perform_target_scan(self): - """ - Perform a simulated port scan to check for open SQL ports. - - Advances the attack stage to `PROPAGATE` if successful. - - :param p_of_success: Probability of successful port scan, by default 0.1. - """ - if self.attack_stage == RansomwareAttackStage.ACTIVATE: - # perform a port scan to identify that the SQL port is open on the server - self.sys_log.info(f"{self.name}: Scanning for vulnerable databases - Scan 0/2") - if simulate_trial(self.target_scan_p_of_success): - self.sys_log.info(f"{self.name}: Found a target database! Scan 1/2") - port_is_open = True # TODO Implement a NNME Triggering scan as a seperate Red Application - if port_is_open: - self.attack_stage = RansomwareAttackStage.PROPAGATE - def attack(self) -> bool: """Perform the attack steps after opening the application.""" + self.run() if not self._can_perform_action(): self.sys_log.warning("Ransomware application is unable to perform it's actions.") - self.run() self.num_executions += 1 return self._application_loop() @@ -254,57 +121,30 @@ class RansomwareScript(Application): self._db_connection = self._host_db_client.get_new_connection() return True if self._db_connection else False - def _perform_ransomware_encrypt(self): + def _perform_ransomware_encrypt(self) -> bool: """ Execute the Ransomware Encrypt payload on the target. Advances the attack stage to `COMPLETE` if successful, or 'FAILED' if unsuccessful. - :param p_of_success: Probability of successfully performing ransomware encryption, by default 0.1. """ if self._host_db_client is None: self.sys_log.info(f"{self.name}: Failed to connect to db_client - Ransomware Script") - self.attack_stage = RansomwareAttackStage.FAILED - return + return False self._host_db_client.server_ip_address = self.server_ip_address self._host_db_client.server_password = self.server_password - if self.attack_stage == RansomwareAttackStage.COMMAND_AND_CONTROL: - if simulate_trial(self.ransomware_encrypt_p_of_success): - self.sys_log.info(f"{self.name}: Attempting to launch payload") - if not self._db_connection: - self._establish_db_connection() - if self._db_connection: - attack_successful = self._db_connection.query(self.payload) - self.sys_log.info(f"{self.name} Payload delivered: {self.payload}") - if attack_successful: - self.sys_log.info(f"{self.name}: Payload Successful") - self.attack_stage = RansomwareAttackStage.SUCCEEDED - else: - self.sys_log.info(f"{self.name}: Payload failed") - self.attack_stage = RansomwareAttackStage.FAILED + self.sys_log.info(f"{self.name}: Attempting to launch payload") + if not self._db_connection: + self._establish_db_connection() + if self._db_connection: + attack_successful = self._db_connection.query(self.payload) + self.sys_log.info(f"{self.name} Payload delivered: {self.payload}") + if attack_successful: + self.sys_log.info(f"{self.name}: Payload Successful") + return True + else: + self.sys_log.info(f"{self.name}: Payload failed") + return False else: self.sys_log.warning("Attack Attempted to launch too quickly") - self.attack_stage = RansomwareAttackStage.FAILED - - def _local_download(self): - """Downloads itself via the onto the local file_system.""" - if self.attack_stage == RansomwareAttackStage.NOT_STARTED: - if self._local_download_verify(): - self.attack_stage = RansomwareAttackStage.DOWNLOAD - else: - self.sys_log.info("Malware failed to create a installation location") - self.attack_stage = RansomwareAttackStage.FAILED - else: - self.sys_log.info("Malware failed to download") - self.attack_stage = RansomwareAttackStage.FAILED - - def _local_download_verify(self) -> bool: - """Verifies a download location - Creates one if needed.""" - for folder in self.file_system.folders: - if self.file_system.folders[folder].name == "downloads": - self.file_system.num_file_creations += 1 - return True - - self.file_system.create_folder("downloads") - self.file_system.create_file(folder_name="downloads", file_name="ransom_script.pdf") - return True + return False diff --git a/tests/integration_tests/system/red_applications/test_ransomware_script.py b/tests/integration_tests/system/red_applications/test_ransomware_script.py index 9a04610b..427f67ff 100644 --- a/tests/integration_tests/system/red_applications/test_ransomware_script.py +++ b/tests/integration_tests/system/red_applications/test_ransomware_script.py @@ -9,12 +9,8 @@ from primaite.simulator.network.hardware.nodes.host.computer import Computer from primaite.simulator.network.hardware.nodes.host.server import Server from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router from primaite.simulator.network.transmission.transport_layer import Port -from primaite.simulator.system.applications.application import ApplicationOperatingState from primaite.simulator.system.applications.database_client import DatabaseClient, DatabaseClientConnection -from primaite.simulator.system.applications.red_applications.ransomware_script import ( - RansomwareAttackStage, - RansomwareScript, -) +from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript from primaite.simulator.system.services.database.database_service import DatabaseService from primaite.simulator.system.software import SoftwareHealthState @@ -85,54 +81,24 @@ def ransomware_script_db_server_green_client(example_network) -> Network: return network -def test_repeating_ransomware_script_attack(ransomware_script_and_db_server): +def test_ransomware_script_attack(ransomware_script_and_db_server): """Test a repeating data manipulation attack.""" RansomwareScript, computer, db_server_service, server = ransomware_script_and_db_server + computer.apply_timestep(timestep=0) + server.apply_timestep(timestep=0) assert db_server_service.health_state_actual is SoftwareHealthState.GOOD - assert computer.file_system.num_file_creations == 0 + assert server.file_system.num_file_creations == 1 - RansomwareScript.target_scan_p_of_success = 1 - RansomwareScript.c2_beacon_p_of_success = 1 - RansomwareScript.ransomware_encrypt_p_of_success = 1 - RansomwareScript.repeat = True RansomwareScript.attack() - assert RansomwareScript.attack_stage == RansomwareAttackStage.NOT_STARTED - assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.COMPROMISED - assert computer.file_system.num_file_creations == 1 + assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.CORRUPT + assert server.file_system.num_file_creations == 2 computer.apply_timestep(timestep=1) server.apply_timestep(timestep=1) - assert RansomwareScript.attack_stage == RansomwareAttackStage.NOT_STARTED - assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.COMPROMISED - - -def test_repeating_ransomware_script_attack(ransomware_script_and_db_server): - """Test a repeating ransowmare script attack.""" - RansomwareScript, computer, db_server_service, server = ransomware_script_and_db_server - - assert db_server_service.health_state_actual is SoftwareHealthState.GOOD - - RansomwareScript.target_scan_p_of_success = 1 - RansomwareScript.c2_beacon_p_of_success = 1 - RansomwareScript.ransomware_encrypt_p_of_success = 1 - RansomwareScript.repeat = False - RansomwareScript.attack() - - assert RansomwareScript.attack_stage == RansomwareAttackStage.SUCCEEDED assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.CORRUPT - assert computer.file_system.num_file_creations == 1 - - computer.apply_timestep(timestep=1) - computer.pre_timestep(timestep=1) - server.apply_timestep(timestep=1) - server.pre_timestep(timestep=1) - - assert RansomwareScript.attack_stage == RansomwareAttackStage.SUCCEEDED - assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.CORRUPT - assert computer.file_system.num_file_creations == 0 def test_ransomware_disrupts_green_agent_connection(ransomware_script_db_server_green_client): @@ -153,10 +119,6 @@ def test_ransomware_disrupts_green_agent_connection(ransomware_script_db_server_ assert green_db_client_connection.query("SELECT") assert green_db_client.last_query_response.get("status_code") == 200 - ransomware_script_application.target_scan_p_of_success = 1 - ransomware_script_application.ransomware_encrypt_p_of_success = 1 - ransomware_script_application.c2_beacon_p_of_success = 1 - ransomware_script_application.repeat = False ransomware_script_application.attack() assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.CORRUPT