From e1ba0ff1258a20422b105b533883c8d913bee1ac Mon Sep 17 00:00:00 2001 From: Czar Echavez Date: Thu, 6 Jun 2024 01:46:42 +0100 Subject: [PATCH] #2610: apply suggestions from PR --- src/primaite/game/game.py | 5 -- .../red_applications/ransomware_script.py | 86 +++++-------------- .../test_ransomware_script.py | 51 ++--------- 3 files changed, 28 insertions(+), 114 deletions(-) diff --git a/src/primaite/game/game.py b/src/primaite/game/game.py index 772ab5aa..c1ac80e3 100644 --- a/src/primaite/game/game.py +++ b/src/primaite/game/game.py @@ -360,11 +360,6 @@ class PrimaiteGame: server_ip_address=IPv4Address(opt.get("server_ip")), server_password=opt.get("server_password"), payload=opt.get("payload", "ENCRYPT"), - c2_beacon_p_of_success=float(opt.get("c2_beacon_p_of_success", "0.5")), - target_scan_p_of_success=float(opt.get("target_scan_p_of_success", "0.1")), - ransomware_encrypt_p_of_success=float( - opt.get("ransomware_encrypt_p_of_success", "0.1") - ), ) elif application_type == "DatabaseClient": if "options" in application_cfg: diff --git a/src/primaite/simulator/system/applications/red_applications/ransomware_script.py b/src/primaite/simulator/system/applications/red_applications/ransomware_script.py index fb8fbe8a..35aa267f 100644 --- a/src/primaite/simulator/system/applications/red_applications/ransomware_script.py +++ b/src/primaite/simulator/system/applications/red_applications/ransomware_script.py @@ -1,8 +1,6 @@ -from enum import IntEnum from ipaddress import IPv4Address from typing import Dict, Optional -from primaite.game.science import simulate_trial from primaite.interface.request import RequestResponse from primaite.simulator.core import RequestManager, RequestType from primaite.simulator.network.transmission.network_layer import IPProtocol @@ -11,31 +9,10 @@ from primaite.simulator.system.applications.application import Application from primaite.simulator.system.applications.database_client import DatabaseClient, DatabaseClientConnection -class RansomwareAttackStage(IntEnum): - """ - Enumeration representing different attack stages of the ransomware script. - - This enumeration defines the various stages a data manipulation attack can be in during its lifecycle - in the simulation. - Each stage represents a specific phase in the attack process. - """ - - NOT_STARTED = 0 - "Indicates that the attack has not started yet." - SUCCEEDED = 1 - "Indicates the attack has been successfully completed." - FAILED = 2 - "Signifies that the attack has failed." - - class RansomwareScript(Application): """Ransomware Kill Chain - Designed to be used by the TAP001 Agent on the example layout Network. - :ivar payload: The attack stage query payload. (Default Corrupt) - :ivar target_scan_p_of_success: The probability of success for the target scan stage. - :ivar c2_beacon_p_of_success: The probability of success for the c2_beacon stage - :ivar ransomware_encrypt_p_of_success: The probability of success for the ransomware 'attack' (encrypt) stage. - :ivar repeat: Whether to repeat attacking once finished. + :ivar payload: The attack stage query payload. (Default ENCRYPT) """ server_ip_address: Optional[IPv4Address] = None @@ -44,12 +21,6 @@ class RansomwareScript(Application): """Password required to access the database.""" payload: Optional[str] = "ENCRYPT" "Payload String for the payload stage" - ransomware_encrypt_p_of_success: float = 0.9 - "Probability of the ransomware attack succeeding: Default 0.9" - repeat: bool = False - "If true, the Denial of Service bot will keep performing the attack." - attack_stage: RansomwareAttackStage = RansomwareAttackStage.NOT_STARTED - "The ransomware attack stage. See RansomwareAttackStage Class" def __init__(self, **kwargs): kwargs["name"] = "RansomwareScript" @@ -107,14 +78,9 @@ class RansomwareScript(Application): return False if self.server_ip_address and self.payload: self.sys_log.info(f"{self.name}: Running") - self._perform_ransomware_encrypt() - - if self.repeat and self.attack_stage in ( - RansomwareAttackStage.SUCCEEDED, - RansomwareAttackStage.FAILED, - ): - self.attack_stage = RansomwareAttackStage.NOT_STARTED - return True + if self._perform_ransomware_encrypt(): + return True + return False else: self.sys_log.warning(f"{self.name}: Failed to start as it requires both a target_ip_address and payload.") return False @@ -124,8 +90,6 @@ class RansomwareScript(Application): server_ip_address: IPv4Address, server_password: Optional[str] = None, payload: Optional[str] = None, - ransomware_encrypt_p_of_success: Optional[float] = None, - repeat: bool = True, ): """ Configure the Ransomware Script to communicate with a DatabaseService. @@ -142,20 +106,15 @@ class RansomwareScript(Application): self.server_password = server_password if payload: self.payload = payload - if ransomware_encrypt_p_of_success: - self.ransomware_encrypt_p_of_success = ransomware_encrypt_p_of_success - if repeat: - self.repeat = repeat self.sys_log.info( - f"{self.name}: Configured the {self.name} with {server_ip_address=}, {payload=}, {server_password=}, " - f"{repeat=}." + f"{self.name}: Configured the {self.name} with {server_ip_address=}, {payload=}, {server_password=}." ) def attack(self) -> bool: """Perform the attack steps after opening the application.""" + self.run() if not self._can_perform_action(): self.sys_log.warning("Ransomware application is unable to perform it's actions.") - self.run() self.num_executions += 1 return self._application_loop() @@ -164,7 +123,7 @@ class RansomwareScript(Application): self._db_connection = self._host_db_client.get_new_connection() return True if self._db_connection else False - def _perform_ransomware_encrypt(self): + def _perform_ransomware_encrypt(self) -> bool: """ Execute the Ransomware Encrypt payload on the target. @@ -172,25 +131,22 @@ class RansomwareScript(Application): """ if self._host_db_client is None: self.sys_log.info(f"{self.name}: Failed to connect to db_client - Ransomware Script") - self.attack_stage = RansomwareAttackStage.FAILED - return + return False self._host_db_client.server_ip_address = self.server_ip_address self._host_db_client.server_password = self.server_password - if self.attack_stage == RansomwareAttackStage.NOT_STARTED: - if simulate_trial(self.ransomware_encrypt_p_of_success): - self.sys_log.info(f"{self.name}: Attempting to launch payload") - if not self._db_connection: - self._establish_db_connection() - if self._db_connection: - attack_successful = self._db_connection.query(self.payload) - self.sys_log.info(f"{self.name} Payload delivered: {self.payload}") - if attack_successful: - self.sys_log.info(f"{self.name}: Payload Successful") - self.attack_stage = RansomwareAttackStage.SUCCEEDED - else: - self.sys_log.info(f"{self.name}: Payload failed") - self.attack_stage = RansomwareAttackStage.FAILED + self.sys_log.info(f"{self.name}: Attempting to launch payload") + if not self._db_connection: + self._establish_db_connection() + if self._db_connection: + attack_successful = self._db_connection.query(self.payload) + self.sys_log.info(f"{self.name} Payload delivered: {self.payload}") + if attack_successful: + self.sys_log.info(f"{self.name}: Payload Successful") + return True + else: + self.sys_log.info(f"{self.name}: Payload failed") + return False else: self.sys_log.warning("Attack Attempted to launch too quickly") - self.attack_stage = RansomwareAttackStage.FAILED + return False diff --git a/tests/integration_tests/system/red_applications/test_ransomware_script.py b/tests/integration_tests/system/red_applications/test_ransomware_script.py index 4ae7f92f..427f67ff 100644 --- a/tests/integration_tests/system/red_applications/test_ransomware_script.py +++ b/tests/integration_tests/system/red_applications/test_ransomware_script.py @@ -9,12 +9,8 @@ from primaite.simulator.network.hardware.nodes.host.computer import Computer from primaite.simulator.network.hardware.nodes.host.server import Server from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router from primaite.simulator.network.transmission.transport_layer import Port -from primaite.simulator.system.applications.application import ApplicationOperatingState from primaite.simulator.system.applications.database_client import DatabaseClient, DatabaseClientConnection -from primaite.simulator.system.applications.red_applications.ransomware_script import ( - RansomwareAttackStage, - RansomwareScript, -) +from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript from primaite.simulator.system.services.database.database_service import DatabaseService from primaite.simulator.system.software import SoftwareHealthState @@ -85,54 +81,25 @@ def ransomware_script_db_server_green_client(example_network) -> Network: return network -def test_repeating_ransomware_script_attack(ransomware_script_and_db_server): +def test_ransomware_script_attack(ransomware_script_and_db_server): """Test a repeating data manipulation attack.""" RansomwareScript, computer, db_server_service, server = ransomware_script_and_db_server + computer.apply_timestep(timestep=0) + server.apply_timestep(timestep=0) assert db_server_service.health_state_actual is SoftwareHealthState.GOOD - assert computer.file_system.num_file_creations == 0 + assert server.file_system.num_file_creations == 1 - RansomwareScript.target_scan_p_of_success = 1 - RansomwareScript.c2_beacon_p_of_success = 1 - RansomwareScript.ransomware_encrypt_p_of_success = 1 - RansomwareScript.repeat = True RansomwareScript.attack() - assert RansomwareScript.attack_stage == RansomwareAttackStage.NOT_STARTED - assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.COMPROMISED - assert computer.file_system.num_file_creations == 1 + assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.CORRUPT + assert server.file_system.num_file_creations == 2 computer.apply_timestep(timestep=1) server.apply_timestep(timestep=1) - assert RansomwareScript.attack_stage == RansomwareAttackStage.NOT_STARTED - assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.COMPROMISED - - -def test_repeating_ransomware_script_attack(ransomware_script_and_db_server): - """Test a repeating ransowmare script attack.""" - RansomwareScript, computer, db_server_service, server = ransomware_script_and_db_server - - assert db_server_service.health_state_actual is SoftwareHealthState.GOOD - - RansomwareScript.target_scan_p_of_success = 1 - RansomwareScript.c2_beacon_p_of_success = 1 - RansomwareScript.ransomware_encrypt_p_of_success = 1 - RansomwareScript.repeat = False - RansomwareScript.attack() - - assert RansomwareScript.attack_stage == RansomwareAttackStage.SUCCEEDED assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.CORRUPT - computer.apply_timestep(timestep=1) - computer.pre_timestep(timestep=1) - server.apply_timestep(timestep=1) - server.pre_timestep(timestep=1) - - assert RansomwareScript.attack_stage == RansomwareAttackStage.SUCCEEDED - assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.CORRUPT - assert computer.file_system.num_file_creations == 0 - def test_ransomware_disrupts_green_agent_connection(ransomware_script_db_server_green_client): """Test to see show that the database service still operate""" @@ -152,10 +119,6 @@ def test_ransomware_disrupts_green_agent_connection(ransomware_script_db_server_ assert green_db_client_connection.query("SELECT") assert green_db_client.last_query_response.get("status_code") == 200 - ransomware_script_application.target_scan_p_of_success = 1 - ransomware_script_application.ransomware_encrypt_p_of_success = 1 - ransomware_script_application.c2_beacon_p_of_success = 1 - ransomware_script_application.repeat = False ransomware_script_application.attack() assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.CORRUPT