#2888: Update with ConfigSchema

This commit is contained in:
Nick Todd
2024-12-10 12:27:50 +00:00
parent ed128fc535
commit 7dd25f18f6
9 changed files with 64 additions and 4 deletions

View File

@@ -29,6 +29,7 @@ class Application(IOSoftware):
Applications are user-facing programs that may perform input/output operations.
"""
config: "Application.ConfigSchema"
operating_state: ApplicationOperatingState = ApplicationOperatingState.CLOSED

View File

@@ -68,10 +68,12 @@ class DatabaseClient(Application, identifier="DatabaseClient"):
Extends the Application class to provide functionality for connecting, querying, and disconnecting from a
Database Service. It mainly operates over TCP protocol.
:ivar server_ip_address: The IPv4 address of the Database Service server, defaults to None.
"""
config: "DatabaseClient.ConfigSchema"
server_ip_address: Optional[IPv4Address] = None
"""The IPv4 address of the Database Service server, defaults to None."""
server_password: Optional[str] = None
_query_success_tracker: Dict[str, bool] = {}
"""Keep track of connections that were established or verified during this step. Used for rewards."""
@@ -88,6 +90,11 @@ class DatabaseClient(Application, identifier="DatabaseClient"):
native_connection: Optional[DatabaseClientConnection] = None
"""Native Client Connection for using the client directly (similar to psql in a terminal)."""
class ConfigSchema(Application.ConfigSchema):
"""ConfigSchema for DatabaseClient."""
type: str = "DATABASE_CLIENT"
def __init__(self, **kwargs):
kwargs["name"] = "DatabaseClient"
kwargs["port"] = PORT_LOOKUP["POSTGRES_SERVER"]

View File

@@ -52,6 +52,8 @@ class NMAP(Application, identifier="NMAP"):
as ping scans to discover active hosts and port scans to detect open ports on those hosts.
"""
config: "NMAP.ConfigSchema"
_active_port_scans: Dict[str, PortScanPayload] = {}
_port_scan_responses: Dict[str, PortScanPayload] = {}
@@ -62,6 +64,11 @@ class NMAP(Application, identifier="NMAP"):
(False, False): "Port",
}
class ConfigSchema(Application.ConfigSchema):
"""ConfigSchema for NMAP."""
type: str = "NMAP"
def __init__(self, **kwargs):
kwargs["name"] = "NMAP"
kwargs["port"] = PORT_LOOKUP["NONE"]

View File

@@ -45,7 +45,7 @@ class C2Payload(Enum):
"""C2 Input Command payload. Used by the C2 Server to send a command to the c2 beacon."""
OUTPUT = "output_command"
"""C2 Output Command. Used by the C2 Beacon to send the results of a Input command to the c2 server."""
"""C2 Output Command. Used by the C2 Beacon to send the results of an Input command to the c2 server."""
class AbstractC2(Application, identifier="AbstractC2"):
@@ -63,6 +63,8 @@ class AbstractC2(Application, identifier="AbstractC2"):
Please refer to the Command-&-Control notebook for an in-depth example of the C2 Suite.
"""
config: "AbstractC2"
c2_connection_active: bool = False
"""Indicates if the c2 server and c2 beacon are currently connected."""
@@ -75,6 +77,11 @@ class AbstractC2(Application, identifier="AbstractC2"):
keep_alive_inactivity: int = 0
"""Indicates how many timesteps since the last time the c2 application received a keep alive."""
class ConfigSchema(Application.ConfigSchema):
"""ConfigSchema for AbstractC2."""
type: str = "ABSTRACTC2"
class _C2Opts(BaseModel):
"""A Pydantic Schema for the different C2 configuration options."""
@@ -118,7 +125,7 @@ class AbstractC2(Application, identifier="AbstractC2"):
:type c2_command: C2Command.
:param command_options: The relevant C2 Beacon parameters.F
:type command_options: Dict
:return: Returns the construct C2Packet
:return: Returns the constructed C2Packet
:rtype: C2Packet
"""
constructed_packet = C2Packet(

View File

@@ -8,6 +8,7 @@ from pydantic import validate_call
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.network.protocols.masquerade import C2Packet
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.applications.red_applications.c2 import ExfilOpts, RansomwareOpts, TerminalOpts
from primaite.simulator.system.applications.red_applications.c2.abstract_c2 import AbstractC2, C2Command, C2Payload
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript
@@ -32,15 +33,22 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
2. Leveraging the terminal application to execute requests (dependent on the command given)
3. Sending the RequestResponse back to the C2 Server (Command output)
Please refer to the Command-&-Control notebook for an in-depth example of the C2 Suite.
Please refer to the Command-and-Control notebook for an in-depth example of the C2 Suite.
"""
config: "C2Beacon.ConfigSchema"
keep_alive_attempted: bool = False
"""Indicates if a keep alive has been attempted to be sent this timestep. Used to prevent packet storms."""
terminal_session: TerminalClientConnection = None
"The currently in use terminal session."
class ConfigSchema(Application.ConfigSchema):
"""ConfigSchema for C2Beacon."""
type: str = "C2BEACON"
@property
def _host_terminal(self) -> Optional[Terminal]:
"""Return the Terminal that is installed on the same machine as the C2 Beacon."""

