Merged PR 404: #2610: removed tap logic from ransomware script

## Summary
Removed irrelevant stages from the ransomware script application

## Test process
no big changes, so existing tests were used. only a minor change in test, which no longer applies

## Checklist
- [X] PR is linked to a **work item**
- [X] **acceptance criteria** of linked ticket are met
- [X] performed **self-review** of the code
- [ ] written **tests** for any new functionality added with this PR
- [ ] updated the **documentation** if this PR changes or adds functionality
- [ ] written/updated **design docs** if this PR implements new functionality
- [ ] updated the **change log**
- [X] ran **pre-commit** checks for code style
- [ ] attended to any **TO-DOs** left in the code

#2610: removed tap logic from ransomware script

Related work items: #2610
This commit is contained in:
Czar Echavez
2024-06-06 11:32:18 +00:00
3 changed files with 29 additions and 232 deletions

View File

@@ -360,11 +360,6 @@ class PrimaiteGame:
server_ip_address=IPv4Address(opt.get("server_ip")),
server_password=opt.get("server_password"),
payload=opt.get("payload", "ENCRYPT"),
c2_beacon_p_of_success=float(opt.get("c2_beacon_p_of_success", "0.5")),
target_scan_p_of_success=float(opt.get("target_scan_p_of_success", "0.1")),
ransomware_encrypt_p_of_success=float(
opt.get("ransomware_encrypt_p_of_success", "0.1")
),
)
elif application_type == "DatabaseClient":
if "options" in application_cfg:

View File

