Describe state

This commit is contained in:
Marek Wolan
2023-08-17 15:32:12 +01:00
parent ced45d4275
commit 6ca53803cd
17 changed files with 444 additions and 125 deletions

View File

@@ -0,0 +1,140 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Build a simulation using the Python API\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Import the Simulation class"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from primaite.simulator.sim_container import Simulation\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create an empty simulation. By default this has a network with no nodes or links, and a domain controller with no accounts.\n",
"\n",
"Let's use the simulation's `describe_state()` method to verify that it is empty."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'uuid': '8d5cbb1b-aa9b-4f66-8b23-80d47755df69',\n",
" 'network': {'uuid': 'd3569bc4-eeed-40b1-9c3c-0fe80b9bb11c',\n",
" 'nodes': {},\n",
" 'links': {}},\n",
" 'domain': {'uuid': '4d4024ae-5948-4f07-aed9-d2315891cddc', 'accounts': {}}}"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"my_sim = Simulation()\n",
"my_sim.describe_state()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Add nodes"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"from primaite.simulator.network.hardware.base import Node\n"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'uuid': '8d5cbb1b-aa9b-4f66-8b23-80d47755df69',\n",
" 'network': {'uuid': 'd3569bc4-eeed-40b1-9c3c-0fe80b9bb11c',\n",
" 'nodes': {'5f596f4f-4d34-4d1c-9688-9a105e489444': {'uuid': '5f596f4f-4d34-4d1c-9688-9a105e489444',\n",
" 'hostname': 'primaite_pc',\n",
" 'operating_state': 0,\n",
" 'NICs': {},\n",
" 'file_system': {'uuid': 'dc1e7032-7dba-44d5-aedb-5da75ab1eccc',\n",
" 'folders': {}},\n",
" 'applications': {},\n",
" 'services': {},\n",
" 'process': {}}},\n",
" 'links': {}},\n",
" 'domain': {'uuid': '4d4024ae-5948-4f07-aed9-d2315891cddc', 'accounts': {}}}"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"my_pc = Node(hostname=\"primaite_pc\",)\n",
"my_sim.network.nodes[my_pc.uuid] = my_pc\n",
"my_sim.describe_state()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@@ -147,7 +147,10 @@ class SimComponent(BaseModel):
object. If there are objects referenced by this object that are owned by something else, it is not included in
this output.
"""
return {}
state = {
"uuid": self.uuid,
}
return state
def apply_action(self, action: List[str], context: Dict = {}) -> None:
"""

View File

@@ -43,8 +43,27 @@ class Account(SimComponent):
enabled: bool = True
def describe_state(self) -> Dict:
"""Describe state for agent observations."""
return super().describe_state()
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update(
{
"num_logons": self.num_logons,
"num_logoffs": self.num_logoffs,
"num_group_changes": self.num_group_changes,
"username": self.username,
"password": self.password,
"account_type": self.account_type,
"enabled": self.enabled,
}
)
return state
def enable(self):
"""Set the status to enabled."""

View File

@@ -96,6 +96,19 @@ class DomainController(SimComponent):
),
)
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update({"accounts": {uuid: acct.describe_state() for uuid, acct in self.accounts.items()}})
return state
def _register_account(self, account: Account) -> None:
"""TODO."""
...

View File

@@ -18,11 +18,16 @@ class FileSystem(SimComponent):
def describe_state(self) -> Dict:
"""
Get the current state of the FileSystem as a dict.
Produce a dictionary describing the current state of this object.
:return: A dict containing the current state of the FileSystemFile.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
pass
state = super().describe_state()
state.update({"folders": {uuid: folder for uuid, folder in self.folders.items()}})
return state
def get_folders(self) -> Dict:
"""Returns the list of folders."""

View File

@@ -38,8 +38,16 @@ class FileSystemFile(FileSystemItem):
def describe_state(self) -> Dict:
"""
Get the current state of the FileSystemFile as a dict.
Produce a dictionary describing the current state of this object.
:return: A dict containing the current state of the FileSystemFile.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
pass
return {
"uuid": self.uuid,
"name": self.name,
"size": self.size,
"file_type": self.file_type,
}

View File

