Merge branch 'dev' into feature/2266_network-focussed-dev-documentation

# Conflicts:
#	src/primaite/game/agent/observations/observations.py
This commit is contained in:
Chris McCarthy
2024-04-24 16:04:46 +01:00
42 changed files with 450 additions and 102 deletions

View File

@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## 3.0.0b9
- Removed deprecated `PrimaiteSession` class.
- Added ability to set log levels via configuration.
- Upgraded pydantic to version 2.7.0
- Upgraded Ray to version >= 2.9
- Added ipywidgets to the dependencies
## [Unreleased]
- Made requests fail to reach their target if the node is off

View File

@@ -18,6 +18,8 @@ This section configures how PrimAITE saves data during simulation and training.
save_step_metadata: False
save_pcap_logs: False
save_sys_logs: False
write_sys_log_to_terminal: False
sys_log_level: WARNING
``save_logs``
@@ -25,7 +27,6 @@ This section configures how PrimAITE saves data during simulation and training.
*currently unused*.
``save_agent_actions``
----------------------
@@ -55,3 +56,35 @@ If ``True``, then the pcap files which contain all network traffic during the si
Optional. Default value is ``False``.
If ``True``, then the log files which contain all node actions during the simulation will be saved.
``write_sys_log_to_terminal``
-----------------------------
Optional. Default value is ``False``.
If ``True``, PrimAITE will print sys log to the terminal.
``sys_log_level``
-------------
Optional. Default value is ``WARNING``.
The level of logging that should be visible in the sys logs or the logs output to the terminal.
``save_sys_logs`` or ``write_sys_log_to_terminal`` has to be set to ``True`` for this setting to be used.
Available options are:
- ``DEBUG``: Debug level items and the items below
- ``INFO``: Info level items and the items below
- ``WARNING``: Warning level items and the items below
- ``ERROR``: Error level items and the items below
- ``CRITICAL``: Only critical level logs
See also |logging_levels|
.. |logging_levels| raw:: html
<a href="https://docs.python.org/3/library/logging.html#logging-levels" target="blank">Python logging levels</a>

View File

@@ -38,8 +38,9 @@ dependencies = [
"stable-baselines3[extra]==2.1.0",
"tensorflow==2.12.0",
"typer[all]==0.9.0",
"pydantic==2.1.1",
"ray[rllib] == 2.8.0, < 3"
"pydantic==2.7.0",
"ray[rllib] >= 2.9, < 3",
"ipywidgets"
]
[tool.setuptools.dynamic]

View File

@@ -3,6 +3,7 @@ io_settings:
save_step_metadata: false
save_pcap_logs: false
save_sys_logs: false
sys_log_level: WARNING
game:

View File

@@ -1061,6 +1061,8 @@ agents:
source_port_id: 1
dest_port_id: 1
protocol_id: 1
source_wildcard_id: 0
dest_wildcard_id: 0
47: # old action num: 23 # "ACL: ADDRULE - Block outgoing traffic from client 2"
action: "ROUTER_ACL_ADDRULE"
options:
@@ -1072,6 +1074,8 @@ agents:
source_port_id: 1
dest_port_id: 1
protocol_id: 1
source_wildcard_id: 0
dest_wildcard_id: 0
48: # old action num: 24 # block tcp traffic from client 1 to web app
action: "ROUTER_ACL_ADDRULE"
options:
@@ -1083,6 +1087,8 @@ agents:
source_port_id: 1
dest_port_id: 1
protocol_id: 3
source_wildcard_id: 0
dest_wildcard_id: 0
49: # old action num: 25 # block tcp traffic from client 2 to web app
action: "ROUTER_ACL_ADDRULE"
options:
@@ -1094,6 +1100,8 @@ agents:
source_port_id: 1
dest_port_id: 1
protocol_id: 3
source_wildcard_id: 0
dest_wildcard_id: 0
50: # old action num: 26
action: "ROUTER_ACL_ADDRULE"
options:
@@ -1105,6 +1113,8 @@ agents:
source_port_id: 1
dest_port_id: 1
protocol_id: 3
source_wildcard_id: 0
dest_wildcard_id: 0
51: # old action num: 27
action: "ROUTER_ACL_ADDRULE"
options:
@@ -1116,6 +1126,8 @@ agents:
source_port_id: 1
dest_port_id: 1
protocol_id: 3
source_wildcard_id: 0
dest_wildcard_id: 0
52: # old action num: 28
action: "ROUTER_ACL_REMOVERULE"
options:

View File

