Cosmetic changes to satisfy pre-commit

This commit is contained in:
Marek Wolan
2023-06-27 13:06:10 +01:00
parent 9b0e24c27b
commit be7d0e1745
11 changed files with 80 additions and 93 deletions

View File

@@ -31,7 +31,7 @@ def _get_primaite_config():
"INFO": logging.INFO,
"WARN": logging.WARN,
"ERROR": logging.ERROR,
"CRITICAL": logging.CRITICAL
"CRITICAL": logging.CRITICAL,
}
primaite_config["log_level"] = log_level_map[primaite_config["log_level"]]
return primaite_config

View File

@@ -3,8 +3,8 @@
import logging
import os
import shutil
from pathlib import Path
from enum import Enum
from pathlib import Path
from typing import Optional
import pkg_resources
@@ -44,6 +44,7 @@ def logs(last_n: Annotated[int, typer.Option("-n")]):
:param last_n: The number of lines to print. Default value is 10.
"""
import re
from primaite import LOG_PATH
if os.path.isfile(LOG_PATH):
@@ -53,7 +54,7 @@ def logs(last_n: Annotated[int, typer.Option("-n")]):
print(re.sub(r"\n*", "", line))
_LogLevel = Enum("LogLevel", {k: k for k in logging._levelToName.values()}) # noqa
_LogLevel = Enum("LogLevel", {k: k for k in logging._levelToName.values()}) # noqa
@app.command()

View File

@@ -1,7 +1,7 @@
# Crown Copyright (C) Dstl 2022. DEFCON 703. Shared in confidence.
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, Final, Union, Optional
from typing import Any, Dict, Final, Optional, Union
import yaml
@@ -167,8 +167,7 @@ def main_training_config_path() -> Path:
return path
def load(file_path: Union[str, Path],
legacy_file: bool = False) -> TrainingConfig:
def load(file_path: Union[str, Path], legacy_file: bool = False) -> TrainingConfig:
"""
Read in a training config yaml file.
@@ -213,9 +212,7 @@ def load(file_path: Union[str, Path],
def convert_legacy_training_config_dict(
legacy_config_dict: Dict[str, Any],
num_steps: int = 256,
action_type: str = "ANY"
legacy_config_dict: Dict[str, Any], num_steps: int = 256, action_type: str = "ANY"
) -> Dict[str, Any]:
"""
Convert a legacy training config dict to the new format.
@@ -227,10 +224,7 @@ def convert_legacy_training_config_dict(
don't have action_type values.
:return: The converted training config dict.
"""
config_dict = {
"num_steps": num_steps,
"action_type": action_type
}
config_dict = {"num_steps": num_steps, "action_type": action_type}
for legacy_key, value in legacy_config_dict.items():
new_key = _get_new_key_from_legacy(legacy_key)
if new_key:

View File

@@ -14,8 +14,7 @@ from gym import Env, spaces
from matplotlib import pyplot as plt
from primaite.acl.access_control_list import AccessControlList
from primaite.agents.utils import is_valid_acl_action_extra, \
is_valid_node_action
from primaite.agents.utils import is_valid_acl_action_extra, is_valid_node_action
from primaite.common.custom_typing import NodeUnion
from primaite.common.enums import (
ActionType,
@@ -24,8 +23,9 @@ from primaite.common.enums import (
NodePOLInitiator,
NodePOLType,
NodeType,
ObservationType,
Priority,
SoftwareState, ObservationType,
SoftwareState,
)
from primaite.common.service import Service
from primaite.config import training_config
@@ -35,15 +35,13 @@ from primaite.environment.reward import calculate_reward_function
from primaite.links.link import Link
from primaite.nodes.active_node import ActiveNode
from primaite.nodes.node import Node
from primaite.nodes.node_state_instruction_green import \
NodeStateInstructionGreen
from primaite.nodes.node_state_instruction_green import NodeStateInstructionGreen
from primaite.nodes.node_state_instruction_red import NodeStateInstructionRed
from primaite.nodes.passive_node import PassiveNode
from primaite.nodes.service_node import ServiceNode
from primaite.pol.green_pol import apply_iers, apply_node_pol
from primaite.pol.ier import IER
from primaite.pol.red_agent_pol import apply_red_agent_iers, \
apply_red_agent_node_pol
from primaite.pol.red_agent_pol import apply_red_agent_iers, apply_red_agent_node_pol
from primaite.transactions.transaction import Transaction
_LOGGER = logging.getLogger(__name__)
@@ -177,7 +175,6 @@ class Primaite(Env):
# It will be initialised later.
self.obs_handler: ObservationsHandler
# Open the config file and build the environment laydown
with open(self._lay_down_config_path, "r") as file:
# Open the config file and build the environment laydown
@@ -199,7 +196,6 @@ class Primaite(Env):
try:
plt.tight_layout()
nx.draw_networkx(self.network, with_labels=True)
now = datetime.now() # current date and time
file_path = session_path / f"network_{timestamp_str}.png"
plt.savefig(file_path, format="PNG")
@@ -238,7 +234,9 @@ class Primaite(Env):
self.action_dict = self.create_node_and_acl_action_dict()
self.action_space = spaces.Discrete(len(self.action_dict))
else:
_LOGGER.info(f"Invalid action type selected: {self.training_config.action_type}")
_LOGGER.info(
f"Invalid action type selected: {self.training_config.action_type}"
)
# Set up a csv to store the results of the training
try:
header = ["Episode", "Average Reward"]
@@ -379,7 +377,7 @@ class Primaite(Env):
self.step_count,
self.training_config,
)
#print(f" Step {self.step_count} Reward: {str(reward)}")
# print(f" Step {self.step_count} Reward: {str(reward)}")
self.total_reward += reward
if self.step_count == self.episode_steps:
self.average_reward = self.total_reward / self.step_count
@@ -1033,7 +1031,6 @@ class Primaite(Env):
"""
self.observation_type = ObservationType[observation_info["type"]]
def get_action_info(self, action_info):
"""
Extracts action_info.

View File

@@ -22,8 +22,7 @@ from stable_baselines3.ppo import MlpPolicy as PPOMlp
from primaite import SESSIONS_DIR, getLogger
from primaite.config.training_config import TrainingConfig
from primaite.environment.primaite_env import Primaite
from primaite.transactions.transactions_to_file import \
write_transaction_to_file
from primaite.transactions.transactions_to_file import write_transaction_to_file
_LOGGER = getLogger(__name__)
@@ -349,5 +348,3 @@ if __name__ == "__main__":
"Please provide a lay down config file using the --ldc " "argument"
)
run(training_config_path=args.tc, lay_down_config_path=args.ldc)