@@ -16,6 +16,23 @@ class FileSystemFolder(FileSystemItem):
is_quarantined: bool = False
"""Flag that marks the folder as quarantined if true."""
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
return {
"uuid": self.uuid,
"name": self.name,
"size": self.size,
"files": {uuid: file for uuid, file in self.files.items()},
"is_quarantined": self.is_quarantined,
}
def get_file_by_id(self, file_id: str) -> FileSystemFile:
"""Return a FileSystemFile with the matching id."""
return self.files.get(file_id)
@@ -67,11 +84,3 @@ class FileSystemFolder(FileSystemItem):
def quarantine_status(self) -> bool:
"""Returns true if the folder is being quarantined."""
return self.is_quarantined
def describe_state(self) -> Dict:
"""
Get the current state of the FileSystemFolder as a dict.
:return: A dict containing the current state of the FileSystemFile.
"""
pass

View File

@@ -13,5 +13,19 @@ class FileSystemItem(SimComponent):
"""The size the item takes up on disk."""
def describe_state(self) -> Dict:
"""Returns the state of the FileSystemItem."""
pass
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update(
{
"name": self.name,
"size": self.size,
}
)
return state

View File

@@ -21,3 +21,21 @@ class NetworkContainer(SimComponent):
validator=AllowAllValidator(),
),
)
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update(
{
"nodes": {uuid: node.describe_state() for uuid, node in self.nodes.items()},
"links": {uuid: link.describe_state() for uuid, link in self.links.items()},
}
)
return state

View File