@@ -1,8 +1,6 @@
from enum import IntEnum
from ipaddress import IPv4Address
from typing import Dict, Optional
from primaite.game.science import simulate_trial
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.network.transmission.network_layer import IPProtocol
@@ -11,43 +9,10 @@ from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.applications.database_client import DatabaseClient, DatabaseClientConnection
class RansomwareAttackStage(IntEnum):
"""
Enumeration representing different attack stages of the ransomware script.
This enumeration defines the various stages a data manipulation attack can be in during its lifecycle
in the simulation.
Each stage represents a specific phase in the attack process.
"""
NOT_STARTED = 0
"Indicates that the attack has not started yet."
DOWNLOAD = 1
"Installing the Encryption Script - Testing"
INSTALL = 2
"The stage where logon procedures are simulated."
ACTIVATE = 3
"Operating Status Changes"
PROPAGATE = 4
"Represents the stage of performing a horizontal port scan on the target."
COMMAND_AND_CONTROL = 5
"Represents the stage of setting up a rely C2 Beacon (Not Implemented)"
PAYLOAD = 6
"Stage of actively attacking the target."
SUCCEEDED = 7
"Indicates the attack has been successfully completed."
FAILED = 8
"Signifies that the attack has failed."
class RansomwareScript(Application):
"""Ransomware Kill Chain - Designed to be used by the TAP001 Agent on the example layout Network.
:ivar payload: The attack stage query payload. (Default Corrupt)
:ivar target_scan_p_of_success: The probability of success for the target scan stage.
:ivar c2_beacon_p_of_success: The probability of success for the c2_beacon stage
:ivar ransomware_encrypt_p_of_success: The probability of success for the ransomware 'attack' (encrypt) stage.
:ivar repeat: Whether to repeat attacking once finished.
:ivar payload: The attack stage query payload. (Default ENCRYPT)
"""
server_ip_address: Optional[IPv4Address] = None
@@ -56,16 +21,6 @@ class RansomwareScript(Application):
"""Password required to access the database."""
payload: Optional[str] = "ENCRYPT"
"Payload String for the payload stage"
target_scan_p_of_success: float = 0.9
"Probability of the target scan succeeding: Default 0.9"
c2_beacon_p_of_success: float = 0.9
"Probability of the c2 beacon setup stage succeeding: Default 0.9"
ransomware_encrypt_p_of_success: float = 0.9
"Probability of the ransomware attack succeeding: Default 0.9"
repeat: bool = False
"If true, the Denial of Service bot will keep performing the attack."
attack_stage: RansomwareAttackStage = RansomwareAttackStage.NOT_STARTED
"The ransomware attack stage. See RansomwareAttackStage Class"
def __init__(self, **kwargs):
kwargs["name"] = "RansomwareScript"
@@ -90,7 +45,7 @@ class RansomwareScript(Application):
@property
def _host_db_client(self) -> DatabaseClient:
"""Return the database client that is installed on the same machine as the Ransomware Script."""
db_client = self.software_manager.software.get("DatabaseClient")
db_client: DatabaseClient = self.software_manager.software.get("DatabaseClient")
if db_client is None:
self.sys_log.warning(f"{self.__class__.__name__} cannot find a database client on its host.")
return db_client
@@ -108,16 +63,6 @@ class RansomwareScript(Application):
)
return rm
def _activate(self):
"""
Simulate the install process as the initial stage of the attack.
Advances the attack stage to 'ACTIVATE' attack state.
"""
if self.attack_stage == RansomwareAttackStage.INSTALL:
self.sys_log.info(f"{self.name}: Activated!")
self.attack_stage = RansomwareAttackStage.ACTIVATE
def run(self) -> bool:
"""Calls the parent classes execute method before starting the application loop."""
super().run()
@@ -133,20 +78,9 @@ class RansomwareScript(Application):
return False
if self.server_ip_address and self.payload:
self.sys_log.info(f"{self.name}: Running")
self.attack_stage = RansomwareAttackStage.NOT_STARTED
self._local_download()
self._install()
self._activate()
self._perform_target_scan()
self._setup_beacon()
self._perform_ransomware_encrypt()
if self.repeat and self.attack_stage in (
RansomwareAttackStage.SUCCEEDED,
RansomwareAttackStage.FAILED,
):
self.attack_stage = RansomwareAttackStage.NOT_STARTED
if self._perform_ransomware_encrypt():
return True
return False
else:
self.sys_log.warning(f"{self.name}: Failed to start as it requires both a target_ip_address and payload.")
return False
@@ -156,10 +90,6 @@ class RansomwareScript(Application):
server_ip_address: IPv4Address,
server_password: Optional[str] = None,
payload: Optional[str] = None,
target_scan_p_of_success: Optional[float] = None,
c2_beacon_p_of_success: Optional[float] = None,
ransomware_encrypt_p_of_success: Optional[float] = None,
repeat: bool = True,
):
"""
Configure the Ransomware Script to communicate with a DatabaseService.
@@ -167,10 +97,6 @@ class RansomwareScript(Application):
:param server_ip_address: The IP address of the Node the DatabaseService is on.
:param server_password: The password on the DatabaseService.
:param payload: The attack stage query (Encrypt / Delete)
:param target_scan_p_of_success: The probability of success for the target scan stage.
:param c2_beacon_p_of_success: The probability of success for the c2_beacon stage
:param ransomware_encrypt_p_of_success: The probability of success for the ransomware 'attack' (encrypt) stage.
:param repeat: Whether to repeat attacking once finished.
"""
if server_ip_address:
self.server_ip_address = server_ip_address
@@ -178,74 +104,15 @@ class RansomwareScript(Application):
self.server_password = server_password
if payload:
self.payload = payload
if target_scan_p_of_success:
self.target_scan_p_of_success = target_scan_p_of_success
if c2_beacon_p_of_success:
self.c2_beacon_p_of_success = c2_beacon_p_of_success
if ransomware_encrypt_p_of_success:
self.ransomware_encrypt_p_of_success = ransomware_encrypt_p_of_success
if repeat:
self.repeat = repeat
self.sys_log.info(
f"{self.name}: Configured the {self.name} with {server_ip_address=}, {payload=}, {server_password=}, "
f"{repeat=}."
f"{self.name}: Configured the {self.name} with {server_ip_address=}, {payload=}, {server_password=}."
)
def _install(self):
"""
Simulate the install stage in the kill-chain.
Advances the attack stage to 'ACTIVATE' if successful.
From this attack stage onwards.
the ransomware application is now visible from this point onwardin the observation space.
"""
if self.attack_stage == RansomwareAttackStage.DOWNLOAD:
self.sys_log.info(f"{self.name}: Malware installed on the local file system")
downloads_folder = self.file_system.get_folder(folder_name="downloads")
ransomware_file = downloads_folder.get_file(file_name="ransom_script.pdf")
ransomware_file.num_access += 1
self.attack_stage = RansomwareAttackStage.INSTALL
def _setup_beacon(self):
"""
Simulates setting up a c2 beacon; currently a pseudo step for increasing red variance.
Advances the attack stage to 'COMMAND AND CONTROL` if successful.
:param p_of_sucess: Probability of a successful c2 setup (Advancing this step),
by default the success rate is 0.5
"""
if self.attack_stage == RansomwareAttackStage.PROPAGATE:
self.sys_log.info(f"{self.name} Attempting to set up C&C Beacon - Scan 1/2")
if simulate_trial(self.c2_beacon_p_of_success):
self.sys_log.info(f"{self.name} C&C Successful setup - Scan 2/2")
c2c_setup = True # TODO Implement the c2c step via an FTP Application/Service
if c2c_setup:
self.attack_stage = RansomwareAttackStage.COMMAND_AND_CONTROL
def _perform_target_scan(self):
"""
Perform a simulated port scan to check for open SQL ports.
Advances the attack stage to `PROPAGATE` if successful.
:param p_of_success: Probability of successful port scan, by default 0.1.
"""
if self.attack_stage == RansomwareAttackStage.ACTIVATE:
# perform a port scan to identify that the SQL port is open on the server
self.sys_log.info(f"{self.name}: Scanning for vulnerable databases - Scan 0/2")
if simulate_trial(self.target_scan_p_of_success):
self.sys_log.info(f"{self.name}: Found a target database! Scan 1/2")
port_is_open = True # TODO Implement a NNME Triggering scan as a seperate Red Application
if port_is_open:
self.attack_stage = RansomwareAttackStage.PROPAGATE
def attack(self) -> bool:
"""Perform the attack steps after opening the application."""
self.run()
if not self._can_perform_action():
self.sys_log.warning("Ransomware application is unable to perform it's actions.")
self.run()
self.num_executions += 1
return self._application_loop()
@@ -254,22 +121,18 @@ class RansomwareScript(Application):
self._db_connection = self._host_db_client.get_new_connection()
return True if self._db_connection else False
def _perform_ransomware_encrypt(self):
def _perform_ransomware_encrypt(self) -> bool:
"""
Execute the Ransomware Encrypt payload on the target.
Advances the attack stage to `COMPLETE` if successful, or 'FAILED' if unsuccessful.
:param p_of_success: Probability of successfully performing ransomware encryption, by default 0.1.
"""
if self._host_db_client is None:
self.sys_log.info(f"{self.name}: Failed to connect to db_client - Ransomware Script")
self.attack_stage = RansomwareAttackStage.FAILED
return
return False
self._host_db_client.server_ip_address = self.server_ip_address
self._host_db_client.server_password = self.server_password
if self.attack_stage == RansomwareAttackStage.COMMAND_AND_CONTROL:
if simulate_trial(self.ransomware_encrypt_p_of_success):
self.sys_log.info(f"{self.name}: Attempting to launch payload")
if not self._db_connection:
self._establish_db_connection()
@@ -278,33 +141,10 @@ class RansomwareScript(Application):
self.sys_log.info(f"{self.name} Payload delivered: {self.payload}")
if attack_successful:
self.sys_log.info(f"{self.name}: Payload Successful")
self.attack_stage = RansomwareAttackStage.SUCCEEDED
return True
else:
self.sys_log.info(f"{self.name}: Payload failed")
self.attack_stage = RansomwareAttackStage.FAILED
return False
else:
self.sys_log.warning("Attack Attempted to launch too quickly")
self.attack_stage = RansomwareAttackStage.FAILED
def _local_download(self):
"""Downloads itself via the onto the local file_system."""
if self.attack_stage == RansomwareAttackStage.NOT_STARTED:
if self._local_download_verify():
self.attack_stage = RansomwareAttackStage.DOWNLOAD
else:
self.sys_log.info("Malware failed to create a installation location")
self.attack_stage = RansomwareAttackStage.FAILED
else:
self.sys_log.info("Malware failed to download")
self.attack_stage = RansomwareAttackStage.FAILED
def _local_download_verify(self) -> bool:
"""Verifies a download location - Creates one if needed."""
for folder in self.file_system.folders:
if self.file_system.folders[folder].name == "downloads":
self.file_system.num_file_creations += 1
return True
self.file_system.create_folder("downloads")
self.file_system.create_file(folder_name="downloads", file_name="ransom_script.pdf")
return True
return False

