#2912 - Remove some debugging print statements and apply pre-commit lint changes
This commit is contained in:
@@ -8,6 +8,7 @@ from pydantic import BaseModel, ConfigDict
|
||||
|
||||
from primaite.interface.request import RequestFormat
|
||||
|
||||
|
||||
class AbstractAction(BaseModel):
|
||||
"""Base class for actions."""
|
||||
|
||||
@@ -37,6 +38,4 @@ class AbstractAction(BaseModel):
|
||||
"""Create an action component from a config dictionary."""
|
||||
if not config.get("type"):
|
||||
config.update({"type": cls.__name__})
|
||||
print("oooh")
|
||||
print(config)
|
||||
return cls(config=cls.ConfigSchema(**config))
|
||||
|
||||
@@ -127,8 +127,7 @@ class FirewallACLAddRuleAction(ACLAbstractAction, identifier="firewall_acl_add_r
|
||||
if config.src_ip == 0:
|
||||
return ["do_nothing"] # invalid formulation
|
||||
if config.src_port == 0:
|
||||
return ["do_nothing"] # invalid configuration.
|
||||
|
||||
return ["do_nothing"] # invalid configuration.
|
||||
|
||||
return [
|
||||
"network",
|
||||
@@ -153,7 +152,7 @@ class FirewallACLAddRuleAction(ACLAbstractAction, identifier="firewall_acl_add_r
|
||||
class FirewallACLRemoveRuleAction(AbstractAction, identifier="firewall_acl_remove_rule"):
|
||||
"""Action which removes a rule from a firewall port's ACL."""
|
||||
|
||||
config:"FirewallACLRemoveRuleAction.ConfigSchema"
|
||||
config: "FirewallACLRemoveRuleAction.ConfigSchema"
|
||||
|
||||
class ConfigSchema(AbstractAction.ConfigSchema):
|
||||
"""Configuration schema for FirewallACLRemoveRuleAction."""
|
||||
|
||||
@@ -38,7 +38,15 @@ class ConfigureRansomwareScriptAction(AbstractAction, identifier="c2_server_rans
|
||||
"""Return the action formatted as a request that can be ingested by the simulation."""
|
||||
if config.node_name is None:
|
||||
return ["do_nothing"]
|
||||
return ["network", "node", config.node_name, "application", "RansomwareScript", "configure", config.model_config]
|
||||
return [
|
||||
"network",
|
||||
"node",
|
||||
config.node_name,
|
||||
"application",
|
||||
"RansomwareScript",
|
||||
"configure",
|
||||
config.model_config,
|
||||
]
|
||||
|
||||
|
||||
class ConfigureDoSBotAction(AbstractAction, identifier="configure_dos_bot"):
|
||||
@@ -207,10 +215,7 @@ class ExfiltrationC2ServerAction(AbstractAction, identifier="c2_server_data_exfi
|
||||
exfiltration_folder_name: Optional[str]
|
||||
|
||||
@classmethod
|
||||
def form_request(
|
||||
cls,
|
||||
config: ConfigSchema
|
||||
) -> RequestFormat:
|
||||
def form_request(cls, config: ConfigSchema) -> RequestFormat:
|
||||
"""Return the action formatted as a request that can be ingested by the simulation."""
|
||||
if config.node_name is None:
|
||||
return ["do_nothing"]
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
|
||||
from typing import ClassVar
|
||||
|
||||
from primaite.game.agent.actions.manager import AbstractAction
|
||||
from primaite.interface.request import RequestFormat
|
||||
|
||||
|
||||
@@ -31,7 +31,6 @@ class DoNothingAction(AbstractAction, identifier="do_nothing"):
|
||||
class ConfigSchema(AbstractAction.ConfigSchema):
|
||||
"""Configuration Schema for DoNothingAction."""
|
||||
|
||||
# type: Literal["do_nothing"] = "do_nothing"
|
||||
type: str = "do_nothing"
|
||||
|
||||
@classmethod
|
||||
@@ -133,7 +132,7 @@ class ActionManager:
|
||||
def space(self) -> spaces.Space:
|
||||
"""Return the gymnasium action space for this agent."""
|
||||
return spaces.Discrete(len(self.action_map))
|
||||
|
||||
|
||||
@classmethod
|
||||
def from_config(cls, game: "PrimaiteGame", cfg: Dict) -> "ActionManager":
|
||||
"""
|
||||
|
||||
@@ -90,7 +90,7 @@ class NodeNMAPAbstractAction(AbstractAction, identifier="node_nmap_abstract_acti
|
||||
"""Base Configuration Schema for NodeNMAP actions."""
|
||||
|
||||
target_ip_address: Union[str, List[str]]
|
||||
show: bool = False
|
||||
show: bool = False
|
||||
node_name: str
|
||||
|
||||
@classmethod
|
||||
|
||||
@@ -56,7 +56,7 @@ def test_node_service_scan_integration(game_and_agent: Tuple[PrimaiteGame, Proxy
|
||||
assert svc.health_state_visible == SoftwareHealthState.UNUSED
|
||||
|
||||
# 2: Scan and check that the visible state is now correct
|
||||
action = ("node_service_scan", {"type":"node_service_scan" ,"node_name": "server_1", "service_name": "DNSServer"})
|
||||
action = ("node_service_scan", {"type": "node_service_scan", "node_name": "server_1", "service_name": "DNSServer"})
|
||||
agent.store_action(action)
|
||||
game.step()
|
||||
assert svc.health_state_actual == SoftwareHealthState.GOOD
|
||||
@@ -67,7 +67,7 @@ def test_node_service_scan_integration(game_and_agent: Tuple[PrimaiteGame, Proxy
|
||||
assert svc.health_state_visible == SoftwareHealthState.GOOD
|
||||
|
||||
# 4: Scan and check that the visible state is now correct
|
||||
action = ("node_service_scan", {"type":"node_service_scan", "node_name": "server_1", "service_name": "DNSServer"})
|
||||
action = ("node_service_scan", {"type": "node_service_scan", "node_name": "server_1", "service_name": "DNSServer"})
|
||||
agent.store_action(action)
|
||||
game.step()
|
||||
assert svc.health_state_actual == SoftwareHealthState.COMPROMISED
|
||||
|
||||
Reference in New Issue
Block a user