#2888: Initialise ConfigSchema's and fix type names.

This commit is contained in:
Nick Todd
2024-12-12 14:58:48 +00:00
parent 2ecc142c28
commit 4a52054ed6
25 changed files with 59 additions and 47 deletions

View File

@@ -833,18 +833,18 @@ class UserManager(Service, identifier="UserManager"):
:param disabled_admins: A dictionary of currently disabled admin users by their usernames
"""
config: "UserManager.ConfigSchema"
config: "UserManager.ConfigSchema" = None
users: Dict[str, User] = {}
class ConfigSchema(Service.ConfigSchema):
"""ConfigSchema for UserManager."""
type: str = "USERMANAGER"
type: str = "USER_MANAGER"
def __init__(self, **kwargs):
"""
Initializes a UserManager instanc.
Initializes a UserManager instance.
:param username: The username for the default admin user
:param password: The password for the default admin user
@@ -1144,7 +1144,7 @@ class UserSessionManager(Service, identifier="UserSessionManager"):
This class handles authentication, session management, and session timeouts for users interacting with the Node.
"""
config: "UserSessionManager.ConfigSchema"
config: "UserSessionManager.ConfigSchema" = None
local_session: Optional[UserSession] = None
"""The current local user session, if any."""
@@ -1170,7 +1170,7 @@ class UserSessionManager(Service, identifier="UserSessionManager"):
class ConfigSchema(Service.ConfigSchema):
"""ConfigSchema for UserSessionManager."""
type: str = "USERSESSIONMANAGER"
type: str = "USER_SESSION_MANAGER"
def __init__(self, **kwargs):
"""

View File

@@ -30,7 +30,7 @@ class Application(IOSoftware):
Applications are user-facing programs that may perform input/output operations.
"""
config: "Application.ConfigSchema"
config: "Application.ConfigSchema" = None
operating_state: ApplicationOperatingState = ApplicationOperatingState.CLOSED
"The current operating state of the Application."

View File

@@ -70,7 +70,7 @@ class DatabaseClient(Application, identifier="DatabaseClient"):
"""
config: "DatabaseClient.ConfigSchema"
config: "DatabaseClient.ConfigSchema" = None
server_ip_address: Optional[IPv4Address] = None
"""The IPv4 address of the Database Service server, defaults to None."""

View File

@@ -52,7 +52,7 @@ class NMAP(Application, identifier="NMAP"):
as ping scans to discover active hosts and port scans to detect open ports on those hosts.
"""
config: "NMAP.ConfigSchema"
config: "NMAP.ConfigSchema" = None
_active_port_scans: Dict[str, PortScanPayload] = {}
_port_scan_responses: Dict[str, PortScanPayload] = {}

View File

@@ -60,10 +60,10 @@ class AbstractC2(Application, identifier="AbstractC2"):
Defaults to masquerading as HTTP (Port 80) via TCP.
Please refer to the Command-&-Control notebook for an in-depth example of the C2 Suite.
Please refer to the Command-and-Control notebook for an in-depth example of the C2 Suite.
"""
config: "AbstractC2"
config: "AbstractC2.ConfigSchema" = None
c2_connection_active: bool = False
"""Indicates if the c2 server and c2 beacon are currently connected."""
@@ -80,7 +80,7 @@ class AbstractC2(Application, identifier="AbstractC2"):
class ConfigSchema(Application.ConfigSchema):
"""ConfigSchema for AbstractC2."""
type: str = "ABSTRACTC2"
type: str = "ABSTRACT_C2"
class _C2Opts(BaseModel):
"""A Pydantic Schema for the different C2 configuration options."""

View File