@@ -227,6 +227,13 @@ class HostObservation(AbstractObservation, identifier="HOST"):
applications = [ApplicationObservation.from_config(config=c, parent_where=where) for c in config.applications]
folders = [FolderObservation.from_config(config=c, parent_where=where) for c in config.folders]
nics = [NICObservation.from_config(config=c, parent_where=where) for c in config.network_interfaces]
# If list of network interfaces is not defined, assume we want to
# monitor the first N interfaces. Network interface numbering starts at 1.
count = 1
while len(nics) < config.num_nics:
nic_config = NICObservation.ConfigSchema(nic_num=count, include_nmne=config.include_nmne)
nics.append(NICObservation.from_config(config=nic_config, parent_where=where))
count += 1
return cls(
where=where,

View File

@@ -1,6 +1,6 @@
"""Manages the observation space for the agent."""
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterable, Type, Optional, Union
from typing import Any, Dict, Iterable, Optional, Type, Union
from gymnasium import spaces
from gymnasium.core import ObsType

View File

@@ -362,7 +362,7 @@
" cfg = yaml.safe_load(f)\n",
" cfg['simulation']['network']\n",
" for node in cfg['simulation']['network']['nodes']:\n",
" if node['ref'] in ['client_1', 'client_2']:\n",
" if node['hostname'] in ['client_1', 'client_2']:\n",
" node['applications'] = change['applications']\n",
"\n",
"env = PrimaiteGymEnv(game_config = cfg)\n",
@@ -407,7 +407,7 @@
" cfg = yaml.safe_load(f)\n",
" cfg['simulation']['network']\n",
" for node in cfg['simulation']['network']['nodes']:\n",
" if node['ref'] in ['client_1', 'client_2']:\n",
" if node['hostname'] in ['client_1', 'client_2']:\n",
" node['applications'] = change['applications']\n",
"\n",
"env = PrimaiteGymEnv(game_config = cfg)\n",

View File

@@ -26,9 +26,6 @@ class PrimaiteGymEnv(gymnasium.Env):
def __init__(self, game_config: Dict):
"""Initialise the environment."""
super().__init__()
self.io = PrimaiteIO.from_config(game_config.get("io_settings", {}))
"""Handles IO for the environment. This produces sys logs, agent logs, etc."""
self.game_config: Dict = game_config
"""PrimaiteGame definition. This can be changed between episodes to enable curriculum learning."""
self.io = PrimaiteIO.from_config(game_config.get("io_settings", {}))

View File

@@ -6,7 +6,7 @@ from typing import Dict, List, Optional
from pydantic import BaseModel, ConfigDict
from primaite import getLogger, PRIMAITE_PATHS
from primaite.simulator import SIM_OUTPUT
from primaite.simulator import LogLevel, SIM_OUTPUT
_LOGGER = getLogger(__name__)
@@ -35,6 +35,8 @@ class PrimaiteIO:
"""Whether to save system logs."""
write_sys_log_to_terminal: bool = False
"""Whether to write the sys log to the terminal."""
sys_log_level: LogLevel = LogLevel.INFO
"""The level of log that should be included in the logfiles/logged into terminal."""
def __init__(self, settings: Optional[Settings] = None) -> None:
"""
@@ -50,6 +52,7 @@ class PrimaiteIO:
SIM_OUTPUT.save_pcap_logs = self.settings.save_pcap_logs
SIM_OUTPUT.save_sys_logs = self.settings.save_sys_logs
SIM_OUTPUT.write_sys_log_to_terminal = self.settings.write_sys_log_to_terminal
SIM_OUTPUT.sys_log_level = self.settings.sys_log_level
def generate_session_path(self, timestamp: Optional[datetime] = None) -> Path:
"""Create a folder for the session and return the path to it."""
@@ -96,6 +99,10 @@ class PrimaiteIO:
def from_config(cls, config: Dict) -> "PrimaiteIO":
"""Create an instance of PrimaiteIO based on a configuration dict."""
config = config or {}
if config.get("sys_log_level"):
config["sys_log_level"] = LogLevel[config["sys_log_level"].upper()] # convert to enum
new = cls(settings=cls.Settings(**config))
return new

View File

@@ -1,5 +1,6 @@
"""Warning: SIM_OUTPUT is a mutable global variable for the simulation output directory."""
from datetime import datetime
from enum import IntEnum
from pathlib import Path
from primaite import _PRIMAITE_ROOT
@@ -7,6 +8,21 @@ from primaite import _PRIMAITE_ROOT
__all__ = ["SIM_OUTPUT"]
class LogLevel(IntEnum):
"""Enum containing all the available log levels for PrimAITE simulation output."""
DEBUG = 10
"""Debug items will be output to terminal or log file."""
INFO = 20
"""Info items will be output to terminal or log file."""
WARNING = 30
"""Warnings will be output to terminal or log file."""
ERROR = 40
"""Errors will be output to terminal or log file."""
CRITICAL = 50
"""Critical errors will be output to terminal or log file."""
class _SimOutput:
def __init__(self):
self._path: Path = (
@@ -15,6 +31,7 @@ class _SimOutput:
self.save_pcap_logs: bool = False
self.save_sys_logs: bool = False
self.write_sys_log_to_terminal: bool = False
self.sys_log_level: LogLevel = LogLevel.WARNING # default log level is at WARNING
@property
def path(self) -> Path:

View File

@@ -6,7 +6,6 @@ from typing import Dict, Optional
from prettytable import MARKDOWN, PrettyTable
from primaite import getLogger
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager, RequestType, SimComponent
from primaite.simulator.file_system.file import File
@@ -14,8 +13,6 @@ from primaite.simulator.file_system.file_type import FileType
from primaite.simulator.file_system.folder import Folder
from primaite.simulator.system.core.sys_log import SysLog
_LOGGER = getLogger(__name__)
class FileSystem(SimComponent):
"""Class that contains all the simulation File System."""
@@ -163,11 +160,11 @@ class FileSystem(SimComponent):
:param folder_name: The name of the folder.
"""
if folder_name == "root":
self.sys_log.warning("Cannot delete the root folder.")
self.sys_log.error("Cannot delete the root folder.")
return False
folder = self.get_folder(folder_name)
if not folder:
_LOGGER.debug(f"Cannot delete folder as it does not exist: {folder_name}")
self.sys_log.error(f"Cannot delete folder as it does not exist: {folder_name}")
return False
# set folder to deleted state
@@ -180,7 +177,7 @@ class FileSystem(SimComponent):
folder.remove_all_files()
self.deleted_folders[folder.uuid] = folder
self.sys_log.info(f"Deleted folder /{folder.name} and its contents")
self.sys_log.warning(f"Deleted folder /{folder.name} and its contents")
return True
def delete_folder_by_id(self, folder_uuid: str) -> None:
@@ -283,7 +280,7 @@ class FileSystem(SimComponent):
folder = self.get_folder(folder_name, include_deleted=include_deleted)
if folder:
return folder.get_file(file_name, include_deleted=include_deleted)
self.sys_log.info(f"File not found /{folder_name}/{file_name}")
self.sys_log.warning(f"File not found /{folder_name}/{file_name}")
def get_file_by_id(
self, file_uuid: str, folder_uuid: Optional[str] = None, include_deleted: Optional[bool] = False
@@ -499,7 +496,7 @@ class FileSystem(SimComponent):
"""
folder = self.get_folder(folder_name=folder_name)
if not folder:
_LOGGER.debug(f"Cannot restore file {file_name} in folder {folder_name} as the folder does not exist.")
self.sys_log.error(f"Cannot restore file {file_name} in folder {folder_name} as the folder does not exist.")
return False
file = folder.get_file(file_name=file_name, include_deleted=True)

View File

@@ -4,14 +4,11 @@ from typing import Dict, Optional
from prettytable import MARKDOWN, PrettyTable
from primaite import getLogger
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.file_system.file import File
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemABC, FileSystemItemHealthStatus
_LOGGER = getLogger(__name__)
class Folder(FileSystemItemABC):
"""Simulation Folder."""
@@ -254,7 +251,7 @@ class Folder(FileSystemItemABC):
file.delete()
self.sys_log.info(f"Removed file {file.name} (id: {file.uuid})")
else:
_LOGGER.debug(f"File with UUID {file.uuid} was not found.")
self.sys_log.error(f"File with UUID {file.uuid} was not found.")
def remove_file_by_id(self, file_uuid: str):
"""

View File

@@ -161,7 +161,7 @@ class WirelessNetworkInterface(NetworkInterface, ABC):
return
if self._connected_node.operating_state != NodeOperatingState.ON:
self._connected_node.sys_log.info(
self._connected_node.sys_log.error(
f"Interface {self} cannot be enabled as the connected Node is not powered on"
)
return

View File

@@ -307,13 +307,13 @@ class WiredNetworkInterface(NetworkInterface, ABC):
return False
if self._connected_node.operating_state != NodeOperatingState.ON:
self._connected_node.sys_log.info(
self._connected_node.sys_log.warning(
f"Interface {self} cannot be enabled as the connected Node is not powered on"
)
return False
if not self._connected_link:
self._connected_node.sys_log.info(f"Interface {self} cannot be enabled as there is no Link connected.")
self._connected_node.sys_log.warning(f"Interface {self} cannot be enabled as there is no Link connected.")
return False
self.enabled = True
@@ -1201,7 +1201,7 @@ class Node(SimComponent):
self._nic_request_manager.add_request(new_nic_num, RequestType(func=network_interface._request_manager))
else:
msg = f"Cannot connect NIC {network_interface} as it is already connected"
self.sys_log.logger.error(msg)
self.sys_log.logger.warning(msg)
raise NetworkError(msg)
def disconnect_nic(self, network_interface: Union[NetworkInterface, str]):
@@ -1228,7 +1228,7 @@ class Node(SimComponent):
self._nic_request_manager.remove_request(network_interface_num)
else:
msg = f"Cannot disconnect Network Interface {network_interface} as it is not connected"
self.sys_log.logger.error(msg)
self.sys_log.logger.warning(msg)
raise NetworkError(msg)
def ping(self, target_ip_address: Union[IPv4Address, str], pings: int = 4) -> bool:
@@ -1360,7 +1360,6 @@ class Node(SimComponent):
self.software_manager.install(application)
application_instance = self.software_manager.software.get(str(application.__name__))
self.applications[application_instance.uuid] = application_instance
self.sys_log.info(f"Installed application {application_instance.name}")
_LOGGER.debug(f"Added application {application_instance.name} to node {self.hostname}")
self._application_request_manager.add_request(
application_instance.name, RequestType(func=application_instance._request_manager)

View File

@@ -147,7 +147,7 @@ class HostARP(ARP):
super()._process_arp_request(arp_packet, from_network_interface)
# Unmatched ARP Request
if arp_packet.target_ip_address != from_network_interface.ip_address:
self.sys_log.info(
self.sys_log.warning(
f"Ignoring ARP request for {arp_packet.target_ip_address}. Current IP address is "
f"{from_network_interface.ip_address}"
)

View File

@@ -933,7 +933,7 @@ class RouterICMP(ICMP):
)
if not network_interface:
self.sys_log.error(
self.sys_log.warning(
"Cannot send ICMP echo reply as there is no outbound Network Interface to use. Try configuring the "
"default gateway."
)
@@ -1482,7 +1482,7 @@ class Router(NetworkNode):
frame.ethernet.dst_mac_addr = target_mac
network_interface.send_frame(frame)
else:
self.sys_log.error(f"Frame dropped as there is no route to {frame.ip.dst_ip_address}")
self.sys_log.warning(f"Frame dropped as there is no route to {frame.ip.dst_ip_address}")
def configure_port(self, port: int, ip_address: Union[IPv4Address, str], subnet_mask: Union[IPv4Address, str]):
"""

View File

@@ -74,7 +74,7 @@ class SwitchPort(WiredNetworkInterface):
if self.enabled:
frame.decrement_ttl()
if frame.ip and frame.ip.ttl < 1:
self._connected_node.sys_log.info("Frame discarded as TTL limit reached")
self._connected_node.sys_log.warning("Frame discarded as TTL limit reached")
return False
self.pcap.capture_inbound(frame)
self._connected_node.receive_frame(frame=frame, from_network_interface=self)

View File

@@ -68,7 +68,7 @@ class WirelessAccessPoint(IPWirelessNetworkInterface):
if self.enabled:
frame.decrement_ttl()
if frame.ip and frame.ip.ttl < 1:
self._connected_node.sys_log.info("Frame discarded as TTL limit reached")
self._connected_node.sys_log.warning("Frame discarded as TTL limit reached")
return False
frame.set_received_timestamp()
self.pcap.capture_inbound(frame)

View File

@@ -2,13 +2,10 @@ from abc import abstractmethod
from enum import Enum
from typing import Any, Dict, Set
from primaite import getLogger
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.system.software import IOSoftware, SoftwareHealthState
_LOGGER = getLogger(__name__)
class ApplicationOperatingState(Enum):
"""Enumeration of Application Operating States."""
@@ -99,7 +96,7 @@ class Application(IOSoftware):
if self.operating_state is not self.operating_state.RUNNING:
# service is not running
_LOGGER.debug(f"Cannot perform action: {self.name} is {self.operating_state.name}")
self.sys_log.error(f"Cannot perform action: {self.name} is {self.operating_state.name}")
return False
return True
@@ -131,7 +128,6 @@ class Application(IOSoftware):
"""Install Application."""
super().install()
if self.operating_state == ApplicationOperatingState.CLOSED:
self.sys_log.info(f"Installing Application {self.name}")
self.operating_state = ApplicationOperatingState.INSTALLING
def receive(self, payload: Any, session_id: str, **kwargs) -> bool:

View File

@@ -2,7 +2,6 @@ from ipaddress import IPv4Address
from typing import Any, Dict, Optional
from uuid import uuid4
from primaite import getLogger
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.network.transmission.network_layer import IPProtocol
@@ -10,8 +9,6 @@ from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.core.software_manager import SoftwareManager
_LOGGER = getLogger(__name__)
class DatabaseClient(Application):
"""
@@ -136,7 +133,7 @@ class DatabaseClient(Application):
self.server_ip_address = server_ip_address
return True
else:
self.sys_log.info(
self.sys_log.warning(
f"{self.name} {connection_id=}: DatabaseClient connection to {server_ip_address} declined"
)
return False
@@ -156,12 +153,12 @@ class DatabaseClient(Application):
def disconnect(self) -> bool:
"""Disconnect from the Database Service."""
if not self._can_perform_action():
self.sys_log.error(f"Unable to disconnect - {self.name} is {self.operating_state.name}")
self.sys_log.warning(f"Unable to disconnect - {self.name} is {self.operating_state.name}")
return False
# if there are no connections - nothing to disconnect
if not self._server_connection_id:
self.sys_log.error(f"Unable to disconnect - {self.name} has no active connections.")
self.sys_log.warning(f"Unable to disconnect - {self.name} has no active connections.")
return False
# if no connection provided, disconnect the first connection
@@ -196,7 +193,7 @@ class DatabaseClient(Application):
if success:
self.sys_log.info(f"{self.name}: Query successful {sql}")
return True
self.sys_log.info(f"{self.name}: Unable to run query {sql}")
self.sys_log.error(f"{self.name}: Unable to run query {sql}")
return False
else:
software_manager: SoftwareManager = self.software_manager
@@ -236,7 +233,7 @@ class DatabaseClient(Application):
if not connection_id:
msg = "Cannot run sql query, could not establish connection with the server."
self.parent.sys_log(msg)
self.parent.sys_log.warning(msg)
return False
uuid = str(uuid4())
@@ -265,5 +262,5 @@ class DatabaseClient(Application):
status_code = payload.get("status_code")
self._query_success_tracker[query_id] = status_code == 200
if self._query_success_tracker[query_id]:
_LOGGER.debug(f"Received payload {payload}")
self.sys_log.debug(f"Received {payload=}")
return True

View File

@@ -71,7 +71,7 @@ class DataManipulationBot(Application):
"""Return the database client that is installed on the same machine as the DataManipulationBot."""
db_client = self.software_manager.software.get("DatabaseClient")
if db_client is None:
_LOGGER.info(f"{self.__class__.__name__} cannot find a database client on its host.")
self.sys_log.warning(f"{self.__class__.__name__} cannot find a database client on its host.")
return db_client
def _init_request_manager(self) -> RequestManager:
@@ -127,7 +127,7 @@ class DataManipulationBot(Application):
"""
if self.attack_stage == DataManipulationAttackStage.NOT_STARTED:
# Bypass this stage as we're not dealing with logon for now
self.sys_log.info(f"{self.name}: ")
self.sys_log.debug(f"{self.name}: ")
self.attack_stage = DataManipulationAttackStage.LOGON
def _perform_port_scan(self, p_of_success: Optional[float] = 0.1):
@@ -145,7 +145,7 @@ class DataManipulationBot(Application):
# perform the port scan
port_is_open = True # Temporary; later we can implement NMAP port scan.
if port_is_open:
self.sys_log.info(f"{self.name}: ")
self.sys_log.debug(f"{self.name}: ")
self.attack_stage = DataManipulationAttackStage.PORT_SCAN
def _perform_data_manipulation(self, p_of_success: Optional[float] = 0.1):
@@ -177,7 +177,7 @@ class DataManipulationBot(Application):
self.sys_log.info(f"{self.name}: Data manipulation successful")
self.attack_stage = DataManipulationAttackStage.SUCCEEDED
else:
self.sys_log.info(f"{self.name}: Data manipulation failed")
self.sys_log.warning(f"{self.name}: Data manipulation failed")
self.attack_stage = DataManipulationAttackStage.FAILED
def run(self):
@@ -191,7 +191,9 @@ class DataManipulationBot(Application):
def attack(self) -> bool:
"""Perform the attack steps after opening the application."""
if not self._can_perform_action():
_LOGGER.debug("Data manipulation application attempted to execute but it cannot perform actions right now.")
self.sys_log.warning(
"Data manipulation application attempted to execute but it cannot perform actions right now."
)
self.run()
self.num_executions += 1
@@ -206,7 +208,7 @@ class DataManipulationBot(Application):
if not self._can_perform_action():
return False
if self.server_ip_address and self.payload:
self.sys_log.info(f"{self.name}: Running")
self.sys_log.debug(f"{self.name}: Running")
self._logon()
self._perform_port_scan(p_of_success=self.port_scan_p_of_success)
self._perform_data_manipulation(p_of_success=self.data_manipulation_p_of_success)
@@ -220,7 +222,7 @@ class DataManipulationBot(Application):
return True
else:
self.sys_log.error(f"{self.name}: Failed to start as it requires both a target_ip_address and payload.")
self.sys_log.warning(f"{self.name}: Failed to start as it requires both a target_ip_address and payload.")
return False
def apply_timestep(self, timestep: int) -> None:

View File

@@ -122,7 +122,7 @@ class DoSBot(DatabaseClient):
# DoS bot cannot do anything without a target
if not self.target_ip_address or not self.target_port:
self.sys_log.error(
self.sys_log.warning(
f"{self.name} is not properly configured. {self.target_ip_address=}, {self.target_port=}"
)
return True
@@ -152,7 +152,7 @@ class DoSBot(DatabaseClient):
# perform the port scan
port_is_open = True # Temporary; later we can implement NMAP port scan.
if port_is_open:
self.sys_log.info(f"{self.name}: ")
self.sys_log.debug(f"{self.name}: ")
self.attack_stage = DoSAttackStage.PORT_SCAN
def _perform_dos(self):

View File

@@ -2,7 +2,6 @@ from enum import IntEnum
from ipaddress import IPv4Address
from typing import Dict, Optional
from primaite import getLogger
from primaite.game.science import simulate_trial
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager, RequestType
@@ -11,8 +10,6 @@ from primaite.simulator.network.transmission.transport_layer import Port
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.applications.database_client import DatabaseClient
_LOGGER = getLogger(__name__)
class RansomwareAttackStage(IntEnum):
"""
@@ -94,7 +91,7 @@ class RansomwareScript(Application):
"""Return the database client that is installed on the same machine as the Ransomware Script."""
db_client = self.software_manager.software.get("DatabaseClient")
if db_client is None:
_LOGGER.info(f"{self.__class__.__name__} cannot find a database client on its host.")
self.sys_log.warning(f"{self.__class__.__name__} cannot find a database client on its host.")
return db_client
def _init_request_manager(self) -> RequestManager:
@@ -158,7 +155,7 @@ class RansomwareScript(Application):
self.attack_stage = RansomwareAttackStage.NOT_STARTED
return True
else:
self.sys_log.error(f"{self.name}: Failed to start as it requires both a target_ip_address and payload.")
self.sys_log.warning(f"{self.name}: Failed to start as it requires both a target_ip_address and payload.")
return False
def configure(
@@ -254,7 +251,7 @@ class RansomwareScript(Application):
def attack(self) -> bool:
"""Perform the attack steps after opening the application."""
if not self._can_perform_action():
_LOGGER.debug("Ransomware application is unable to perform it's actions.")
self.sys_log.warning("Ransomware application is unable to perform it's actions.")
self.run()
self.num_executions += 1
return self._application_loop()
@@ -289,7 +286,7 @@ class RansomwareScript(Application):
self.sys_log.info(f"{self.name}: Payload failed")
self.attack_stage = RansomwareAttackStage.FAILED
else:
self.sys_log.error("Attack Attempted to launch too quickly")
self.sys_log.warning("Attack Attempted to launch too quickly")
self.attack_stage = RansomwareAttackStage.FAILED
def _local_download(self):

View File

@@ -97,7 +97,7 @@ class WebBrowser(Application):
try:
parsed_url = urlparse(url)
except Exception:
self.sys_log.error(f"{url} is not a valid URL")
self.sys_log.warning(f"{url} is not a valid URL")
return False
# get the IP address of the domain name via DNS
@@ -114,7 +114,7 @@ class WebBrowser(Application):
self.domain_name_ip_address = IPv4Address(parsed_url.hostname)
except Exception:
# unable to deal with this request
self.sys_log.error(f"{self.name}: Unable to resolve URL {url}")
self.sys_log.warning(f"{self.name}: Unable to resolve URL {url}")
return False
# create HTTPRequest payload
@@ -140,7 +140,8 @@ class WebBrowser(Application):
)
return self.latest_response.status_code is HttpStatusCode.OK
else:
self.sys_log.error(f"Error sending Http Packet {str(payload)}")
self.sys_log.warning(f"{self.name}: Error sending Http Packet")
self.sys_log.debug(f"{self.name}: {payload=}")
self.history.append(
WebBrowser.BrowserHistoryItem(
url=url, status=self.BrowserHistoryItem._HistoryItemStatus.SERVER_UNREACHABLE
@@ -181,7 +182,8 @@ class WebBrowser(Application):
:return: True if successful, False otherwise.
"""
if not isinstance(payload, HttpResponsePacket):
self.sys_log.error(f"{self.name} received a packet that is not an HttpResponsePacket")
self.sys_log.warning(f"{self.name} received a packet that is not an HttpResponsePacket")
self.sys_log.debug(f"{self.name}: {payload=}")
return False
self.sys_log.info(f"{self.name}: Received HTTP {payload.status_code.value}")
self.latest_response = payload

View File

@@ -87,7 +87,7 @@ class SoftwareManager:
# TODO: Software manager and node itself both have an install method. Need to refactor to have more logical
# separation of concerns.
if software_class in self._software_class_to_name_map:
self.sys_log.info(f"Cannot install {software_class} as it is already installed")
self.sys_log.warning(f"Cannot install {software_class} as it is already installed")
return
software = software_class(
software_manager=self, sys_log=self.sys_log, file_system=self.file_system, dns_server=self.dns_server
@@ -97,7 +97,6 @@ class SoftwareManager:
software.software_manager = self
self.software[software.name] = software
self.port_protocol_mapping[(software.port, software.protocol)] = software
self.sys_log.info(f"Installed {software.name}")
if isinstance(software, Application):
software.operating_state = ApplicationOperatingState.CLOSED
@@ -144,7 +143,7 @@ class SoftwareManager:
if receiver:
receiver.receive_payload(payload)
else:
self.sys_log.error(f"No Service of Application found with the name {target_software}")
self.sys_log.warning(f"No Service of Application found with the name {target_software}")
def send_payload_to_session_manager(
self,
@@ -196,7 +195,7 @@ class SoftwareManager:
payload=payload, session_id=session_id, from_network_interface=from_network_interface, frame=frame
)
else:
self.sys_log.error(f"No service or application found for port {port} and protocol {protocol}")
self.sys_log.warning(f"No service or application found for port {port} and protocol {protocol}")
pass
def show(self, markdown: bool = False):

View File

@@ -3,7 +3,7 @@ from pathlib import Path
from prettytable import MARKDOWN, PrettyTable
from primaite.simulator import SIM_OUTPUT
from primaite.simulator import LogLevel, SIM_OUTPUT
class _NotJSONFilter(logging.Filter):
@@ -52,6 +52,7 @@ class SysLog:
file_handler.setFormatter(logging.Formatter(log_format))
self.logger = logging.getLogger(f"{self.hostname}_sys_log")
self.logger.handlers.clear() # clear handlers
self.logger.setLevel(logging.DEBUG)
self.logger.addHandler(file_handler)
@@ -99,6 +100,9 @@ class SysLog:
:param msg: The message to be logged.
:param to_terminal: If True, prints to the terminal too.
"""
if SIM_OUTPUT.sys_log_level > LogLevel.DEBUG:
return
if SIM_OUTPUT.save_sys_logs:
self.logger.debug(msg)
self._write_to_terminal(msg, "DEBUG", to_terminal)
@@ -110,6 +114,9 @@ class SysLog:
:param msg: The message to be logged.
:param to_terminal: If True, prints to the terminal too.
"""
if SIM_OUTPUT.sys_log_level > LogLevel.INFO:
return
if SIM_OUTPUT.save_sys_logs:
self.logger.info(msg)
self._write_to_terminal(msg, "INFO", to_terminal)
@@ -121,6 +128,9 @@ class SysLog:
:param msg: The message to be logged.
:param to_terminal: If True, prints to the terminal too.
"""
if SIM_OUTPUT.sys_log_level > LogLevel.WARNING:
return
if SIM_OUTPUT.save_sys_logs:
self.logger.warning(msg)
self._write_to_terminal(msg, "WARNING", to_terminal)
@@ -132,6 +142,9 @@ class SysLog:
:param msg: The message to be logged.
:param to_terminal: If True, prints to the terminal too.
"""
if SIM_OUTPUT.sys_log_level > LogLevel.ERROR:
return
if SIM_OUTPUT.save_sys_logs:
self.logger.error(msg)
self._write_to_terminal(msg, "ERROR", to_terminal)
@@ -143,6 +156,9 @@ class SysLog:
:param msg: The message to be logged.
:param to_terminal: If True, prints to the terminal too.
"""
if LogLevel.CRITICAL < SIM_OUTPUT.sys_log_level:
return
if SIM_OUTPUT.save_sys_logs:
self.logger.critical(msg)
self._write_to_terminal(msg, "CRITICAL", to_terminal)

View File

@@ -147,7 +147,7 @@ class ARP(Service):
payload=arp_packet, dst_ip_address=target_ip_address, dst_port=self.port, ip_protocol=self.protocol
)
else:
self.sys_log.error(
self.sys_log.warning(
"Cannot send ARP request as there is no outbound Network Interface to use. Try configuring the default "
"gateway."
)
@@ -173,7 +173,7 @@ class ARP(Service):
ip_protocol=self.protocol,
)
else:
self.sys_log.error(
self.sys_log.warning(
"Cannot send ARP reply as there is no outbound Network Interface to use. Try configuring the default "
"gateway."
)

View File

@@ -57,7 +57,7 @@ class DatabaseService(Service):
# check if the backup server was configured
if self.backup_server_ip is None:
self.sys_log.error(f"{self.name} - {self.sys_log.hostname}: not configured.")
self.sys_log.warning(f"{self.name} - {self.sys_log.hostname}: not configured.")
return False
software_manager: SoftwareManager = self.software_manager
@@ -110,7 +110,7 @@ class DatabaseService(Service):
db_file = self.file_system.get_file(folder_name="database", file_name="database.db", include_deleted=True)
if db_file is None:
self.sys_log.error("Database file not initialised.")
self.sys_log.warning("Database file not initialised.")
return False
# if the file was deleted, get the old visible health state
@@ -170,12 +170,12 @@ class DatabaseService(Service):
# try to create connection
if not self.add_connection(connection_id=connection_id):
status_code = 500
self.sys_log.info(f"{self.name}: Connect request for {connection_id=} declined")
self.sys_log.warning(f"{self.name}: Connect request for {connection_id=} declined")
else:
self.sys_log.info(f"{self.name}: Connect request for {connection_id=} authorised")
else:
status_code = 401 # Unauthorised
self.sys_log.info(f"{self.name}: Connect request for {connection_id=} declined")
self.sys_log.warning(f"{self.name}: Connect request for {connection_id=} declined")
else:
status_code = 404 # service not found
return {
@@ -206,7 +206,7 @@ class DatabaseService(Service):
self.sys_log.info(f"{self.name}: Running {query}")
if not self.db_file:
self.sys_log.info(f"{self.name}: Failed to run {query} because the database file is missing.")
self.sys_log.error(f"{self.name}: Failed to run {query} because the database file is missing.")
return {"status_code": 404, "type": "sql", "data": False}
if query == "SELECT":
@@ -276,7 +276,7 @@ class DatabaseService(Service):
return {"status_code": 401, "data": False}
else:
# Invalid query
self.sys_log.info(f"{self.name}: Invalid {query}")
self.sys_log.warning(f"{self.name}: Invalid {query}")
return {"status_code": 500, "data": False}
def describe_state(self) -> Dict:

View File

@@ -72,7 +72,7 @@ class DNSClient(Service):
# check if DNS server is configured
if self.dns_server is None:
self.sys_log.error(f"{self.name}: DNS Server is not configured")
self.sys_log.warning(f"{self.name}: DNS Server is not configured")
return False
# check if the target domain is in the client's DNS cache
@@ -88,7 +88,7 @@ class DNSClient(Service):
else:
# return False if already reattempted
if is_reattempt:
self.sys_log.info(f"{self.name}: Domain lookup for {target_domain} failed")
self.sys_log.warning(f"{self.name}: Domain lookup for {target_domain} failed")
return False
else:
# send a request to check if domain name exists in the DNS Server
@@ -143,7 +143,8 @@ class DNSClient(Service):
"""
# The payload should be a DNS packet
if not isinstance(payload, DNSPacket):
_LOGGER.debug(f"{payload} is not a DNSPacket")
self.sys_log.warning(f"{self.name}: Payload is not a DNSPacket")
self.sys_log.debug(f"{self.name}: {payload}")
return False
if payload.dns_reply is not None:
@@ -156,5 +157,5 @@ class DNSClient(Service):
self.dns_cache[payload.dns_request.domain_name_request] = payload.dns_reply.domain_name_ip_address
return True
self.sys_log.error(f"Failed to resolve domain name {payload.dns_request.domain_name_request}")
self.sys_log.warning(f"Failed to resolve domain name {payload.dns_request.domain_name_request}")
return False

View File

@@ -90,7 +90,8 @@ class DNSServer(Service):
# The payload should be a DNS packet
if not isinstance(payload, DNSPacket):
_LOGGER.debug(f"{payload} is not a DNSPacket")
self.sys_log.warning(f"{payload} is not a DNSPacket")
self.sys_log.debug(f"{payload} is not a DNSPacket")
return False
# cast payload into a DNS packet

View File

@@ -82,7 +82,7 @@ class FTPClient(FTPServiceABC):
else:
if is_reattempt:
# reattempt failed
self.sys_log.info(
self.sys_log.warning(
f"{self.name}: Unable to connect to FTP Server "
f"{dest_ip_address} via port {payload.ftp_command_args.value}"
)
@@ -93,7 +93,7 @@ class FTPClient(FTPServiceABC):
dest_ip_address=dest_ip_address, dest_port=dest_port, session_id=session_id, is_reattempt=True
)
else:
self.sys_log.error(f"{self.name}: Unable to send FTPPacket")
self.sys_log.warning(f"{self.name}: Unable to send FTPPacket")
return False
def _disconnect_from_server(
@@ -158,7 +158,7 @@ class FTPClient(FTPServiceABC):
# check if the file to transfer exists on the client
file_to_transfer: File = self.file_system.get_file(folder_name=src_folder_name, file_name=src_file_name)
if not file_to_transfer:
self.sys_log.error(f"Unable to send file that does not exist: {src_folder_name}/{src_file_name}")
self.sys_log.warning(f"Unable to send file that does not exist: {src_folder_name}/{src_file_name}")
return False
# check if FTP is currently connected to IP
@@ -253,7 +253,8 @@ class FTPClient(FTPServiceABC):
:type: session_id: Optional[str]
"""
if not isinstance(payload, FTPPacket):
self.sys_log.error(f"{payload} is not an FTP packet")
self.sys_log.warning(f"{self.name}: Payload is not an FTP packet")
self.sys_log.debug(f"{self.name}: {payload}")
return False
"""

View File

@@ -70,7 +70,8 @@ class FTPServer(FTPServiceABC):
def receive(self, payload: Any, session_id: Optional[str] = None, **kwargs) -> bool:
"""Receives a payload from the SessionManager."""
if not isinstance(payload, FTPPacket):
self.sys_log.error(f"{payload} is not an FTP packet")
self.sys_log.warning(f"{self.name}: Payload is not an FTP packet")
self.sys_log.debug(f"{self.name}: {payload}")
return False
if not super().receive(payload=payload, session_id=session_id, **kwargs):

View File

@@ -95,7 +95,7 @@ class ICMP(Service):
network_interface = self.software_manager.session_manager.resolve_outbound_network_interface(target_ip_address)
if not network_interface:
self.sys_log.error(
self.sys_log.warning(
"Cannot send ICMP echo request as there is no outbound Network Interface to use. Try configuring the "
"default gateway."
)
@@ -130,7 +130,7 @@ class ICMP(Service):
)
if not network_interface:
self.sys_log.error(
self.sys_log.warning(
"Cannot send ICMP echo reply as there is no outbound Network Interface to use. Try configuring the "
"default gateway."
)

View File

@@ -87,7 +87,7 @@ class NTPClient(Service):
:return: True if successful, False otherwise.
"""
if not isinstance(payload, NTPPacket):
_LOGGER.debug(f"{self.name}: Failed to parse NTP update")
self.sys_log.warning(f"{self.name}: Failed to parse NTP update")
return False
if payload.ntp_reply.ntp_datetime:
self.time = payload.ntp_reply.ntp_datetime
@@ -115,7 +115,6 @@ class NTPClient(Service):
:param timestep: The current timestep number. (Amount of time since simulation episode began)
:type timestep: int
"""
self.sys_log.info(f"{self.name} apply_timestep")
super().apply_timestep(timestep)
if self.operating_state == ServiceOperatingState.RUNNING:
# request time from server

View File

@@ -51,7 +51,8 @@ class NTPServer(Service):
:return: True if valid NTP request else False.
"""
if not (isinstance(payload, NTPPacket)):
_LOGGER.debug(f"{payload} is not a NTPPacket")
self.sys_log.warning(f"{self.name}: Payload is not a NTPPacket")
self.sys_log.debug(f"{self.name}: {payload}")
return False
payload: NTPPacket = payload

View File

@@ -59,7 +59,7 @@ class Service(IOSoftware):
if self.operating_state is not ServiceOperatingState.RUNNING:
# service is not running
_LOGGER.debug(f"Cannot perform action: {self.name} is {self.operating_state.name}")
self.sys_log.debug(f"Cannot perform action: {self.name} is {self.operating_state.name}")
return False
return True
@@ -187,6 +187,6 @@ class Service(IOSoftware):
super().apply_timestep(timestep)
if self.operating_state == ServiceOperatingState.RESTARTING:
if self.restart_countdown <= 0:
_LOGGER.debug(f"Restarting finished for service {self.name}")
self.sys_log.debug(f"Restarting finished for service {self.name}")
self.operating_state = ServiceOperatingState.RUNNING
self.restart_countdown -= 1

View File

@@ -167,7 +167,8 @@ class WebServer(Service):
# check if the payload is an HTTPPacket
if not isinstance(payload, HttpRequestPacket):
self.sys_log.error("Payload is not an HTTPPacket")
self.sys_log.warning(f"{self.name}: Payload is not an HTTPPacket")
self.sys_log.debug(f"{self.name}: {payload}")
return False
return self._process_http_request(payload=payload, session_id=session_id)

View File

@@ -6,7 +6,7 @@ from ipaddress import IPv4Address, IPv4Network
from typing import Any, Dict, Optional, TYPE_CHECKING, Union
from primaite.interface.request import RequestResponse
from primaite.simulator.core import _LOGGER, RequestManager, RequestType, SimComponent
from primaite.simulator.core import RequestManager, RequestType, SimComponent
from primaite.simulator.file_system.file_system import FileSystem, Folder
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.transmission.network_layer import IPProtocol
@@ -287,7 +287,9 @@ class IOSoftware(Software):
Returns true if the software can perform actions.
"""
if self.software_manager and self.software_manager.node.operating_state != NodeOperatingState.ON:
_LOGGER.debug(f"{self.name} Error: {self.software_manager.node.hostname} is not online.")
self.software_manager.node.sys_log.error(
f"{self.name} Error: {self.software_manager.node.hostname} is not online."
)
return False
return True
@@ -308,7 +310,7 @@ class IOSoftware(Software):
# if over or at capacity, set to overwhelmed
if len(self._connections) >= self.max_sessions:
self.set_health_state(SoftwareHealthState.OVERWHELMED)
self.sys_log.error(f"{self.name}: Connect request for {connection_id=} declined. Service is at capacity.")
self.sys_log.warning(f"{self.name}: Connect request for {connection_id=} declined. Service is at capacity.")
return False
else:
# if service was previously overwhelmed, set to good because there is enough space for connections
@@ -327,7 +329,7 @@ class IOSoftware(Software):
self.sys_log.info(f"{self.name}: Connect request for {connection_id=} authorised")
return True
# connection with given id already exists
self.sys_log.error(
self.sys_log.warning(
f"{self.name}: Connect request for {connection_id=} declined. Connection already exists."
)
return False

View File

@@ -8,6 +8,7 @@ io_settings:
save_step_metadata: false
save_pcap_logs: true
save_sys_logs: true
sys_log_level: WARNING
game:
@@ -60,6 +61,102 @@ agents:
frequency: 4
variance: 3
- ref: defender
team: BLUE
type: ProxyAgent
observation_space:
type: CUSTOM
options:
components:
- type: NODES
label: NODES
options:
hosts:
- hostname: client_1
- hostname: client_2
- hostname: client_3
num_services: 1
num_applications: 0
num_folders: 1
num_files: 1
num_nics: 2
include_num_access: false
include_nmne: true
routers:
- hostname: router_1
num_ports: 0
ip_list:
- 192.168.10.21
- 192.168.10.22
- 192.168.10.23
wildcard_list:
- 0.0.0.1
port_list:
- 80
- 5432
protocol_list:
- ICMP
- TCP
- UDP
num_rules: 10
- type: LINKS
label: LINKS
options:
link_references:
- switch_1:eth-1<->client_1:eth-1
- switch_1:eth-2<->client_2:eth-1
- type: "NONE"
label: ICS
options: {}
action_space:
action_list:
- type: DONOTHING
action_map:
0:
action: DONOTHING
options: {}
options:
nodes:
- node_name: switch
- node_name: client_1
- node_name: client_2
- node_name: client_3
max_folders_per_node: 2
max_files_per_folder: 2
max_services_per_node: 2
max_nics_per_node: 8
max_acl_rules: 10
ip_list:
- 192.168.10.21
- 192.168.10.22
- 192.168.10.23
reward_function:
reward_components:
- type: DATABASE_FILE_INTEGRITY
weight: 0.5
options:
node_hostname: database_server
folder_name: database
file_name: database.db
- type: WEB_SERVER_404_PENALTY
weight: 0.5
options:
node_hostname: web_server
service_name: web_server_web_service
agent_settings:
flatten_obs: true
simulation:
network:
nodes:
@@ -75,6 +172,7 @@ simulation:
default_gateway: 192.168.10.1
dns_server: 192.168.1.10
applications:
- type: RansomwareScript
- type: WebBrowser
options:
target_url: http://arcd.com/users/

View File

@@ -0,0 +1,36 @@
from pathlib import Path
from typing import Union
import yaml
from primaite.config.load import data_manipulation_config_path
from primaite.game.game import PrimaiteGame
from primaite.session.environment import PrimaiteGymEnv
from primaite.simulator import LogLevel
from tests import TEST_ASSETS_ROOT
BASIC_CONFIG = TEST_ASSETS_ROOT / "configs/basic_switched_network.yaml"
def load_config(config_path: Union[str, Path]) -> PrimaiteGame:
"""Returns a PrimaiteGame object which loads the contents of a given yaml path."""
with open(config_path, "r") as f:
cfg = yaml.safe_load(f)
return PrimaiteGame.from_config(cfg)
def test_io_settings():
"""Test that the io_settings are loaded correctly."""
with open(BASIC_CONFIG, "r") as f:
cfg = yaml.safe_load(f)
env = PrimaiteGymEnv(game_config=cfg)
assert env.io.settings is not None
assert env.io.settings.sys_log_level is LogLevel.WARNING
assert env.io.settings.save_pcap_logs
assert env.io.settings.save_sys_logs
assert env.io.settings.save_step_metadata is False
assert env.io.settings.write_sys_log_to_terminal is False # false by default

View File

@@ -0,0 +1,126 @@
from uuid import uuid4
import pytest
from primaite.simulator import LogLevel, SIM_OUTPUT
from primaite.simulator.system.core.sys_log import SysLog
@pytest.fixture(scope="function")
def syslog() -> SysLog:
return SysLog(hostname="test")
def test_debug_sys_log_level(syslog, capsys):
"""Test that the debug log level logs debug syslogs and above."""
SIM_OUTPUT.sys_log_level = LogLevel.DEBUG
SIM_OUTPUT.write_sys_log_to_terminal = True
test_string = str(uuid4())
syslog.debug(test_string)
syslog.info(test_string)
syslog.warning(test_string)
syslog.error(test_string)
syslog.critical(test_string)
captured = "".join(capsys.readouterr())
assert test_string in captured
assert "DEBUG" in captured
assert "INFO" in captured
assert "WARNING" in captured
assert "ERROR" in captured
assert "CRITICAL" in captured
def test_info_sys_log_level(syslog, capsys):
"""Test that the debug log level logs debug syslogs and above."""
SIM_OUTPUT.sys_log_level = LogLevel.INFO
SIM_OUTPUT.write_sys_log_to_terminal = True
test_string = str(uuid4())
syslog.debug(test_string)
syslog.info(test_string)
syslog.warning(test_string)
syslog.error(test_string)
syslog.critical(test_string)
captured = "".join(capsys.readouterr())
assert test_string in captured
assert "DEBUG" not in captured
assert "INFO" in captured
assert "WARNING" in captured
assert "ERROR" in captured
assert "CRITICAL" in captured
def test_warning_sys_log_level(syslog, capsys):
"""Test that the debug log level logs debug syslogs and above."""
SIM_OUTPUT.sys_log_level = LogLevel.WARNING
SIM_OUTPUT.write_sys_log_to_terminal = True
test_string = str(uuid4())
syslog.debug(test_string)
syslog.info(test_string)
syslog.warning(test_string)
syslog.error(test_string)
syslog.critical(test_string)
captured = "".join(capsys.readouterr())
assert test_string in captured
assert "DEBUG" not in captured
assert "INFO" not in captured
assert "WARNING" in captured
assert "ERROR" in captured
assert "CRITICAL" in captured
def test_error_sys_log_level(syslog, capsys):
"""Test that the debug log level logs debug syslogs and above."""
SIM_OUTPUT.sys_log_level = LogLevel.ERROR
SIM_OUTPUT.write_sys_log_to_terminal = True
test_string = str(uuid4())
syslog.debug(test_string)
syslog.info(test_string)
syslog.warning(test_string)
syslog.error(test_string)
syslog.critical(test_string)
captured = "".join(capsys.readouterr())
assert test_string in captured
assert "DEBUG" not in captured
assert "INFO" not in captured
assert "WARNING" not in captured
assert "ERROR" in captured
assert "CRITICAL" in captured
def test_critical_sys_log_level(syslog, capsys):
"""Test that the debug log level logs debug syslogs and above."""
SIM_OUTPUT.sys_log_level = LogLevel.CRITICAL
SIM_OUTPUT.write_sys_log_to_terminal = True
test_string = str(uuid4())
syslog.debug(test_string)
syslog.info(test_string)
syslog.warning(test_string)
syslog.error(test_string)
syslog.critical(test_string)
captured = "".join(capsys.readouterr())
assert test_string in captured
assert "DEBUG" not in captured
assert "INFO" not in captured
assert "WARNING" not in captured
assert "ERROR" not in captured
assert "CRITICAL" in captured