View File

@@ -7,6 +7,7 @@ from pydantic import validate_call
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.network.protocols.masquerade import C2Packet
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.applications.red_applications.c2 import (
CommandOpts,
ExfilOpts,
@@ -34,9 +35,16 @@ class C2Server(AbstractC2, identifier="C2Server"):
Please refer to the Command-&-Control notebook for an in-depth example of the C2 Suite.
"""
config: "C2Server.ConfigSchema"
current_command_output: RequestResponse = None
"""The Request Response by the last command send. This attribute is updated by the method _handle_command_output."""
class ConfigSchema(Application.ConfigSchema):
"""ConfigSchema for C2Server."""
type: str = "C2SERVER"
def _init_request_manager(self) -> RequestManager:
"""
Initialise the request manager.

View File

@@ -7,6 +7,7 @@ from primaite import getLogger
from primaite.game.science import simulate_trial
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.utils.validation.port import Port, PORT_LOOKUP
@@ -32,6 +33,8 @@ class DoSAttackStage(IntEnum):
class DoSBot(DatabaseClient, identifier="DoSBot"):
"""A bot that simulates a Denial of Service attack."""
config: "DoSBot.ConfigSchema"
target_ip_address: Optional[IPv4Address] = None
"""IP address of the target service."""
@@ -53,6 +56,11 @@ class DoSBot(DatabaseClient, identifier="DoSBot"):
dos_intensity: float = 1.0
"""How much of the max sessions will be used by the DoS when attacking."""
class ConfigSchema(Application.ConfigSchema):
"""ConfigSchema for DoSBot."""
type: str = "DOSBOT"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.name = "DoSBot"

View File

@@ -18,6 +18,8 @@ class RansomwareScript(Application, identifier="RansomwareScript"):
:ivar payload: The attack stage query payload. (Default ENCRYPT)
"""
config: "RansomwareScript.ConfigSchema"
server_ip_address: Optional[IPv4Address] = None
"""IP address of node which hosts the database."""
server_password: Optional[str] = None
@@ -25,6 +27,11 @@ class RansomwareScript(Application, identifier="RansomwareScript"):
payload: Optional[str] = "ENCRYPT"
"Payload String for the payload stage"
class ConfigSchema(Application.ConfigSchema):
"""ConfigSchema for RansomwareScript."""
type: str = "RANSOMWARE_SCRIPT"
def __init__(self, **kwargs):
kwargs["name"] = "RansomwareScript"
kwargs["port"] = PORT_LOOKUP["NONE"]

View File

@@ -30,6 +30,8 @@ class WebBrowser(Application, identifier="WebBrowser"):
The application requests and loads web pages using its domain name and requesting IP addresses using DNS.
"""
config: "WebBrowser.ConfigSchema"
target_url: Optional[str] = None
domain_name_ip_address: Optional[IPv4Address] = None
@@ -41,6 +43,11 @@ class WebBrowser(Application, identifier="WebBrowser"):
history: List["BrowserHistoryItem"] = []
"""Keep a log of visited websites and information about the visit, such as response code."""
class ConfigSchema(Application.ConfigSchema):
"""ConfigSchema for WebBrowser."""
type: str = "WEB_BROWSER"
def __init__(self, **kwargs):
kwargs["name"] = "WebBrowser"
kwargs["protocol"] = PROTOCOL_LOOKUP["TCP"]