@@ -36,7 +36,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
Please refer to the Command-and-Control notebook for an in-depth example of the C2 Suite.
"""
config: "C2Beacon.ConfigSchema"
config: "C2Beacon.ConfigSchema" = None
keep_alive_attempted: bool = False
"""Indicates if a keep alive has been attempted to be sent this timestep. Used to prevent packet storms."""
@@ -47,7 +47,7 @@ class C2Beacon(AbstractC2, identifier="C2Beacon"):
class ConfigSchema(Application.ConfigSchema):
"""ConfigSchema for C2Beacon."""
type: str = "C2BEACON"
type: str = "C2_BEACON"
@property
def _host_terminal(self) -> Optional[Terminal]:

View File

@@ -32,10 +32,10 @@ class C2Server(AbstractC2, identifier="C2Server"):
1. Sending commands to the C2 Beacon. (Command input)
2. Parsing terminal RequestResponses back to the Agent.
Please refer to the Command-&-Control notebook for an in-depth example of the C2 Suite.
Please refer to the Command-and-Control notebook for an in-depth example of the C2 Suite.
"""
config: "C2Server.ConfigSchema"
config: "C2Server.ConfigSchema" = None
current_command_output: RequestResponse = None
"""The Request Response by the last command send. This attribute is updated by the method _handle_command_output."""
@@ -43,7 +43,7 @@ class C2Server(AbstractC2, identifier="C2Server"):
class ConfigSchema(Application.ConfigSchema):
"""ConfigSchema for C2Server."""
type: str = "C2SERVER"
type: str = "C2_SERVER"
def _init_request_manager(self) -> RequestManager:
"""

View File

@@ -33,7 +33,7 @@ class DoSAttackStage(IntEnum):
class DoSBot(DatabaseClient, identifier="DoSBot"):
"""A bot that simulates a Denial of Service attack."""
config: "DoSBot.ConfigSchema"
config: "DoSBot.ConfigSchema" = None
target_ip_address: Optional[IPv4Address] = None
"""IP address of the target service."""
@@ -59,7 +59,7 @@ class DoSBot(DatabaseClient, identifier="DoSBot"):
class ConfigSchema(Application.ConfigSchema):
"""ConfigSchema for DoSBot."""
type: str = "DOSBOT"
type: str = "DOS_BOT"
def __init__(self, **kwargs):
super().__init__(**kwargs)

View File

@@ -18,7 +18,7 @@ class RansomwareScript(Application, identifier="RansomwareScript"):
:ivar payload: The attack stage query payload. (Default ENCRYPT)
"""
config: "RansomwareScript.ConfigSchema"
config: "RansomwareScript.ConfigSchema" = None
server_ip_address: Optional[IPv4Address] = None
"""IP address of node which hosts the database."""

View File

@@ -30,7 +30,7 @@ class WebBrowser(Application, identifier="WebBrowser"):
The application requests and loads web pages using its domain name and requesting IP addresses using DNS.
"""
config: "WebBrowser.ConfigSchema"
config: "WebBrowser.ConfigSchema" = None
target_url: Optional[str] = None
@@ -46,7 +46,7 @@ class WebBrowser(Application, identifier="WebBrowser"):
class ConfigSchema(Application.ConfigSchema):
"""ConfigSchema for WebBrowser."""
type: str = "WEBBROWSER"
type: str = "WEB_BROWSER"
def __init__(self, **kwargs):
kwargs["name"] = "WebBrowser"

View File

@@ -22,7 +22,7 @@ class ARP(Service, identifier="ARP"):
sends ARP requests and replies, and processes incoming ARP packets.
"""
config: "ARP.ConfigSchema"
config: "ARP.ConfigSchema" = None
arp: Dict[IPV4Address, ARPEntry] = {}

View File

