#2610: apply suggestions from PR
This commit is contained in:
@@ -360,11 +360,6 @@ class PrimaiteGame:
|
||||
server_ip_address=IPv4Address(opt.get("server_ip")),
|
||||
server_password=opt.get("server_password"),
|
||||
payload=opt.get("payload", "ENCRYPT"),
|
||||
c2_beacon_p_of_success=float(opt.get("c2_beacon_p_of_success", "0.5")),
|
||||
target_scan_p_of_success=float(opt.get("target_scan_p_of_success", "0.1")),
|
||||
ransomware_encrypt_p_of_success=float(
|
||||
opt.get("ransomware_encrypt_p_of_success", "0.1")
|
||||
),
|
||||
)
|
||||
elif application_type == "DatabaseClient":
|
||||
if "options" in application_cfg:
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
from enum import IntEnum
|
||||
from ipaddress import IPv4Address
|
||||
from typing import Dict, Optional
|
||||
|
||||
from primaite.game.science import simulate_trial
|
||||
from primaite.interface.request import RequestResponse
|
||||
from primaite.simulator.core import RequestManager, RequestType
|
||||
from primaite.simulator.network.transmission.network_layer import IPProtocol
|
||||
@@ -11,31 +9,10 @@ from primaite.simulator.system.applications.application import Application
|
||||
from primaite.simulator.system.applications.database_client import DatabaseClient, DatabaseClientConnection
|
||||
|
||||
|
||||
class RansomwareAttackStage(IntEnum):
|
||||
"""
|
||||
Enumeration representing different attack stages of the ransomware script.
|
||||
|
||||
This enumeration defines the various stages a data manipulation attack can be in during its lifecycle
|
||||
in the simulation.
|
||||
Each stage represents a specific phase in the attack process.
|
||||
"""
|
||||
|
||||
NOT_STARTED = 0
|
||||
"Indicates that the attack has not started yet."
|
||||
SUCCEEDED = 1
|
||||
"Indicates the attack has been successfully completed."
|
||||
FAILED = 2
|
||||
"Signifies that the attack has failed."
|
||||
|
||||
|
||||
class RansomwareScript(Application):
|
||||
"""Ransomware Kill Chain - Designed to be used by the TAP001 Agent on the example layout Network.
|
||||
|
||||
:ivar payload: The attack stage query payload. (Default Corrupt)
|
||||
:ivar target_scan_p_of_success: The probability of success for the target scan stage.
|
||||
:ivar c2_beacon_p_of_success: The probability of success for the c2_beacon stage
|
||||
:ivar ransomware_encrypt_p_of_success: The probability of success for the ransomware 'attack' (encrypt) stage.
|
||||
:ivar repeat: Whether to repeat attacking once finished.
|
||||
:ivar payload: The attack stage query payload. (Default ENCRYPT)
|
||||
"""
|
||||
|
||||
server_ip_address: Optional[IPv4Address] = None
|
||||
@@ -44,12 +21,6 @@ class RansomwareScript(Application):
|
||||
"""Password required to access the database."""
|
||||
payload: Optional[str] = "ENCRYPT"
|
||||
"Payload String for the payload stage"
|
||||
ransomware_encrypt_p_of_success: float = 0.9
|
||||
"Probability of the ransomware attack succeeding: Default 0.9"
|
||||
repeat: bool = False
|
||||
"If true, the Denial of Service bot will keep performing the attack."
|
||||
attack_stage: RansomwareAttackStage = RansomwareAttackStage.NOT_STARTED
|
||||
"The ransomware attack stage. See RansomwareAttackStage Class"
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
kwargs["name"] = "RansomwareScript"
|
||||
@@ -107,14 +78,9 @@ class RansomwareScript(Application):
|
||||
return False
|
||||
if self.server_ip_address and self.payload:
|
||||
self.sys_log.info(f"{self.name}: Running")
|
||||
self._perform_ransomware_encrypt()
|
||||
|
||||
if self.repeat and self.attack_stage in (
|
||||
RansomwareAttackStage.SUCCEEDED,
|
||||
RansomwareAttackStage.FAILED,
|
||||
):
|
||||
self.attack_stage = RansomwareAttackStage.NOT_STARTED
|
||||
return True
|
||||
if self._perform_ransomware_encrypt():
|
||||
return True
|
||||
return False
|
||||
else:
|
||||
self.sys_log.warning(f"{self.name}: Failed to start as it requires both a target_ip_address and payload.")
|
||||
return False
|
||||
@@ -124,8 +90,6 @@ class RansomwareScript(Application):
|
||||
server_ip_address: IPv4Address,
|
||||
server_password: Optional[str] = None,
|
||||
payload: Optional[str] = None,
|
||||
ransomware_encrypt_p_of_success: Optional[float] = None,
|
||||
repeat: bool = True,
|
||||
):
|
||||
"""
|
||||
Configure the Ransomware Script to communicate with a DatabaseService.
|
||||
@@ -142,20 +106,15 @@ class RansomwareScript(Application):
|
||||
self.server_password = server_password
|
||||
if payload:
|
||||
self.payload = payload
|
||||
if ransomware_encrypt_p_of_success:
|
||||
self.ransomware_encrypt_p_of_success = ransomware_encrypt_p_of_success
|
||||
if repeat:
|
||||
self.repeat = repeat
|
||||
self.sys_log.info(
|
||||
f"{self.name}: Configured the {self.name} with {server_ip_address=}, {payload=}, {server_password=}, "
|
||||
f"{repeat=}."
|
||||
f"{self.name}: Configured the {self.name} with {server_ip_address=}, {payload=}, {server_password=}."
|
||||
)
|
||||
|
||||
def attack(self) -> bool:
|
||||
"""Perform the attack steps after opening the application."""
|
||||
self.run()
|
||||
if not self._can_perform_action():
|
||||
self.sys_log.warning("Ransomware application is unable to perform it's actions.")
|
||||
self.run()
|
||||
self.num_executions += 1
|
||||
return self._application_loop()
|
||||
|
||||
@@ -164,7 +123,7 @@ class RansomwareScript(Application):
|
||||
self._db_connection = self._host_db_client.get_new_connection()
|
||||
return True if self._db_connection else False
|
||||
|
||||
def _perform_ransomware_encrypt(self):
|
||||
def _perform_ransomware_encrypt(self) -> bool:
|
||||
"""
|
||||
Execute the Ransomware Encrypt payload on the target.
|
||||
|
||||
@@ -172,25 +131,22 @@ class RansomwareScript(Application):
|
||||
"""
|
||||
if self._host_db_client is None:
|
||||
self.sys_log.info(f"{self.name}: Failed to connect to db_client - Ransomware Script")
|
||||
self.attack_stage = RansomwareAttackStage.FAILED
|
||||
return
|
||||
return False
|
||||
|
||||
self._host_db_client.server_ip_address = self.server_ip_address
|
||||
self._host_db_client.server_password = self.server_password
|
||||
if self.attack_stage == RansomwareAttackStage.NOT_STARTED:
|
||||
if simulate_trial(self.ransomware_encrypt_p_of_success):
|
||||
self.sys_log.info(f"{self.name}: Attempting to launch payload")
|
||||
if not self._db_connection:
|
||||
self._establish_db_connection()
|
||||
if self._db_connection:
|
||||
attack_successful = self._db_connection.query(self.payload)
|
||||
self.sys_log.info(f"{self.name} Payload delivered: {self.payload}")
|
||||
if attack_successful:
|
||||
self.sys_log.info(f"{self.name}: Payload Successful")
|
||||
self.attack_stage = RansomwareAttackStage.SUCCEEDED
|
||||
else:
|
||||
self.sys_log.info(f"{self.name}: Payload failed")
|
||||
self.attack_stage = RansomwareAttackStage.FAILED
|
||||
self.sys_log.info(f"{self.name}: Attempting to launch payload")
|
||||
if not self._db_connection:
|
||||
self._establish_db_connection()
|
||||
if self._db_connection:
|
||||
attack_successful = self._db_connection.query(self.payload)
|
||||
self.sys_log.info(f"{self.name} Payload delivered: {self.payload}")
|
||||
if attack_successful:
|
||||
self.sys_log.info(f"{self.name}: Payload Successful")
|
||||
return True
|
||||
else:
|
||||
self.sys_log.info(f"{self.name}: Payload failed")
|
||||
return False
|
||||
else:
|
||||
self.sys_log.warning("Attack Attempted to launch too quickly")
|
||||
self.attack_stage = RansomwareAttackStage.FAILED
|
||||
return False
|
||||
|
||||
@@ -9,12 +9,8 @@ from primaite.simulator.network.hardware.nodes.host.computer import Computer
|
||||
from primaite.simulator.network.hardware.nodes.host.server import Server
|
||||
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
|
||||
from primaite.simulator.network.transmission.transport_layer import Port
|
||||
from primaite.simulator.system.applications.application import ApplicationOperatingState
|
||||
from primaite.simulator.system.applications.database_client import DatabaseClient, DatabaseClientConnection
|
||||
from primaite.simulator.system.applications.red_applications.ransomware_script import (
|
||||
RansomwareAttackStage,
|
||||
RansomwareScript,
|
||||
)
|
||||
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript
|
||||
from primaite.simulator.system.services.database.database_service import DatabaseService
|
||||
from primaite.simulator.system.software import SoftwareHealthState
|
||||
|
||||
@@ -85,54 +81,25 @@ def ransomware_script_db_server_green_client(example_network) -> Network:
|
||||
return network
|
||||
|
||||
|
||||
def test_repeating_ransomware_script_attack(ransomware_script_and_db_server):
|
||||
def test_ransomware_script_attack(ransomware_script_and_db_server):
|
||||
"""Test a repeating data manipulation attack."""
|
||||
RansomwareScript, computer, db_server_service, server = ransomware_script_and_db_server
|
||||
|
||||
computer.apply_timestep(timestep=0)
|
||||
server.apply_timestep(timestep=0)
|
||||
assert db_server_service.health_state_actual is SoftwareHealthState.GOOD
|
||||
assert computer.file_system.num_file_creations == 0
|
||||
assert server.file_system.num_file_creations == 1
|
||||
|
||||
RansomwareScript.target_scan_p_of_success = 1
|
||||
RansomwareScript.c2_beacon_p_of_success = 1
|
||||
RansomwareScript.ransomware_encrypt_p_of_success = 1
|
||||
RansomwareScript.repeat = True
|
||||
RansomwareScript.attack()
|
||||
|
||||
assert RansomwareScript.attack_stage == RansomwareAttackStage.NOT_STARTED
|
||||
assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.COMPROMISED
|
||||
assert computer.file_system.num_file_creations == 1
|
||||
assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.CORRUPT
|
||||
assert server.file_system.num_file_creations == 2
|
||||
|
||||
computer.apply_timestep(timestep=1)
|
||||
server.apply_timestep(timestep=1)
|
||||
|
||||
assert RansomwareScript.attack_stage == RansomwareAttackStage.NOT_STARTED
|
||||
assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.COMPROMISED
|
||||
|
||||
|
||||
def test_repeating_ransomware_script_attack(ransomware_script_and_db_server):
|
||||
"""Test a repeating ransowmare script attack."""
|
||||
RansomwareScript, computer, db_server_service, server = ransomware_script_and_db_server
|
||||
|
||||
assert db_server_service.health_state_actual is SoftwareHealthState.GOOD
|
||||
|
||||
RansomwareScript.target_scan_p_of_success = 1
|
||||
RansomwareScript.c2_beacon_p_of_success = 1
|
||||
RansomwareScript.ransomware_encrypt_p_of_success = 1
|
||||
RansomwareScript.repeat = False
|
||||
RansomwareScript.attack()
|
||||
|
||||
assert RansomwareScript.attack_stage == RansomwareAttackStage.SUCCEEDED
|
||||
assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
computer.apply_timestep(timestep=1)
|
||||
computer.pre_timestep(timestep=1)
|
||||
server.apply_timestep(timestep=1)
|
||||
server.pre_timestep(timestep=1)
|
||||
|
||||
assert RansomwareScript.attack_stage == RansomwareAttackStage.SUCCEEDED
|
||||
assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.CORRUPT
|
||||
assert computer.file_system.num_file_creations == 0
|
||||
|
||||
|
||||
def test_ransomware_disrupts_green_agent_connection(ransomware_script_db_server_green_client):
|
||||
"""Test to see show that the database service still operate"""
|
||||
@@ -152,10 +119,6 @@ def test_ransomware_disrupts_green_agent_connection(ransomware_script_db_server_
|
||||
assert green_db_client_connection.query("SELECT")
|
||||
assert green_db_client.last_query_response.get("status_code") == 200
|
||||
|
||||
ransomware_script_application.target_scan_p_of_success = 1
|
||||
ransomware_script_application.ransomware_encrypt_p_of_success = 1
|
||||
ransomware_script_application.c2_beacon_p_of_success = 1
|
||||
ransomware_script_application.repeat = False
|
||||
ransomware_script_application.attack()
|
||||
|
||||
assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.CORRUPT
|
||||
|
||||
Reference in New Issue
Block a user