View File

@@ -46,6 +46,7 @@ class Node:
"""Sets the node state to ON."""
self.hardware_state = HardwareState.BOOTING
self.booting_count = self.config_values.node_booting_duration
def turn_off(self):
"""Sets the node state to OFF."""
self.hardware_state = HardwareState.OFF
@@ -64,14 +65,14 @@ class Node:
self.hardware_state = HardwareState.ON
def update_booting_status(self):
"""Updates the booting count"""
"""Updates the booting count."""
self.booting_count -= 1
if self.booting_count <= 0:
self.booting_count = 0
self.hardware_state = HardwareState.ON
def update_shutdown_status(self):
"""Updates the shutdown count"""
"""Updates the shutdown count."""
self.shutting_down_count -= 1
if self.shutting_down_count <= 0:
self.shutting_down_count = 0

View File

@@ -190,13 +190,15 @@ class ServiceNode(ActiveNode):
service_value.reduce_patching_count()
def update_resetting_status(self):
"""Update resetting counter and set software state if it reached 0."""
super().update_resetting_status()
if self.resetting_count <= 0:
for service in self.services.values():
service.software_state = SoftwareState.GOOD
def update_booting_status(self):
"""Update booting counter and set software to good if it reached 0."""
super().update_booting_status()
if self.booting_count <= 0:
for service in self.services.values():
service.software_state =SoftwareState.GOOD
service.software_state = SoftwareState.GOOD

View File