@@ -24,7 +24,7 @@ class DatabaseService(Service, identifier="DatabaseService"):
This class inherits from the `Service` class and provides methods to simulate a SQL database.
"""
config: "DatabaseService.ConfigSchema"
config: "DatabaseService.ConfigSchema" = None
password: Optional[str] = None
"""Password that needs to be provided by clients if they want to connect to the DatabaseService."""
@@ -41,7 +41,7 @@ class DatabaseService(Service, identifier="DatabaseService"):
class ConfigSchema(Service.ConfigSchema):
"""ConfigSchema for DatabaseService."""
type: str = "DATABASESERVICE"
type: str = "DATABASE_SERVICE"
def __init__(self, **kwargs):
kwargs["name"] = "DatabaseService"

View File

@@ -15,11 +15,17 @@ _LOGGER = getLogger(__name__)
class DNSClient(Service):
"""Represents a DNS Client as a Service."""
config: "DNSClient.ConfigSchema" = None
dns_cache: Dict[str, IPv4Address] = {}
"A dict of known mappings between domain/URLs names and IPv4 addresses."
dns_server: Optional[IPv4Address] = None
"The DNS Server the client sends requests to."
class ConfigSchema(Service.ConfigSchema):
"""ConfigSchema for DNSClient."""
type: str = "DNS_CLIENT"
def __init__(self, **kwargs):
kwargs["name"] = "DNSClient"
kwargs["port"] = PORT_LOOKUP["DNS"]

View File

@@ -16,7 +16,7 @@ _LOGGER = getLogger(__name__)
class DNSServer(Service, identifier="DNSServer"):
"""Represents a DNS Server as a Service."""
config: "DNSServer.ConfigSchema"
config: "DNSServer.ConfigSchema" = None
dns_table: Dict[str, IPv4Address] = {}
"A dict of mappings between domain names and IPv4 addresses."
@@ -24,7 +24,7 @@ class DNSServer(Service, identifier="DNSServer"):
class ConfigSchema(Service.ConfigSchema):
"""ConfigSchema for DNSServer."""
type: str = "DNSSERVER"
type: str = "DNS_SERVER"
def __init__(self, **kwargs):
kwargs["name"] = "DNSServer"

View File

@@ -24,12 +24,12 @@ class FTPClient(FTPServiceABC, identifier="FTPClient"):
RFC 959: https://datatracker.ietf.org/doc/html/rfc959
"""
config: "FTPClient.ConfigSchema"
config: "FTPClient.ConfigSchema" = None
class ConfigSchema(Service.ConfigSchema):
"""ConfigSchema for FTPClient."""
type: str = "FTPCLIENT"
type: str = "FTP_CLIENT"
def __init__(self, **kwargs):
kwargs["name"] = "FTPClient"

View File

@@ -19,7 +19,7 @@ class FTPServer(FTPServiceABC, identifier="FTPServer"):
RFC 959: https://datatracker.ietf.org/doc/html/rfc959
"""
config: "FTPServer.ConfigSchema"
config: "FTPServer.ConfigSchema" = None
server_password: Optional[str] = None
"""Password needed to connect to FTP server. Default is None."""
@@ -27,7 +27,7 @@ class FTPServer(FTPServiceABC, identifier="FTPServer"):
class ConfigSchema(Service.ConfigSchema):
"""ConfigSchema for FTPServer."""
type: str = "FTPServer"
type: str = "FTP_Server"
def __init__(self, **kwargs):
kwargs["name"] = "FTPServer"

View File

@@ -22,7 +22,7 @@ class ICMP(Service, identifier="ICMP"):
network diagnostics, notably the ping command.
"""
config: "ICMP.ConfigSchema"
config: "ICMP.ConfigSchema" = None
request_replies: Dict = {}

View File

@@ -15,7 +15,7 @@ _LOGGER = getLogger(__name__)
class NTPClient(Service, identifier="NTPClient"):
"""Represents a NTP client as a service."""
config: "NTPClient.ConfigSchema"
config: "NTPClient.ConfigSchema" = None
ntp_server: Optional[IPv4Address] = None
"The NTP server the client sends requests to."
@@ -24,7 +24,7 @@ class NTPClient(Service, identifier="NTPClient"):
class ConfigSchema(Service.ConfigSchema):
"""ConfigSchema for NTPClient."""
type: str = "NTPCLIENT"
type: str = "NTP_CLIENT"
def __init__(self, **kwargs):
kwargs["name"] = "NTPClient"

View File

@@ -14,12 +14,12 @@ _LOGGER = getLogger(__name__)
class NTPServer(Service, identifier="NTPServer"):
"""Represents a NTP server as a service."""
config: "NTPServer.ConfigSchema"
config: "NTPServer.ConfigSchema" = None
class ConfigSchema(Service.ConfigSchema):
"""ConfigSchema for NTPServer."""
type: str = "NTPSERVER"
type: str = "NTP_SERVER"
def __init__(self, **kwargs):
kwargs["name"] = "NTPServer"

View File

