Cosmetic changes based on PR feedback

This commit is contained in:
Marek Wolan
2024-02-06 17:32:15 +00:00
parent e500eccaf7
commit c35c060448
4 changed files with 40 additions and 16 deletions

View File

@@ -345,7 +345,7 @@ class PrimaiteGame:
action_space = ActionManager.from_config(game, action_space_cfg)
# CREATE REWARD FUNCTION
rew_function = RewardFunction.from_config(reward_function_cfg)
reward_function = RewardFunction.from_config(reward_function_cfg)
# OTHER AGENT SETTINGS
agent_settings = AgentSettings.from_config(agent_cfg.get("agent_settings"))
@@ -357,7 +357,7 @@ class PrimaiteGame:
agent_name=agent_cfg["ref"],
action_space=action_space,
observation_space=obs_space,
reward_function=rew_function,
reward_function=reward_function,
agent_settings=agent_settings,
)
game.agents.append(new_agent)
@@ -366,7 +366,7 @@ class PrimaiteGame:
agent_name=agent_cfg["ref"],
action_space=action_space,
observation_space=obs_space,
reward_function=rew_function,
reward_function=reward_function,
agent_settings=agent_settings,
)
game.agents.append(new_agent)
@@ -376,7 +376,7 @@ class PrimaiteGame:
agent_name=agent_cfg["ref"],
action_space=action_space,
observation_space=obs_space,
reward_function=rew_function,
reward_function=reward_function,
agent_settings=agent_settings,
)
game.agents.append(new_agent)

View File

@@ -46,7 +46,7 @@
"source": [
"## Green agent\n",
"\n",
"There are green agents is logged onto client 1 and client 2. They use the web browser to navigate to `http://arcd.com/users`. The web server replies with a status code 200 if the data is available on the database or 404 if not available."
"There are green agents logged onto client 1 and client 2. They use the web browser to navigate to `http://arcd.com/users`. The web server replies with a status code 200 if the data is available on the database or 404 if not available."
]
},
{
@@ -68,7 +68,7 @@
"source": [
"## Blue agent\n",
"\n",
"The blue agent can view the entire network, but the health statuses of components are not updated until a scan is performed. The blue agent should restore the database file from backup after it was compromised. It can also prevent further attacks by blocking client 1 from sending the malicious SQL query to the database server. This can be done by removing implementing an ACL rule on the router."
"The blue agent can view the entire network, but the health statuses of components are not updated until a scan is performed. The blue agent should restore the database file from backup after it was compromised. It can also prevent further attacks by blocking client 1 from sending the malicious SQL query to the database server. This can be done by implementing an ACL rule on the router."
]
},
{

View File

@@ -1,5 +1,6 @@
from enum import Enum
from ipaddress import IPv4Address
from typing import Dict, List, Literal, Optional, Union
from typing import Dict, List, Optional
from urllib.parse import urlparse
from pydantic import BaseModel, ConfigDict
@@ -135,11 +136,21 @@ class WebBrowser(Application):
f"{self.name}: Received HTTP {payload.request_method.name} "
f"Response {payload.request_url} - {self.latest_response.status_code.value}"
)
self.history.append(WebBrowser.BrowserHistoryItem(url=url, outcome=self.latest_response.status_code))
self.history.append(
WebBrowser.BrowserHistoryItem(
url=url,
status=self.BrowserHistoryItem._HistoryItemStatus.LOADED,
response_code=self.latest_response.status_code,
)
)
return self.latest_response.status_code is HttpStatusCode.OK
else:
self.sys_log.error(f"Error sending Http Packet {str(payload)}")
self.history.append(WebBrowser.BrowserHistoryItem(url=url, outcome="SERVER_UNREACHABLE"))
self.history.append(
WebBrowser.BrowserHistoryItem(
url=url, status=self.BrowserHistoryItem._HistoryItemStatus.SERVER_UNREACHABLE
)
)
return False
def send(
@@ -190,13 +201,21 @@ class WebBrowser(Application):
url: str
"""The URL that was attempted to be fetched by the browser"""
outcome: Union[HttpStatusCode, Literal["PENDING", "SERVER_UNREACHABLE"]] = "PENDING"
class _HistoryItemStatus(Enum):
NOT_SENT = "NOT_SENT"
PENDING = "PENDING"
SERVER_UNREACHABLE = "SERVER_UNREACHABLE"
LOADED = "LOADED"
status: _HistoryItemStatus = _HistoryItemStatus.PENDING
response_code: Optional[HttpStatusCode] = None
"""HTTP response code that was received, or PENDING if a response was not yet received."""
def state(self) -> Dict:
"""Return the contents of this dataclass as a dict for use with describe_state method."""
if isinstance(self.outcome, HttpStatusCode):
outcome = self.outcome.value
if self.status == self._HistoryItemStatus.LOADED:
outcome = self.response_code
else:
outcome = self.outcome
outcome = self.status.value
return {"url": self.url, "outcome": outcome}

View File

@@ -3,6 +3,7 @@ from typing import Tuple
import pytest
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.base import Link
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.computer import Computer
@@ -18,7 +19,7 @@ from primaite.simulator.system.services.web_server.web_server import WebServer
@pytest.fixture(scope="function")
def web_client_web_server_database(example_network) -> Tuple[Computer, Server, Server]:
def web_client_web_server_database(example_network) -> Tuple[Network, Computer, Server, Server]:
# add rules to network router
router_1: Router = example_network.get_node_by_hostname("router_1")
router_1.acl.add_rule(
@@ -118,13 +119,17 @@ class TestWebBrowserHistory:
assert len(web_browser.history) == 1
web_browser.get_webpage()
assert len(web_browser.history) == 2
assert web_browser.history[-1].outcome == 200
assert web_browser.history[-1].status == WebBrowser.BrowserHistoryItem._HistoryItemStatus.LOADED
assert web_browser.history[-1].response_code == 200
router = network.get_node_by_hostname("router_1")
router.acl.add_rule(action=ACLAction.DENY, src_port=Port.HTTP, dst_port=Port.HTTP, position=0)
assert not web_browser.get_webpage()
assert len(web_browser.history) == 3
assert web_browser.history[-1].outcome == 404
# with current NIC behaviour, even if you block communication, you won't get SERVER_UNREACHABLE because
# application.send always returns true, even if communication fails. we should change what is returned from NICs
assert web_browser.history[-1].status == WebBrowser.BrowserHistoryItem._HistoryItemStatus.LOADED
assert web_browser.history[-1].response_code == 404
def test_history_in_state(self, web_client_web_server_database):
network, computer, _, _ = web_client_web_server_database