@@ -125,6 +125,31 @@ class NIC(SimComponent):
_LOGGER.error(msg)
raise ValueError(msg)
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update(
{
"ip_adress": self.ip_address,
"subnet_mask": self.subnet_mask,
"gateway": self.gateway,
"mac_address": self.mac_address,
"speed": self.speed,
"mtu": self.mtu,
"wake_on_lan": self.wake_on_lan,
"dns_servers": self.dns_servers,
"enabled": self.enabled,
}
)
return state
@property
def ip_network(self) -> IPv4Network:
"""
@@ -241,23 +266,6 @@ class NIC(SimComponent):
return True
return False
def describe_state(self) -> Dict:
"""
Get the current state of the NIC as a dict.
:return: A dict containing the current state of the NIC.
"""
pass
def apply_action(self, action: str):
"""
Apply an action to the NIC.
:param action: The action to be applied.
:type action: str
"""
pass
def __str__(self) -> str:
return f"{self.mac_address}/{self.ip_address}"
@@ -293,6 +301,25 @@ class SwitchPort(SimComponent):
kwargs["mac_address"] = generate_mac_address()
super().__init__(**kwargs)
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update(
{
"mac_address": self.mac_address,
"speed": self.speed,
"mtu": self.mtu,
"enabled": self.enabled,
}
)
def enable(self):
"""Attempt to enable the SwitchPort."""
if self.enabled:
@@ -379,23 +406,6 @@ class SwitchPort(SimComponent):
return True
return False
def describe_state(self) -> Dict:
"""
Get the current state of the SwitchPort as a dict.
:return: A dict containing the current state of the SwitchPort.
"""
pass
def apply_action(self, action: str):
"""
Apply an action to the SwitchPort.
:param action: The action to be applied.
:type action: str
"""
pass
def __str__(self) -> str:
return f"{self.mac_address}"
@@ -435,6 +445,26 @@ class Link(SimComponent):
self.endpoint_b.connect_link(self)
self.endpoint_up()
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update(
{
"endpoint_a": self.endpoint_a.uuid,
"endpoint_b": self.endpoint_b.uuid,
"bandwidth": self.bandwidth,
"current_load": self.current_load,
}
)
return state
@property
def current_load_percent(self) -> str:
"""Get the current load formatted as a percentage string."""
@@ -504,23 +534,6 @@ class Link(SimComponent):
"""
self.current_load = 0
def describe_state(self) -> Dict:
"""
Get the current state of the Link as a dict.
:return: A dict containing the current state of the Link.
"""
pass
def apply_action(self, action: str):
"""
Apply an action to the Link.
:param action: The action to be applied.
:type action: str
"""
pass
def __str__(self) -> str:
return f"{self.endpoint_a}<-->{self.endpoint_b}"
@@ -832,6 +845,30 @@ class Node(SimComponent):
super().__init__(**kwargs)
self.arp.nics = self.nics
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update(
{
"hostname": self.hostname,
"operating_state": self.operating_state.value,
"NICs": {uuid: nic.describe_state() for uuid, nic in self.nics.items()},
# "switch_ports": {uuid, sp for uuid, sp in self.switch_ports.items()},
"file_system": self.file_system.describe_state(),
"applications": {uuid: app for uuid, app in self.applications.items()},
"services": {uuid: svc for uuid, svc in self.services.items()},
"process": {uuid: proc for uuid, proc in self.processes.items()},
}
)
return state
def show(self):
"""Prints a table of the NICs on the Node.."""
from prettytable import PrettyTable
@@ -950,14 +987,6 @@ class Node(SimComponent):
elif frame.ip.protocol == IPProtocol.ICMP:
self.icmp.process_icmp(frame=frame)
def describe_state(self) -> Dict:
"""
Describe the state of the Node.
:return: A dictionary representing the state of the node.
"""
pass
class Switch(Node):
"""A class representing a Layer 2 network switch."""
@@ -966,9 +995,17 @@ class Switch(Node):
"The number of ports on the switch."
switch_ports: Dict[int, SwitchPort] = {}
"The SwitchPorts on the switch."
dst_mac_table: Dict[str, SwitchPort] = {}
mac_address_table: Dict[str, SwitchPort] = {}
"A MAC address table mapping destination MAC addresses to corresponding SwitchPorts."
def __init__(self, **kwargs):
super().__init__(**kwargs)
if not self.switch_ports:
self.switch_ports = {i: SwitchPort() for i in range(1, self.num_ports + 1)}
for port_num, port in self.switch_ports.items():
port.connected_node = self
port.port_num = port_num
def show(self):
"""Prints a table of the SwitchPorts on the Switch."""
table = PrettyTable(["Port", "MAC Address", "Speed", "Status"])
@@ -978,25 +1015,29 @@ class Switch(Node):
print(table)
def describe_state(self) -> Dict:
"""TODO."""
pass
"""
Produce a dictionary describing the current state of this object.
def __init__(self, **kwargs):
super().__init__(**kwargs)
if not self.switch_ports:
self.switch_ports = {i: SwitchPort() for i in range(1, self.num_ports + 1)}
for port_num, port in self.switch_ports.items():
port.connected_node = self
port.port_num = port_num
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
return {
"uuid": self.uuid,
"num_ports": self.num_ports, # redundant?
"ports": {port_num: port for port_num, port in self.switch_ports.items()},
"mac_address_table": {mac: port for mac, port in self.mac_address_table.items()},
}
def _add_mac_table_entry(self, mac_address: str, switch_port: SwitchPort):
mac_table_port = self.dst_mac_table.get(mac_address)
mac_table_port = self.mac_address_table.get(mac_address)
if not mac_table_port:
self.dst_mac_table[mac_address] = switch_port
self.mac_address_table[mac_address] = switch_port
self.sys_log.info(f"Added MAC table entry: Port {switch_port.port_num} -> {mac_address}")
else:
if mac_table_port != switch_port:
self.dst_mac_table.pop(mac_address)
self.mac_address_table.pop(mac_address)
self.sys_log.info(f"Removed MAC table entry: Port {mac_table_port.port_num} -> {mac_address}")
self._add_mac_table_entry(mac_address, switch_port)
@@ -1011,7 +1052,7 @@ class Switch(Node):
dst_mac = frame.ethernet.dst_mac_addr
self._add_mac_table_entry(src_mac, incoming_port)
outgoing_port = self.dst_mac_table.get(dst_mac)
outgoing_port = self.mac_address_table.get(dst_mac)
if outgoing_port or dst_mac != "ff:ff:ff:ff:ff:ff":
outgoing_port.send_frame(frame)
else:

View File

@@ -1,20 +1,23 @@
from typing import Dict
from primaite.simulator.core import Action, ActionManager, AllowAllValidator, SimComponent
from primaite.simulator.domain.controller import DomainController
from primaite.simulator.network.container import NetworkContainer
class __TempNetwork:
class Simulation(SimComponent):
"""TODO."""
pass
class SimulationContainer(SimComponent):
"""TODO."""
network: __TempNetwork
network: NetworkContainer
domain: DomainController
def __init__(self, **kwargs):
if not kwargs.get("network"):
kwargs["network"] = NetworkContainer()
if not kwargs.get("domain"):
kwargs["domain"] = DomainController()
super().__init__(**kwargs)
self.action_manager = ActionManager()
@@ -32,3 +35,21 @@ class SimulationContainer(SimComponent):
func=lambda request, context: self.domain.apply_action(request, context), validator=AllowAllValidator()
),
)
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update(
{
"network": self.network.describe_state(),
"domain": self.domain.describe_state(),
}
)
return state

View File

@@ -36,12 +36,11 @@ class Application(IOSoftware):
@abstractmethod
def describe_state(self) -> Dict:
"""
Describes the current state of the software.
Produce a dictionary describing the current state of this object.
The specifics of the software's state, including its health, criticality,
and any other pertinent information, should be implemented in subclasses.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: A dictionary containing key-value pairs representing the current state of the software.
:return: Current state of this object and child objects.
:rtype: Dict
"""
pass

View File

@@ -51,9 +51,12 @@ class Session(SimComponent):
def describe_state(self) -> Dict:
"""
Describes the current state of the session as a dictionary.
Produce a dictionary describing the current state of this object.
:return: A dictionary containing the current state of the session.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
pass
@@ -77,9 +80,12 @@ class SessionManager:
def describe_state(self) -> Dict:
"""
Describes the current state of the session manager as a dictionary.
Produce a dictionary describing the current state of this object.
:return: A dictionary containing the current state of the session manager.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
pass

View File

@@ -27,12 +27,11 @@ class Process(Software):
@abstractmethod
def describe_state(self) -> Dict:
"""
Describes the current state of the software.
Produce a dictionary describing the current state of this object.
The specifics of the software's state, including its health, criticality,
and any other pertinent information, should be implemented in subclasses.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: A dictionary containing key-value pairs representing the current state of the software.
:return: Current state of this object and child objects.
:rtype: Dict
"""
pass

View File

@@ -35,12 +35,11 @@ class Service(IOSoftware):
@abstractmethod
def describe_state(self) -> Dict:
"""
Describes the current state of the software.
Produce a dictionary describing the current state of this object.
The specifics of the software's state, including its health, criticality,
and any other pertinent information, should be implemented in subclasses.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: A dictionary containing key-value pairs representing the current state of the software.
:return: Current state of this object and child objects.
:rtype: Dict
"""
pass

View File

@@ -78,15 +78,25 @@ class Software(SimComponent):
@abstractmethod
def describe_state(self) -> Dict:
"""
Describes the current state of the software.
Produce a dictionary describing the current state of this object.
The specifics of the software's state, including its health, criticality,
and any other pertinent information, should be implemented in subclasses.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: A dictionary containing key-value pairs representing the current state of the software.
:return: Current state of this object and child objects.
:rtype: Dict
"""
pass
state = super().describe_state()
state.update(
{
"health_state": self.health_state_actual.name,
"health_state_red_view": self.health_state_visible.name,
"criticality": self.criticality.name,
"patching_count": self.patching_count,
"scanning_count": self.scanning_count,
"revealed_to_red": self.revealed_to_red,
}
)
return state
def apply_action(self, action: List[str]) -> None:
"""
@@ -134,12 +144,11 @@ class IOSoftware(Software):
@abstractmethod
def describe_state(self) -> Dict:
"""
Describes the current state of the software.
Produce a dictionary describing the current state of this object.
The specifics of the software's state, including its health, criticality,
and any other pertinent information, should be implemented in subclasses.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: A dictionary containing key-value pairs representing the current state of the software.
:return: Current state of this object and child objects.
:rtype: Dict
"""
pass

View File

@@ -0,0 +1,16 @@
from primaite.simulator.sim_container import Simulation
def test_creating_empty_simulation():
"""Check that no errors occur when trying to setup a simulation without providing parameters"""
empty_sim = Simulation()
def test_empty_sim_state():
"""Check that describe_state has the right subcomponents."""
empty_sim = Simulation()
sim_state = empty_sim.describe_state()
network_state = empty_sim.network.describe_state()
domain_state = empty_sim.domain.describe_state()
assert sim_state["network"] == network_state
assert sim_state["domain"] == domain_state