@@ -132,7 +132,7 @@ class RemoteTerminalConnection(TerminalClientConnection):
class Terminal(Service, identifier="Terminal"):
"""Class used to simulate a generic terminal service. Can be interacted with by other terminals via SSH."""
config: "Terminal.ConfigSchema"
config: "Terminal.ConfigSchema" = None
_client_connection_requests: Dict[str, Optional[Union[str, TerminalClientConnection]]] = {}
"""Dictionary of connect requests made to remote nodes."""

View File

@@ -22,14 +22,14 @@ _LOGGER = getLogger(__name__)
class WebServer(Service, identifier="WebServer"):
"""Class used to represent a Web Server Service in simulation."""
config: "WebServer.ConfigSchema"
config: "WebServer.ConfigSchema" = None
response_codes_this_timestep: List[HttpStatusCode] = []
class ConfigSchema(Service.ConfigSchema):
"""ConfigSchema for WebServer."""
type: str = "WEBSERVER"
type: str = "WEB_SERVER"
def describe_state(self) -> Dict:
"""

View File

@@ -40,12 +40,12 @@ _LOGGER = getLogger(__name__)
class DummyService(Service, identifier="DummyService"):
"""Test Service class"""
config: "DummyService.ConfigSchema"
config: "DummyService.ConfigSchema" = None
class ConfigSchema(Service.ConfigSchema):
"""ConfigSchema for DummyService."""
type: str = "DUMMYSERVICE"
type: str = "DUMMY_SERVICE"
def describe_state(self) -> Dict:
return super().describe_state()
@@ -63,12 +63,12 @@ class DummyService(Service, identifier="DummyService"):
class DummyApplication(Application, identifier="DummyApplication"):
"""Test Application class"""
config: "DummyApplication.ConfigSchema"
config: "DummyApplication.ConfigSchema" = None
class ConfigSchema(Application.ConfigSchema):
"""ConfigSchema for DummyApplication."""
type: str = "DUMMYAPPLICATION"
type: str = "DUMMY_APPLICATION"
def __init__(self, **kwargs):
kwargs["name"] = "DummyApplication"

View File

@@ -31,7 +31,7 @@ class ExtendedApplication(Application, identifier="ExtendedApplication"):
The application requests and loads web pages using its domain name and requesting IP addresses using DNS.
"""
config: "ExtendedApplication.ConfigSchema"
config: "ExtendedApplication.ConfigSchema" = None
target_url: Optional[str] = None
@@ -47,7 +47,7 @@ class ExtendedApplication(Application, identifier="ExtendedApplication"):
class ConfigSchema(Application.ConfigSchema):
"""ConfigSchema for ExtendedApplication."""
type: str = "EXTENDEDAPPLICATION"
type: str = "EXTENDED_APPLICATION"
def __init__(self, **kwargs):
kwargs["name"] = "ExtendedApplication"

View File

@@ -17,12 +17,12 @@ from primaite.utils.validation.port import PORT_LOOKUP
class BroadcastTestService(Service, identifier="BroadcastTestService"):
"""A service for sending broadcast and unicast messages over a network."""
config: "BroadcastTestService.ConfigSchema"
config: "BroadcastTestService.ConfigSchema" = None
class ConfigSchema(Service.ConfigSchema):
"""ConfigSchema for BroadcastTestService."""
type: str = "BROADCASTTESTSERVICE"
type: str = "BROADCAST_TEST_SERVICE"
def __init__(self, **kwargs):
# Set default service properties for broadcasting

View File

@@ -14,13 +14,19 @@ from primaite.utils.validation.port import PORT_LOOKUP
from tests import TEST_ASSETS_ROOT
class _DatabaseListener(Service):
class _DatabaseListener(Service, identifier="_DatabaseListener"):
config: "_DatabaseListener.ConfigSchema" = None
name: str = "DatabaseListener"
protocol: str = PROTOCOL_LOOKUP["TCP"]
port: int = PORT_LOOKUP["NONE"]
listen_on_ports: Set[int] = {PORT_LOOKUP["POSTGRES_SERVER"]}
payloads_received: List[Any] = Field(default_factory=list)
class ConfigSchema(Service.ConfigSchema):
"""ConfigSchema for _DatabaseListener."""
type: str = "_DATABASE_LISTENER"
def receive(self, payload: Any, session_id: str, **kwargs) -> bool:
self.payloads_received.append(payload)
self.sys_log.info(f"{self.name}: received payload {payload}")