#2869 - fixes to agents and remove redundant prints
This commit is contained in:
@@ -1 +1,7 @@
|
||||
# © Crown-owned copyright 2025, Defence Science and Technology Laboratory UK
|
||||
from primaite.game.agent.interface import ProxyAgent
|
||||
from primaite.game.agent.scripted_agents.data_manipulation_bot import DataManipulationAgent
|
||||
from primaite.game.agent.scripted_agents.probabilistic_agent import ProbabilisticAgent
|
||||
from primaite.game.agent.scripted_agents.random_agent import PeriodicAgent, RandomAgent
|
||||
|
||||
__all__ = ("ProbabilisticAgent", "ProxyAgent", "RandomAgent", "PeriodicAgent", "DataManipulationAgent")
|
||||
|
||||
@@ -26,7 +26,7 @@ class ACLAddRuleAbstractAction(AbstractAction, ABC):
|
||||
class ConfigSchema(AbstractAction.ConfigSchema):
|
||||
"""Configuration Schema base for ACL add rule abstract actions."""
|
||||
|
||||
src_ip: IPV4Address
|
||||
src_ip: Union[IPV4Address, Literal["ALL"]]
|
||||
protocol_name: Union[IPProtocol, Literal["ALL"]]
|
||||
permission: Literal["PERMIT", "DENY"]
|
||||
position: int
|
||||
|
||||
@@ -36,7 +36,6 @@ class NodeAbstractAction(AbstractAction, identifier="node_abstract"):
|
||||
@classmethod
|
||||
def form_request(cls, config: ConfigSchema) -> RequestFormat:
|
||||
"""Return the action formatted as a request which can be ingested by the PrimAITE simulation."""
|
||||
print(config)
|
||||
return ["network", "node", config.node_name, config.verb]
|
||||
|
||||
|
||||
|
||||
@@ -44,10 +44,12 @@ class PeriodicAgent(AbstractScriptedAgent, identifier="PeriodicAgent"):
|
||||
|
||||
start_step: int = 5
|
||||
"The timestep at which an agent begins performing it's actions"
|
||||
start_variance: int = 0
|
||||
frequency: int = 5
|
||||
"The number of timesteps to wait between performing actions"
|
||||
variance: int = 0
|
||||
"The amount the frequency can randomly change to"
|
||||
max_executions: int = 999999
|
||||
possible_start_nodes: List[str]
|
||||
target_application: str
|
||||
|
||||
@@ -76,8 +78,6 @@ class PeriodicAgent(AbstractScriptedAgent, identifier="PeriodicAgent"):
|
||||
default_factory=lambda: PeriodicAgent.AgentSettingsSchema()
|
||||
)
|
||||
|
||||
max_executions: int = 999999
|
||||
"Maximum number of times the agent can execute its action."
|
||||
num_executions: int = 0
|
||||
"""Number of times the agent has executed an action."""
|
||||
next_execution_timestep: int = 0
|
||||
@@ -102,7 +102,7 @@ class PeriodicAgent(AbstractScriptedAgent, identifier="PeriodicAgent"):
|
||||
|
||||
def get_action(self, obs: ObsType, timestep: int) -> Tuple[str, Dict]:
|
||||
"""Do nothing, unless the current timestep is the next execution timestep, in which case do the action."""
|
||||
if timestep == self.next_execution_timestep and self.num_executions < self.max_executions:
|
||||
if timestep == self.next_execution_timestep and self.num_executions < self.config.agent_settings.max_executions:
|
||||
self.num_executions += 1
|
||||
self._set_next_execution_timestep(
|
||||
timestep + self.config.agent_settings.frequency, self.config.agent_settings.variance
|
||||
|
||||
@@ -383,7 +383,7 @@ class PrimaiteGame:
|
||||
|
||||
if service_class is not None:
|
||||
_LOGGER.debug(f"installing {service_type} on node {new_node.hostname}")
|
||||
new_node.software_manager.install(service_class)
|
||||
new_node.software_manager.install(service_class, software_config=service_cfg.get("options", {}))
|
||||
new_service = new_node.software_manager.software[service_class.__name__]
|
||||
|
||||
# fixing duration for the service
|
||||
|
||||
@@ -258,23 +258,15 @@ class Firewall(Router, identifier="firewall"):
|
||||
:param dmz: If True, shows ACL rules for DMZ interfaces.
|
||||
:param markdown: If True, formats the output in Markdown, enhancing readability in Markdown-compatible viewers.
|
||||
"""
|
||||
print(f"{self.hostname} Firewall Rules")
|
||||
print()
|
||||
if external:
|
||||
self.external_inbound_acl.show(markdown)
|
||||
print()
|
||||
self.external_outbound_acl.show(markdown)
|
||||
print()
|
||||
if internal:
|
||||
self.internal_inbound_acl.show(markdown)
|
||||
print()
|
||||
self.internal_outbound_acl.show(markdown)
|
||||
print()
|
||||
if dmz:
|
||||
self.dmz_inbound_acl.show(markdown)
|
||||
print()
|
||||
self.dmz_outbound_acl.show(markdown)
|
||||
print()
|
||||
|
||||
def receive_frame(self, frame: Frame, from_network_interface: RouterInterface):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user