View File

@@ -9,12 +9,8 @@ from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.database_client import DatabaseClient, DatabaseClientConnection
from primaite.simulator.system.applications.red_applications.ransomware_script import (
RansomwareAttackStage,
RansomwareScript,
)
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript
from primaite.simulator.system.services.database.database_service import DatabaseService
from primaite.simulator.system.software import SoftwareHealthState
@@ -85,54 +81,24 @@ def ransomware_script_db_server_green_client(example_network) -> Network:
return network
def test_repeating_ransomware_script_attack(ransomware_script_and_db_server):
def test_ransomware_script_attack(ransomware_script_and_db_server):
"""Test a repeating data manipulation attack."""
RansomwareScript, computer, db_server_service, server = ransomware_script_and_db_server
computer.apply_timestep(timestep=0)
server.apply_timestep(timestep=0)
assert db_server_service.health_state_actual is SoftwareHealthState.GOOD
assert computer.file_system.num_file_creations == 0
assert server.file_system.num_file_creations == 1
RansomwareScript.target_scan_p_of_success = 1
RansomwareScript.c2_beacon_p_of_success = 1
RansomwareScript.ransomware_encrypt_p_of_success = 1
RansomwareScript.repeat = True
RansomwareScript.attack()
assert RansomwareScript.attack_stage == RansomwareAttackStage.NOT_STARTED
assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.COMPROMISED
assert computer.file_system.num_file_creations == 1
assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.CORRUPT
assert server.file_system.num_file_creations == 2
computer.apply_timestep(timestep=1)
server.apply_timestep(timestep=1)
assert RansomwareScript.attack_stage == RansomwareAttackStage.NOT_STARTED
assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.COMPROMISED
def test_repeating_ransomware_script_attack(ransomware_script_and_db_server):
"""Test a repeating ransowmare script attack."""
RansomwareScript, computer, db_server_service, server = ransomware_script_and_db_server
assert db_server_service.health_state_actual is SoftwareHealthState.GOOD
RansomwareScript.target_scan_p_of_success = 1
RansomwareScript.c2_beacon_p_of_success = 1
RansomwareScript.ransomware_encrypt_p_of_success = 1
RansomwareScript.repeat = False
RansomwareScript.attack()
assert RansomwareScript.attack_stage == RansomwareAttackStage.SUCCEEDED
assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.CORRUPT
assert computer.file_system.num_file_creations == 1
computer.apply_timestep(timestep=1)
computer.pre_timestep(timestep=1)
server.apply_timestep(timestep=1)
server.pre_timestep(timestep=1)
assert RansomwareScript.attack_stage == RansomwareAttackStage.SUCCEEDED
assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.CORRUPT
assert computer.file_system.num_file_creations == 0
def test_ransomware_disrupts_green_agent_connection(ransomware_script_db_server_green_client):
@@ -153,10 +119,6 @@ def test_ransomware_disrupts_green_agent_connection(ransomware_script_db_server_
assert green_db_client_connection.query("SELECT")
assert green_db_client.last_query_response.get("status_code") == 200
ransomware_script_application.target_scan_p_of_success = 1
ransomware_script_application.ransomware_encrypt_p_of_success = 1
ransomware_script_application.c2_beacon_p_of_success = 1
ransomware_script_application.repeat = False
ransomware_script_application.attack()
assert db_server_service.db_file.health_status is FileSystemItemHealthStatus.CORRUPT