@@ -17,7 +17,6 @@ def start_jupyter_session():
.. todo:: Figure out how to get this working for Linux and MacOS too.
"""
if importlib.util.find_spec("jupyter") is not None:
jupyter_cmd = "python3 -m jupyter lab"
if sys.platform == "win32":

View File

@@ -27,7 +27,8 @@ def env(request):
@pytest.mark.env_config_paths(
dict(
training_config_path=TEST_CONFIG_ROOT / "obs_tests/main_config_without_obs.yaml",
training_config_path=TEST_CONFIG_ROOT
/ "obs_tests/main_config_without_obs.yaml",
lay_down_config_path=TEST_CONFIG_ROOT / "obs_tests/laydown.yaml",
)
)
@@ -43,7 +44,8 @@ def test_default_obs_space(env: Primaite):
@pytest.mark.env_config_paths(
dict(
training_config_path=TEST_CONFIG_ROOT / "obs_tests/main_config_without_obs.yaml",
training_config_path=TEST_CONFIG_ROOT
/ "obs_tests/main_config_without_obs.yaml",
lay_down_config_path=TEST_CONFIG_ROOT / "obs_tests/laydown.yaml",
)
)
@@ -140,7 +142,8 @@ class TestNodeLinkTable:
@pytest.mark.env_config_paths(
dict(
training_config_path=TEST_CONFIG_ROOT / "obs_tests/main_config_NODE_STATUSES.yaml",
training_config_path=TEST_CONFIG_ROOT
/ "obs_tests/main_config_NODE_STATUSES.yaml",
lay_down_config_path=TEST_CONFIG_ROOT / "obs_tests/laydown.yaml",
)
)

View File

@@ -1,7 +1,13 @@
"""Used to test Active Node functions."""
import pytest
from primaite.common.enums import FileSystemState, HardwareState, SoftwareState, NodeType, Priority
from primaite.common.enums import (
FileSystemState,
HardwareState,
NodeType,
Priority,
SoftwareState,
)
from primaite.common.service import Service
from primaite.config.training_config import TrainingConfig
from primaite.nodes.active_node import ActiveNode
@@ -10,24 +16,20 @@ from primaite.nodes.service_node import ServiceNode
@pytest.mark.parametrize(
"starting_operating_state, expected_operating_state",
[
(HardwareState.RESETTING, HardwareState.ON)
],
[(HardwareState.RESETTING, HardwareState.ON)],
)
def test_node_resets_correctly(starting_operating_state, expected_operating_state):
"""
Tests that a node resets correctly.
"""
"""Tests that a node resets correctly."""
active_node = ActiveNode(
node_id = "0",
name = "node",
node_type = NodeType.COMPUTER,
priority = Priority.P1,
hardware_state = starting_operating_state,
ip_address = "192.168.0.1",
software_state = SoftwareState.COMPROMISED,
file_system_state = FileSystemState.CORRUPT,
config_values=TrainingConfig()
node_id="0",
name="node",
node_type=NodeType.COMPUTER,
priority=Priority.P1,
hardware_state=starting_operating_state,
ip_address="192.168.0.1",
software_state=SoftwareState.COMPROMISED,
file_system_state=FileSystemState.CORRUPT,
config_values=TrainingConfig(),
)
for x in range(5):
@@ -37,35 +39,28 @@ def test_node_resets_correctly(starting_operating_state, expected_operating_stat
assert active_node.file_system_state_actual == FileSystemState.GOOD
assert active_node.hardware_state == expected_operating_state
@pytest.mark.parametrize(
"operating_state, expected_operating_state",
[
(HardwareState.BOOTING, HardwareState.ON)
],
[(HardwareState.BOOTING, HardwareState.ON)],
)
def test_node_boots_correctly(operating_state, expected_operating_state):
"""
Tests that a node boots correctly.
"""
"""Tests that a node boots correctly."""
service_node = ServiceNode(
node_id = 0,
name = "node",
node_type = "COMPUTER",
priority = "1",
hardware_state = operating_state,
ip_address = "192.168.0.1",
software_state = SoftwareState.GOOD,
file_system_state = "GOOD",
config_values = 1,
node_id=0,
name="node",
node_type="COMPUTER",
priority="1",
hardware_state=operating_state,
ip_address="192.168.0.1",
software_state=SoftwareState.GOOD,
file_system_state="GOOD",
config_values=1,
)
service_attributes = Service(
name = "node",
port = "80",
software_state = SoftwareState.COMPROMISED
)
service_node.add_service(
service_attributes
name="node", port="80", software_state=SoftwareState.COMPROMISED
)
service_node.add_service(service_attributes)
for x in range(5):
service_node.update_booting_status()
@@ -73,31 +68,26 @@ def test_node_boots_correctly(operating_state, expected_operating_state):
assert service_attributes.software_state == SoftwareState.GOOD
assert service_node.hardware_state == expected_operating_state
@pytest.mark.parametrize(
"operating_state, expected_operating_state",
[
(HardwareState.SHUTTING_DOWN, HardwareState.OFF)
],
[(HardwareState.SHUTTING_DOWN, HardwareState.OFF)],
)
def test_node_shutdown_correctly(operating_state, expected_operating_state):
"""
Tests that a node shutdown correctly.
"""
"""Tests that a node shutdown correctly."""
active_node = ActiveNode(
node_id = 0,
name = "node",
node_type = "COMPUTER",
priority = "1",
hardware_state = operating_state,
ip_address = "192.168.0.1",
software_state = SoftwareState.GOOD,
file_system_state = "GOOD",
config_values = 1,
node_id=0,
name="node",
node_type="COMPUTER",
priority="1",
hardware_state=operating_state,
ip_address="192.168.0.1",
software_state=SoftwareState.GOOD,
file_system_state="GOOD",
config_values=1,
)
for x in range(5):
active_node.update_shutdown_status()
assert active_node.hardware_state == expected_operating_state

View File

@@ -48,7 +48,8 @@ def test_single_action_space_is_valid():
"""Test to ensure the blue agent is using the ACL action space and is carrying out both kinds of operations."""
env = _get_primaite_env_from_config(
training_config_path=TEST_CONFIG_ROOT / "single_action_space_main_config.yaml",
lay_down_config_path=TEST_CONFIG_ROOT / "single_action_space_lay_down_config.yaml",
lay_down_config_path=TEST_CONFIG_ROOT
/ "single_action_space_lay_down_config.yaml",
)
run_generic_set_actions(env)
@@ -77,8 +78,10 @@ def test_single_action_space_is_valid():
def test_agent_is_executing_actions_from_both_spaces():
"""Test to ensure the blue agent is carrying out both kinds of operations (NODE & ACL)."""
env = _get_primaite_env_from_config(
training_config_path=TEST_CONFIG_ROOT / "single_action_space_fixed_blue_actions_main_config.yaml",
lay_down_config_path=TEST_CONFIG_ROOT / "single_action_space_lay_down_config.yaml",
training_config_path=TEST_CONFIG_ROOT
/ "single_action_space_fixed_blue_actions_main_config.yaml",
lay_down_config_path=TEST_CONFIG_ROOT
/ "single_action_space_lay_down_config.yaml",
)
# Run environment with specified fixed blue agent actions only
run_generic_set_actions(env)