Merged PR 592: #2869 - fixes to agents and remove redundant prints
## Summary *Replace this text with an explanation of what the changes are and how you implemented them. Can this impact any other parts of the codebase that we should keep in mind?* ## Test process *How have you tested this (if applicable)?* ## Checklist - [ ] PR is linked to a **work item** - [ ] **acceptance criteria** of linked ticket are met - [ ] performed **self-review** of the code - [ ] written **tests** for any new functionality added with this PR - [ ] updated the **documentation** if this PR changes or adds functionality - [ ] written/updated **design docs** if this PR implements new functionality - [ ] updated the **change log** - [ ] ran **pre-commit** checks for code style - [ ] attended to any **TO-DOs** left in the code #2869 - fixes to agents and remove redundant prints Related work items: #2869
This commit is contained in:
@@ -1 +1,7 @@
|
||||
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
|
||||
from primaite.game.agent.interface import ProxyAgent
|
||||
from primaite.game.agent.scripted_agents.data_manipulation_bot import DataManipulationAgent
|
||||
from primaite.game.agent.scripted_agents.probabilistic_agent import ProbabilisticAgent
|
||||
from primaite.game.agent.scripted_agents.random_agent import PeriodicAgent, RandomAgent
|
||||
|
||||
__all__ = ("ProbabilisticAgent", "ProxyAgent", "RandomAgent", "PeriodicAgent", "DataManipulationAgent")
|
||||
|
||||
@@ -26,7 +26,7 @@ class ACLAddRuleAbstractAction(AbstractAction, ABC):
|
||||
class ConfigSchema(AbstractAction.ConfigSchema):
|
||||
"""Configuration Schema base for ACL add rule abstract actions."""
|
||||
|
||||
src_ip: IPV4Address
|
||||
src_ip: Union[IPV4Address, Literal["ALL"]]
|
||||
protocol_name: Union[IPProtocol, Literal["ALL"]]
|
||||
permission: Literal["PERMIT", "DENY"]
|
||||
position: int
|
||||
|
||||
@@ -36,7 +36,6 @@ class NodeAbstractAction(AbstractAction, identifier="node_abstract"):
|
||||
@classmethod
|
||||
def form_request(cls, config: ConfigSchema) -> RequestFormat:
|
||||
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
|
||||
print(config)
|
||||
return ["network", "node", config.node_name, config.verb]
|
||||
|
||||
|
||||
|
||||
@@ -44,10 +44,12 @@ class PeriodicAgent(AbstractScriptedAgent, identifier="PeriodicAgent"):
|
||||
|
||||
start_step: int = 5
|
||||
"The timestep at which an agent begins performing it's actions"
|
||||
start_variance: int = 0
|
||||
frequency: int = 5
|
||||
"The number of timesteps to wait between performing actions"
|
||||
variance: int = 0
|
||||
"The amount the frequency can randomly change to"
|
||||
max_executions: int = 999999
|
||||
possible_start_nodes: List[str]
|
||||
target_application: str
|
||||
|
||||
@@ -76,8 +78,6 @@ class PeriodicAgent(AbstractScriptedAgent, identifier="PeriodicAgent"):
|
||||
default_factory=lambda: PeriodicAgent.AgentSettingsSchema()
|
||||
)
|
||||
|
||||
max_executions: int = 999999
|
||||
"Maximum number of times the agent can execute its action."
|
||||
num_executions: int = 0
|
||||
"""Number of times the agent has executed an action."""
|
||||
next_execution_timestep: int = 0
|
||||
@@ -102,7 +102,7 @@ class PeriodicAgent(AbstractScriptedAgent, identifier="PeriodicAgent"):
|
||||
|
||||
def get_action(self, obs: ObsType, timestep: int) -> Tuple[str, Dict]:
|
||||
"""Do nothing, unless the current timestep is the next execution timestep, in which case do the action."""
|
||||
if timestep == self.next_execution_timestep and self.num_executions < self.max_executions:
|
||||
if timestep == self.next_execution_timestep and self.num_executions < self.config.agent_settings.max_executions:
|
||||
self.num_executions += 1
|
||||
self._set_next_execution_timestep(
|
||||
timestep + self.config.agent_settings.frequency, self.config.agent_settings.variance
|
||||
|
||||
@@ -383,7 +383,7 @@ class PrimaiteGame:
|
||||
|
||||
if service_class is not None:
|
||||
_LOGGER.debug(f"installing {service_type} on node {new_node.hostname}")
|
||||
new_node.software_manager.install(service_class)
|
||||
new_node.software_manager.install(service_class, software_config=service_cfg.get("options", {}))
|
||||
new_service = new_node.software_manager.software[service_class.__name__]
|
||||
|
||||
# fixing duration for the service
|
||||
|
||||
@@ -258,23 +258,15 @@ class Firewall(Router, identifier="firewall"):
|
||||
:param dmz: If True, shows ACL rules for DMZ interfaces.
|
||||
:param markdown: If True, formats the output in Markdown, enhancing readability in Markdown-compatible viewers.
|
||||
"""
|
||||
print(f"{self.hostname} Firewall Rules")
|
||||
print()
|
||||
if external:
|
||||
self.external_inbound_acl.show(markdown)
|
||||
print()
|
||||
self.external_outbound_acl.show(markdown)
|
||||
print()
|
||||
if internal:
|
||||
self.internal_inbound_acl.show(markdown)
|
||||
print()
|
||||
self.internal_outbound_acl.show(markdown)
|
||||
print()
|
||||
if dmz:
|
||||
self.dmz_inbound_acl.show(markdown)
|
||||
print()
|
||||
self.dmz_outbound_acl.show(markdown)
|
||||
print()
|
||||
|
||||
def receive_frame(self, frame: Frame, from_network_interface: RouterInterface):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user