Merged PR 126: PrimAITE Benchmarking

## Summary
 - Added full benchmarking script that included plots and a LaTeX report. Ran the v2.0.0rc1 benchmark. Tidied a few other things up.

The code is a bit scrappy. But it's not released code. I will endeavour to tidy it up at a later date.

## Test process
Manually ran the script. This is the final report -> [PrimAITE v2.0.0rc1 Learning Benchmark.pdf](https://dev.azure.com/ma-dev-uk/b50a61ee-86c4-48bc-9a0b-a67645ba12ee/_apis/git/repositories/2825053e-bd3b-45b2-8680-1281809eefa2/pullRequests/126/attachments/PrimAITE%20v2.0.0rc1%20Learning%20Benchmark.pdf)

## Checklist
- [X] This PR is linked to a **work item**
- [X] I have performed **self-review** of the code
- [ ] I have written **tests** for any new functionality added with this PR
- [ ] I have updated the **documentation** if this PR changes or adds functionality
- [X] I have run **pre-commit** checks for code style

Related work items: #1632
This commit is contained in:
Christopher McCarthy
2023-07-20 12:58:54 +00:00
16 changed files with 7010 additions and 37 deletions

1
.gitignore vendored
View File

@@ -147,3 +147,4 @@ docs/source/primaite-dependencies.rst
# outputs
src/primaite/outputs/
/benchmark/output/

View File

@@ -0,0 +1,163 @@
# Training Config File
# Sets which agent algorithm framework will be used.
# Options are:
# "SB3" (Stable Baselines3)
# "RLLIB" (Ray RLlib)
# "CUSTOM" (Custom Agent)
agent_framework: SB3
# Sets which deep learning framework will be used (by RLlib ONLY).
# Default is TF (Tensorflow).
# Options are:
# "TF" (Tensorflow)
# TF2 (Tensorflow 2.X)
# TORCH (PyTorch)
deep_learning_framework: TF2
# Sets which Agent class will be used.
# Options are:
# "A2C" (Advantage Actor Critic coupled with either SB3 or RLLIB agent_framework)
# "PPO" (Proximal Policy Optimization coupled with either SB3 or RLLIB agent_framework)
# "HARDCODED" (The HardCoded agents coupled with an ACL or NODE action_type)
# "DO_NOTHING" (The DoNothing agents coupled with an ACL or NODE action_type)
# "RANDOM" (primaite.agents.simple.RandomAgent)
# "DUMMY" (primaite.agents.simple.DummyAgent)
agent_identifier: PPO
# Sets whether Red Agent POL and IER is randomised.
# Options are:
# True
# False
random_red_agent: False
# The (integer) seed to be used in random number generation
# Default is None (null)
seed: null
# Set whether the agent will be deterministic instead of stochastic
# Options are:
# True
# False
deterministic: False
# Sets what view of the environment the deterministic hardcoded agent has. The default is BASIC.
# Options are:
# "BASIC" (The current observation space only)
# "FULL" (Full environment view with actions taken and reward feedback)
hard_coded_agent_view: FULL
# Sets How the Action Space is defined:
# "NODE"
# "ACL"
# "ANY" node and acl actions
action_type: NODE
# observation space
observation_space:
flatten: true
components:
- name: NODE_LINK_TABLE
- name: NODE_STATUSES
- name: LINK_TRAFFIC_LEVELS
# Number of episodes for training to run per session
num_train_episodes: 500
# Number of time_steps for training per episode
num_train_steps: 256
# Number of episodes for evaluation to run per session
num_eval_episodes: 1
# Number of time_steps for evaluation per episode
num_eval_steps: 256
# Sets how often the agent will save a checkpoint (every n time episodes).
# Set to 0 if no checkpoints are required. Default is 10
checkpoint_every_n_episodes: 0
# Time delay (milliseconds) between steps for CUSTOM agents.
time_delay: 5
# Type of session to be run. Options are:
# "TRAIN" (Trains an agent)
# "EVAL" (Evaluates an agent)
# "TRAIN_EVAL" (Trains then evaluates an agent)
session_type: TRAIN
# Environment config values
# The high value for the observation space
observation_space_high_value: 1000000000
# The Stable Baselines3 learn/eval output verbosity level:
# Options are:
# "NONE" (No Output)
# "INFO" (Info Messages (such as devices and wrappers used))
# "DEBUG" (All Messages)
sb3_output_verbose_level: NONE
# Reward values
# Generic
all_ok: 0
# Node Hardware State
off_should_be_on: -0.001
off_should_be_resetting: -0.0005
on_should_be_off: -0.0002
on_should_be_resetting: -0.0005
resetting_should_be_on: -0.0005
resetting_should_be_off: -0.0002
resetting: -0.0003
# Node Software or Service State
good_should_be_patching: 0.0002
good_should_be_compromised: 0.0005
good_should_be_overwhelmed: 0.0005
patching_should_be_good: -0.0005
patching_should_be_compromised: 0.0002
patching_should_be_overwhelmed: 0.0002
patching: -0.0003
compromised_should_be_good: -0.002
compromised_should_be_patching: -0.002
compromised_should_be_overwhelmed: -0.002
compromised: -0.002
overwhelmed_should_be_good: -0.002
overwhelmed_should_be_patching: -0.002
overwhelmed_should_be_compromised: -0.002
overwhelmed: -0.002
# Node File System State
good_should_be_repairing: 0.0002
good_should_be_restoring: 0.0002
good_should_be_corrupt: 0.0005
good_should_be_destroyed: 0.001
repairing_should_be_good: -0.0005
repairing_should_be_restoring: 0.0002
repairing_should_be_corrupt: 0.0002
repairing_should_be_destroyed: 0.0000
repairing: -0.0003
restoring_should_be_good: -0.001
restoring_should_be_repairing: -0.0002
restoring_should_be_corrupt: 0.0001
restoring_should_be_destroyed: 0.0002
restoring: -0.0006
corrupt_should_be_good: -0.001
corrupt_should_be_repairing: -0.001
corrupt_should_be_restoring: -0.001
corrupt_should_be_destroyed: 0.0002
corrupt: -0.001
destroyed_should_be_good: -0.002
destroyed_should_be_repairing: -0.002
destroyed_should_be_restoring: -0.002
destroyed_should_be_corrupt: -0.002
destroyed: -0.002
scanning: -0.0002
# IER status
red_ier_running: -0.0005
green_ier_blocked: -0.001
# Patching / Reset durations
os_patching_duration: 5 # The time taken to patch the OS
node_reset_duration: 5 # The time taken to reset a node (hardware)
service_patching_duration: 5 # The time taken to patch a service
file_system_repairing_limit: 5 # The time take to repair the file system
file_system_restoring_limit: 5 # The time take to restore the file system
file_system_scanning_limit: 5 # The time taken to scan the file system

View File

@@ -0,0 +1,452 @@
import json
import platform
import shutil
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Final, Optional, Tuple, Union
from unittest.mock import patch
import GPUtil
import plotly.graph_objects as go
import polars as pl
import psutil
import yaml
from plotly.graph_objs import Figure
from pylatex import Command, Document
from pylatex import Figure as LatexFigure
from pylatex import Section, Subsection, Tabular
from pylatex.utils import bold
import primaite
from primaite.config.lay_down_config import data_manipulation_config_path
from primaite.data_viz.session_plots import get_plotly_config
from primaite.environment.primaite_env import Primaite
from primaite.primaite_session import PrimaiteSession
_LOGGER = primaite.getLogger(__name__)
_BENCHMARK_ROOT = Path(__file__).parent
_RESULTS_ROOT: Final[Path] = _BENCHMARK_ROOT / "results"
_RESULTS_ROOT.mkdir(exist_ok=True, parents=True)
_OUTPUT_ROOT: Final[Path] = _BENCHMARK_ROOT / "output"
# Clear and recreate the output directory
if _OUTPUT_ROOT.exists():
shutil.rmtree(_OUTPUT_ROOT)
_OUTPUT_ROOT.mkdir()
_TRAINING_CONFIG_PATH = _BENCHMARK_ROOT / "config" / "benchmark_training_config.yaml"
_LAY_DOWN_CONFIG_PATH = data_manipulation_config_path()
def get_size(size_bytes: int):
"""
Scale bytes to its proper format.
e.g:
1253656 => '1.20MB'
1253656678 => '1.17GB'
:
"""
factor = 1024
for unit in ["", "K", "M", "G", "T", "P"]:
if size_bytes < factor:
return f"{size_bytes:.2f}{unit}B"
size_bytes /= factor
def _get_system_info() -> Dict:
"""Builds and returns a dict containing system info."""
uname = platform.uname()
cpu_freq = psutil.cpu_freq()
virtual_mem = psutil.virtual_memory()
swap_mem = psutil.swap_memory()
gpus = GPUtil.getGPUs()
return {
"System": {
"OS": uname.system,
"OS Version": uname.version,
"Machine": uname.machine,
"Processor": uname.processor,
},
"CPU": {
"Physical Cores": psutil.cpu_count(logical=False),
"Total Cores": psutil.cpu_count(logical=True),
"Max Frequency": f"{cpu_freq.max:.2f}Mhz",
},
"Memory": {"Total": get_size(virtual_mem.total), "Swap Total": get_size(swap_mem.total)},
"GPU": [{"Name": gpu.name, "Total Memory": f"{gpu.memoryTotal}MB"} for gpu in gpus],
}
def _build_benchmark_latex_report(
benchmark_metadata_dict: Dict, this_version_plot_path: Path, all_version_plot_path: Path
):
geometry_options = {"tmargin": "2.5cm", "rmargin": "2.5cm", "bmargin": "2.5cm", "lmargin": "2.5cm"}
data = benchmark_metadata_dict
primaite_version = data["primaite_version"]
# Create a new document
doc = Document("report", geometry_options=geometry_options)
# Title
doc.preamble.append(Command("title", f"PrimAITE {primaite_version} Learning Benchmark"))
doc.preamble.append(Command("author", "PrimAITE Dev Team"))
doc.preamble.append(Command("date", datetime.now().date()))
doc.append(Command("maketitle"))
sessions = data["total_sessions"]
episodes = data["training_config"]["num_train_episodes"]
steps = data["training_config"]["num_train_steps"]
# Body
with doc.create(Section("Introduction")):
doc.append(
f"PrimAITE v{primaite_version} was benchmarked automatically upon release. Learning rate metrics "
f"were captured to be referenced during system-level testing and user acceptance testing (UAT)."
)
doc.append(
f"\nThe benchmarking process consists of running {sessions} training session using the same "
f"training and lay down config files. Each session trains an agent for {episodes} episodes, "
f"with each episode consisting of {steps} steps."
)
doc.append(
f"\nThe mean reward per episode from each session is captured. This is then used to calculate a "
f"combined average reward per episode from the {sessions} individual sessions for smoothing. "
f"Finally, a 25-widow rolling average of the combined average reward per session is calculated for "
f"further smoothing."
)
with doc.create(Section("System Information")):
with doc.create(Subsection("Python")):
with doc.create(Tabular("|l|l|")) as table:
table.add_hline()
table.add_row((bold("Version"), sys.version))
table.add_hline()
for section, section_data in data["system_info"].items():
if section_data:
with doc.create(Subsection(section)):
if isinstance(section_data, dict):
with doc.create(Tabular("|l|l|")) as table:
table.add_hline()
for key, value in section_data.items():
table.add_row((bold(key), value))
table.add_hline()
elif isinstance(section_data, list):
headers = section_data[0].keys()
tabs_str = "|".join(["l" for _ in range(len(headers))])
tabs_str = f"|{tabs_str}|"
with doc.create(Tabular(tabs_str)) as table:
table.add_hline()
table.add_row([bold(h) for h in headers])
table.add_hline()
for item in section_data:
table.add_row(item.values())
table.add_hline()
headers_map = {
"total_sessions": "Total Sessions",
"total_episodes": "Total Episodes",
"total_time_steps": "Total Steps",
"av_s_per_session": "Av Session Duration (s)",
"av_s_per_step": "Av Step Duration (s)",
"av_s_per_100_steps_10_nodes": "Av Duration per 100 Steps per 10 Nodes (s)",
}
with doc.create(Section("Stats")):
with doc.create(Subsection("Benchmark Results")):
with doc.create(Tabular("|l|l|")) as table:
table.add_hline()
for section, header in headers_map.items():
if section.startswith("av_"):
table.add_row((bold(header), f"{data[section]:.4f}"))
else:
table.add_row((bold(header), data[section]))
table.add_hline()
with doc.create(Section("Graphs")):
with doc.create(Subsection(f"PrimAITE {primaite_version} Learning Benchmark Plot")):
with doc.create(LatexFigure(position="h!")) as pic:
pic.add_image(str(this_version_plot_path))
pic.add_caption(f"PrimAITE {primaite_version} Learning Benchmark Plot")
with doc.create(Subsection("PrimAITE All Versions Learning Benchmark Plot")):
with doc.create(LatexFigure(position="h!")) as pic:
pic.add_image(str(all_version_plot_path))
pic.add_caption("PrimAITE All Versions Learning Benchmark Plot")
doc.generate_pdf(str(this_version_plot_path).replace(".png", ""), clean_tex=True)
class BenchmarkPrimaiteSession(PrimaiteSession):
"""A benchmarking primaite session."""
def __init__(
self,
training_config_path: Union[str, Path],
lay_down_config_path: Union[str, Path],
):
super().__init__(training_config_path, lay_down_config_path)
self.setup()
@property
def env(self) -> Primaite:
"""Direct access to the env for ease of testing."""
return self._agent_session._env # noqa
def __enter__(self):
return self
def __exit__(self, type, value, tb):
shutil.rmtree(self.session_path)
_LOGGER.debug(f"Deleted benchmark session directory: {self.session_path}")
def _learn_benchmark_durations(self) -> Tuple[float, float, float]:
"""
Calculate and return the learning benchmark durations.
Calculates the:
- Total learning time in seconds
- Total learning time per time step in seconds
- Total learning time per 100 time steps per 10 nodes in seconds
:return: The learning benchmark durations as a Tuple of three floats:
Tuple[total_s, s_per_step, s_per_100_steps_10_nodes].
"""
data = self.metadata_file_as_dict()
start_dt = datetime.fromisoformat(data["start_datetime"])
end_dt = datetime.fromisoformat(data["end_datetime"])
delta = end_dt - start_dt
total_s = delta.total_seconds()
total_steps = data["learning"]["total_time_steps"]
s_per_step = total_s / total_steps
num_nodes = self.env.num_nodes
num_intervals = total_steps / 100
av_interval_time = total_s / num_intervals
s_per_100_steps_10_nodes = av_interval_time / (num_nodes / 10)
return total_s, s_per_step, s_per_100_steps_10_nodes
def learn_metadata_dict(self) -> Dict[str, Any]:
"""Metadata specific to the learning session."""
total_s, s_per_step, s_per_100_steps_10_nodes = self._learn_benchmark_durations()
return {
"total_episodes": self.env.actual_episode_count,
"total_time_steps": self.env.total_step_count,
"total_s": total_s,
"s_per_step": s_per_step,
"s_per_100_steps_10_nodes": s_per_100_steps_10_nodes,
"av_reward_per_episode": self.learn_av_reward_per_episode_dict(),
}
def _get_benchmark_session_path(session_timestamp: datetime) -> Path:
return _OUTPUT_ROOT / session_timestamp.strftime("%Y-%m-%d_%H-%M-%S")
def _get_benchmark_primaite_session() -> BenchmarkPrimaiteSession:
with patch("primaite.agents.agent_abc.get_session_path", _get_benchmark_session_path) as mck:
mck.session_timestamp = datetime.now()
return BenchmarkPrimaiteSession(_TRAINING_CONFIG_PATH, _LAY_DOWN_CONFIG_PATH)
def _build_benchmark_results_dict(start_datetime: datetime, metadata_dict: Dict) -> dict:
n = len(metadata_dict)
with open(_TRAINING_CONFIG_PATH, "r") as file:
training_config_dict = yaml.safe_load(file)
with open(_LAY_DOWN_CONFIG_PATH, "r") as file:
lay_down_config_dict = yaml.safe_load(file)
averaged_data = {
"start_timestamp": start_datetime.isoformat(),
"end_datetime": datetime.now().isoformat(),
"primaite_version": primaite.__version__,
"system_info": _get_system_info(),
"total_sessions": n,
"total_episodes": sum(d["total_episodes"] for d in metadata_dict.values()),
"total_time_steps": sum(d["total_time_steps"] for d in metadata_dict.values()),
"av_s_per_session": sum(d["total_s"] for d in metadata_dict.values()) / n,
"av_s_per_step": sum(d["s_per_step"] for d in metadata_dict.values()) / n,
"av_s_per_100_steps_10_nodes": sum(d["s_per_100_steps_10_nodes"] for d in metadata_dict.values()) / n,
"combined_av_reward_per_episode": {},
"session_av_reward_per_episode": {k: v["av_reward_per_episode"] for k, v in metadata_dict.items()},
"training_config": training_config_dict,
"lay_down_config": lay_down_config_dict,
}
episodes = metadata_dict[1]["av_reward_per_episode"].keys()
for episode in episodes:
combined_av_reward = sum(metadata_dict[k]["av_reward_per_episode"][episode] for k in metadata_dict.keys()) / n
averaged_data["combined_av_reward_per_episode"][episode] = combined_av_reward
return averaged_data
def _get_df_from_episode_av_reward_dict(data: Dict):
data: Dict = {"episode": data.keys(), "av_reward": data.values()}
return (
pl.from_dict(data)
.with_columns(rolling_mean=pl.col("av_reward").rolling_mean(window_size=25))
.rename({"rolling_mean": "rolling_av_reward"})
)
def _plot_benchmark_metadata(
benchmark_metadata_dict: Dict,
title: Optional[str] = None,
subtitle: Optional[str] = None,
) -> Figure:
if title:
if subtitle:
title = f"{title} <br>{subtitle}</sup>"
else:
if subtitle:
title = subtitle
config = get_plotly_config()
layout = go.Layout(
autosize=config["size"]["auto_size"],
width=config["size"]["width"],
height=config["size"]["height"],
)
# Create the line graph with a colored line
fig = go.Figure(layout=layout)
fig.update_layout(template=config["template"])
for session, av_reward_dict in benchmark_metadata_dict["session_av_reward_per_episode"].items():
df = _get_df_from_episode_av_reward_dict(av_reward_dict)
fig.add_trace(
go.Scatter(
x=df["episode"],
y=df["av_reward"],
mode="lines",
name=f"Session {session}",
opacity=0.25,
line={"color": "#a6a6a6"},
)
)
df = _get_df_from_episode_av_reward_dict(benchmark_metadata_dict["combined_av_reward_per_episode"])
fig.add_trace(
go.Scatter(
x=df["episode"], y=df["av_reward"], mode="lines", name="Combined Session Av", line={"color": "#FF0000"}
)
)
fig.add_trace(
go.Scatter(
x=df["episode"],
y=df["rolling_av_reward"],
mode="lines",
name="Rolling Av (Combined Session Av)",
line={"color": "#4CBB17"},
)
)
# Set the layout of the graph
fig.update_layout(
xaxis={
"title": "Episode",
"type": "linear",
},
yaxis={"title": "Average Reward"},
title=title,
)
return fig
def _plot_all_benchmarks_combined_session_av():
"""
Plot the Benchmark results for each released version of PrimAITE.
Does this by iterating over the ``benchmark/results`` directory and
extracting the benchmark metadata json for each version that has been
benchmarked. The combined_av_reward_per_episode is extracted from each,
converted into a polars dataframe, and plotted as a scatter line in plotly.
"""
title = "PrimAITE Versions Learning Benchmark"
subtitle = "Rolling Av (Combined Session Av)"
if title:
if subtitle:
title = f"{title} <br>{subtitle}</sup>"
else:
if subtitle:
title = subtitle
config = get_plotly_config()
layout = go.Layout(
autosize=config["size"]["auto_size"],
width=config["size"]["width"],
height=config["size"]["height"],
)
# Create the line graph with a colored line
fig = go.Figure(layout=layout)
fig.update_layout(template=config["template"])
for dir in _RESULTS_ROOT.iterdir():
if dir.is_dir():
metadata_file = dir / f"{dir.name}_benchmark_metadata.json"
with open(metadata_file, "r") as file:
metadata_dict = json.load(file)
df = _get_df_from_episode_av_reward_dict(metadata_dict["combined_av_reward_per_episode"])
fig.add_trace(
go.Scatter(
x=df["episode"], y=df["rolling_av_reward"], mode="lines", name=dir.name, line={"color": "#FF0000"}
)
)
# Set the layout of the graph
fig.update_layout(
xaxis={
"title": "Episode",
"type": "linear",
},
yaxis={"title": "Average Reward"},
title=title,
)
fig["data"][0]["showlegend"] = True
return fig
def run():
"""Run the PrimAITE benchmark."""
start_datetime = datetime.now()
av_reward_per_episode_dicts = {}
for i in range(1, 11):
print(f"Starting Benchmark Session: {i}")
with _get_benchmark_primaite_session() as session:
session.learn()
av_reward_per_episode_dicts[i] = session.learn_metadata_dict()
benchmark_metadata = _build_benchmark_results_dict(
start_datetime=start_datetime, metadata_dict=av_reward_per_episode_dicts
)
v_str = f"v{primaite.__version__}"
version_result_dir = _RESULTS_ROOT / v_str
if version_result_dir.exists():
shutil.rmtree(version_result_dir)
version_result_dir.mkdir(exist_ok=True, parents=True)
with open(version_result_dir / f"{v_str}_benchmark_metadata.json", "w") as file:
json.dump(benchmark_metadata, file, indent=4)
title = f"PrimAITE v{primaite.__version__.strip()} Learning Benchmark"
fig = _plot_benchmark_metadata(benchmark_metadata, title=title)
this_version_plot_path = version_result_dir / f"{title}.png"
fig.write_image(this_version_plot_path)
fig = _plot_all_benchmarks_combined_session_av()
all_version_plot_path = _RESULTS_ROOT / "PrimAITE Versions Learning Benchmark.png"
fig.write_image(all_version_plot_path)
_build_benchmark_latex_report(benchmark_metadata, this_version_plot_path, all_version_plot_path)
if __name__ == "__main__":
run()

Binary file not shown.

After

Width:  |  Height:  |  Size: 119 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 481 KiB

File diff suppressed because it is too large Load Diff

View File

@@ -55,8 +55,10 @@ dev = [
"build==0.10.0",
"flake8==6.0.0",
"furo==2023.3.27",
"gputil==1.4.0",
"pip-licenses==4.3.0",
"pre-commit==2.20.0",
"pylatex==1.4.1",
"pytest==7.2.0",
"pytest-xdist==3.3.1",
"pytest-cov==4.0.0",

View File

@@ -152,4 +152,4 @@ def getLogger(name: str) -> Logger: # noqa
with open(Path(__file__).parent.resolve() / "VERSION", "r") as file:
__version__ = file.readline()
__version__ = file.readline().strip()

View File

@@ -246,6 +246,7 @@ class TrainingConfig:
return data
def __str__(self) -> str:
obs_str = ",".join([c["name"] for c in self.observation_space["components"]])
tc = f"{self.agent_framework}, "
if self.agent_framework is AgentFramework.RLLIB:
tc += f"{self.deep_learning_framework}, "
@@ -253,7 +254,7 @@ class TrainingConfig:
if self.agent_identifier is AgentIdentifier.HARDCODED:
tc += f"{self.hard_coded_agent_view}, "
tc += f"{self.action_type}, "
tc += f"observation_space={self.observation_space}, "
tc += f"observation_space={obs_str}, "
if self.session_type is SessionType.TRAIN:
tc += f"{self.num_train_episodes} episodes @ "
tc += f"{self.num_train_steps} steps"

View File

@@ -10,7 +10,7 @@ from plotly.graph_objs import Figure
from primaite import _PLATFORM_DIRS
def _get_plotly_config() -> Dict:
def get_plotly_config() -> Dict:
"""Get the plotly config from primaite_config.yaml."""
user_config_path = _PLATFORM_DIRS.user_config_path / "primaite_config.yaml"
with open(user_config_path, "r") as file:
@@ -41,7 +41,7 @@ def plot_av_reward_per_episode(
if subtitle:
title = subtitle
config = _get_plotly_config()
config = get_plotly_config()
layout = go.Layout(
autosize=config["size"]["auto_size"],
width=config["size"]["width"],

View File

@@ -1,6 +1,5 @@
# Crown Owned Copyright (C) Dstl 2023. DEFCON 703. Shared in confidence.
"""Defines node behaviour for Green PoL."""
from dataclasses import dataclass
from typing import TYPE_CHECKING, Union
from primaite.common.enums import NodePOLType
@@ -9,8 +8,7 @@ if TYPE_CHECKING:
from primaite.common.enums import FileSystemState, HardwareState, NodePOLInitiator, SoftwareState
@dataclass()
class NodeStateInstructionRed(object):
class NodeStateInstructionRed:
"""The Node State Instruction class."""
def __init__(

View File

@@ -250,6 +250,11 @@ def apply_red_agent_node_pol(
# continue --------------------------
target_node: NodeUnion = nodes[target_node_id]
# check if the initiator type is a str, and if so, cast it as
# NodePOLInitiator
if isinstance(initiator, str):
initiator = NodePOLInitiator[initiator]
# Based the action taken on the initiator type
if initiator == NodePOLInitiator.DIRECT:
# No conditions required, just apply the change

View File

@@ -2,8 +2,9 @@
"""Main entry point to PrimAITE. Configure training/evaluation experiments and input/output."""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Dict, Final, Optional, Union
from typing import Any, Dict, Final, Optional, Tuple, Union
from primaite import getLogger
from primaite.agents.agent_abc import AgentSessionABC
@@ -16,6 +17,7 @@ from primaite.common.enums import ActionType, AgentFramework, AgentIdentifier, S
from primaite.config import lay_down_config, training_config
from primaite.config.training_config import TrainingConfig
from primaite.utils.session_metadata_parser import parse_session_metadata
from primaite.utils.session_output_reader import all_transactions_dict, av_rewards_dict
_LOGGER = getLogger(__name__)
@@ -186,3 +188,28 @@ class PrimaiteSession:
def close(self) -> None:
"""Closes the agent."""
self._agent_session.close()
def learn_av_reward_per_episode_dict(self) -> Dict[int, float]:
"""Get the learn av reward per episode from file."""
csv_file = f"average_reward_per_episode_{self.timestamp_str}.csv"
return av_rewards_dict(self.learning_path / csv_file)
def eval_av_reward_per_episode_dict(self) -> Dict[int, float]:
"""Get the eval av reward per episode from file."""
csv_file = f"average_reward_per_episode_{self.timestamp_str}.csv"
return av_rewards_dict(self.evaluation_path / csv_file)
def learn_all_transactions_dict(self) -> Dict[Tuple[int, int], Dict[str, Any]]:
"""Get the learn all transactions from file."""
csv_file = f"all_transactions_{self.timestamp_str}.csv"
return all_transactions_dict(self.learning_path / csv_file)
def eval_all_transactions_dict(self) -> Dict[Tuple[int, int], Dict[str, Any]]:
"""Get the eval all transactions from file."""
csv_file = f"all_transactions_{self.timestamp_str}.csv"
return all_transactions_dict(self.evaluation_path / csv_file)
def metadata_file_as_dict(self) -> Dict[str, Any]:
"""Read the session_metadata.json file and return as a dict."""
with open(self.session_path / "session_metadata.json", "r") as file:
return json.load(file)

View File

@@ -18,7 +18,7 @@ def av_rewards_dict(av_rewards_csv_file: Union[str, Path]) -> Dict[int, float]:
"""
df_dict = pl.read_csv(av_rewards_csv_file).to_dict()
return {v: df_dict["Average Reward"][i] for i, v in enumerate(df_dict["Episode"])}
return {int(v): df_dict["Average Reward"][i] for i, v in enumerate(df_dict["Episode"])}
def all_transactions_dict(all_transactions_csv_file: Union[str, Path]) -> Dict[Tuple[int, int], Dict[str, Any]]:

View File

@@ -1,11 +1,10 @@
# Crown Owned Copyright (C) Dstl 2023. DEFCON 703. Shared in confidence.
import datetime
import json
import shutil
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Tuple, Union
from typing import Union
from unittest.mock import patch
import pytest
@@ -13,7 +12,6 @@ import pytest
from primaite import getLogger
from primaite.environment.primaite_env import Primaite
from primaite.primaite_session import PrimaiteSession
from primaite.utils.session_output_reader import all_transactions_dict, av_rewards_dict
from tests.mock_and_patch.get_session_path_mock import get_temp_session_path
ACTION_SPACE_NODE_VALUES = 1
@@ -37,31 +35,6 @@ class TempPrimaiteSession(PrimaiteSession):
super().__init__(training_config_path, lay_down_config_path)
self.setup()
def learn_av_reward_per_episode_dict(self) -> Dict[int, float]:
"""Get the learn av reward per episode from file."""
csv_file = f"average_reward_per_episode_{self.timestamp_str}.csv"
return av_rewards_dict(self.learning_path / csv_file)
def eval_av_reward_per_episode_dict(self) -> Dict[int, float]:
"""Get the eval av reward per episode from file."""
csv_file = f"average_reward_per_episode_{self.timestamp_str}.csv"
return av_rewards_dict(self.evaluation_path / csv_file)
def learn_all_transactions_dict(self) -> Dict[Tuple[int, int], Dict[str, Any]]:
"""Get the learn all transactions from file."""
csv_file = f"all_transactions_{self.timestamp_str}.csv"
return all_transactions_dict(self.learning_path / csv_file)
def eval_all_transactions_dict(self) -> Dict[Tuple[int, int], Dict[str, Any]]:
"""Get the eval all transactions from file."""
csv_file = f"all_transactions_{self.timestamp_str}.csv"
return all_transactions_dict(self.evaluation_path / csv_file)
def metadata_file_as_dict(self) -> Dict[str, Any]:
"""Read the session_metadata.json file and return as a dict."""
with open(self.session_path / "session_metadata.json", "r") as file:
return json.load(file)
@property
def env(self) -> Primaite:
"""Direct access to the env for ease of testing."""