Merged PR 169: Connect all components and add describe_state() methods

## Summary
- Add an object that holds the entire simulation, and a container for the network that keeps track of a list of nodes and links.
- Implement `describe_state()` for all existing sim components and take advantage of the inheritance relationships to avoid repetition.
- Fix some minor mistakes like typehints and indentation errors.
- Write a jupyter notebook which uses the python API to create a simulation and verify that it's `describe_state()` method outputs a correct value.
- Currently the notebook creates the simulation in a janky way, because the API for simulation creation is not fleshed out yet. Further tickets have been added to the backlog to address some of these shortcomings. They are:
  - #1790

## Test process
I have tested that the notebook runs and that after populating a simulation, the describe_state function returns a dictionary full of only serialisable data types.

## Checklist
- [y] This PR is linked to a **work item**
- [y] I have performed **self-review** of the code
- [~] I have written **tests** for any new functionality added with this PR
- [n] I have updated the **documentation** if this PR changes or adds functionality
- [na] I have written/updated **design docs** if this PR implements new functionality
- [y] I have update the **change log**
- [y] I have run **pre-commit** checks for code style

Note:
This ticket also makes a small amount of progress against: #1705, it adds a shell of a network class, but only by creating the class, not implementing any functionality.

Related work items: #1787
This commit is contained in:
Marek Wolan
2023-08-23 15:24:17 +00:00
20 changed files with 849 additions and 138 deletions

View File

@@ -17,6 +17,8 @@ a Service/Application another machine.
SessionManager.
- Permission System - each action can define criteria that will be used to permit or deny agent actions.
- File System - ability to emulate a node's file system during a simulation
- Example notebooks - There is currently 1 jupyter notebook which walks through using PrimAITE
1. Creating a simulation - this notebook explains how to build up a simulation using the Python package. (WIP)
## [2.0.0] - 2023-07-26

BIN
docs/_static/component_relationship.png vendored Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 80 KiB

View File

@@ -7,11 +7,19 @@ Simulation Structure
====================
The simulation is made up of many smaller components which are related to each other in a tree-like structure. At the
top level, there is an object called the ``SimulationController`` _(doesn't exist yet)_, which has a physical network
and a software controller for managing software and users.
top level, there is the :py:meth:`primaite.simulator.sim_container.Simulation`, which keeps track of the physical network
and a domain controller for managing software and users.
Each node of the simulation 'tree' has responsibility for creating, deleting, and updating its direct descendants.
Each node of the simulation 'tree' has responsibility for creating, deleting, and updating its direct descendants. Also,
when a component's ``describe_state()`` method is called, it will include the state of its descendants. The
``apply_action()`` method can be used to act on a component or one of its descendatnts. The diagram below shows the
relationship between components.
.. image:: _static/component_relationship.png
:width: 500
:alt: The top level simulation object owns a NetworkContainer and a DomainController. The DomainController has a
list of accounts. The network container has links and nodes. Nodes can own switchports, NICs, FileSystem,
Application, Service, and Process.
Actions

View File

@@ -0,0 +1,419 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Build a simulation using the Python API\n",
"\n",
"Currently, this notbook manipulates the simulation by directly placing objects inside of the attributes of the network and domain. It should be refactored when proper methods exist for adding these objects.\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Import the Simulation class"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from primaite.simulator.sim_container import Simulation\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create an empty simulation. By default this has a network with no nodes or links, and a domain controller with no accounts.\n",
"\n",
"Let's use the simulation's `describe_state()` method to verify that it is empty."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'uuid': '5304ed6d-de4c-408c-ae24-ada32852d196',\n",
" 'network': {'uuid': 'fa17dfe8-81a1-4c7f-8c5b-8c2d3b1e8756',\n",
" 'nodes': {},\n",
" 'links': {}},\n",
" 'domain': {'uuid': '320cbb83-eb1b-4911-a4f0-fc46d8038a8a', 'accounts': {}}}"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"my_sim = Simulation()\n",
"my_sim.describe_state()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Add nodes"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"from primaite.simulator.network.hardware.base import Node\n"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"my_pc = Node(hostname=\"primaite_pc\",)\n",
"my_server = Node(hostname=\"google_server\")\n",
"\n",
"# TODO: when there is a proper function for adding nodes, use it instead of manually adding.\n",
"\n",
"my_sim.network.nodes[my_pc.uuid] = my_pc\n",
"my_sim.network.nodes[my_server.uuid] = my_server\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Connect the nodes"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"from primaite.simulator.network.hardware.base import NIC, Link, Switch\n"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2023-08-20 18:42:51,310: NIC 5c:b6:26:c0:86:61/130.1.1.1 connected to Link 5c:b6:26:c0:86:61/130.1.1.1<-->01:ef:b1:a3:24:72\n",
"2023-08-20 18:42:51,311: SwitchPort 01:ef:b1:a3:24:72 connected to Link 5c:b6:26:c0:86:61/130.1.1.1<-->01:ef:b1:a3:24:72\n",
"2023-08-20 18:42:51,314: NIC f6:de:1e:63:8e:7f/130.1.1.2 connected to Link f6:de:1e:63:8e:7f/130.1.1.2<-->30:9e:c8:d4:5d:f3\n",
"2023-08-20 18:42:51,315: SwitchPort 30:9e:c8:d4:5d:f3 connected to Link f6:de:1e:63:8e:7f/130.1.1.2<-->30:9e:c8:d4:5d:f3\n"
]
}
],
"source": [
"my_swtich = Switch(hostname=\"switch1\", num_ports=12)\n",
"\n",
"pc_nic = NIC(ip_address=\"130.1.1.1\", gateway=\"130.1.1.255\", subnet_mask=\"255.255.255.0\")\n",
"my_pc.connect_nic(pc_nic)\n",
"\n",
"\n",
"server_nic = NIC(ip_address=\"130.1.1.2\", gateway=\"130.1.1.255\", subnet_mask=\"255.255.255.0\")\n",
"my_server.connect_nic(server_nic)\n",
"\n",
"\n",
"pc_to_switch = Link(endpoint_a=pc_nic, endpoint_b=my_swtich.switch_ports[1])\n",
"server_to_swtich = Link(endpoint_a=server_nic, endpoint_b=my_swtich.switch_ports[2])\n",
"\n",
"my_sim.network.links[pc_to_switch.uuid] = pc_to_switch\n",
"my_sim.network.links[server_to_swtich.uuid] = server_to_swtich"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Add files and folders to nodes\n"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"from primaite.simulator.file_system.file_system_file_type import FileSystemFileType\n",
"from primaite.simulator.file_system.file_system_file import FileSystemFile"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"my_pc_downloads_folder = my_pc.file_system.create_folder(\"downloads\")\n",
"my_pc_downloads_folder.add_file(FileSystemFile(name=\"firefox_installer.zip\",file_type=FileSystemFileType.ZIP))"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"FileSystemFile(uuid='253e4606-0f6d-4e57-8db0-6fa7e331ecea', name='favicon.ico', size=40.0, file_type=<FileSystemFileType.PNG: '11'>, action_manager=None)"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"my_server_folder = my_server.file_system.create_folder(\"static\")\n",
"my_server.file_system.create_file(\"favicon.ico\", file_type=FileSystemFileType.PNG)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Add applications to nodes"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"from primaite.simulator.system.applications.application import Application, ApplicationOperatingState\n",
"from primaite.simulator.system.software import SoftwareHealthState, SoftwareCriticality\n",
"from primaite.simulator.network.transmission.transport_layer import Port\n",
"\n",
"# no applications exist yet so we will create our own.\n",
"class MSPaint(Application):\n",
" def describe_state(self):\n",
" return super().describe_state()"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"mspaint = MSPaint(name = \"mspaint\", health_state_actual=SoftwareHealthState.GOOD, health_state_visible=SoftwareHealthState.GOOD, criticality=SoftwareCriticality.MEDIUM, ports={Port.HTTP}, operating_state=ApplicationOperatingState.RUNNING,execution_control_status='manual')"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"my_pc.applications[mspaint.uuid] = mspaint"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create a domain account"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"from primaite.simulator.domain.account import Account, AccountType\n"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"acct = Account(username=\"admin\", password=\"admin12\", account_type=AccountType.USER)\n",
"my_sim.domain.accounts[acct.uuid] = acct"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Verify that the state dictionary contains no non-serialisable objects."
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'uuid': '5304ed6d-de4c-408c-ae24-ada32852d196',\n",
" 'network': {'uuid': 'fa17dfe8-81a1-4c7f-8c5b-8c2d3b1e8756',\n",
" 'nodes': {'1fa46446-6681-4e25-a3ba-c4c2cc564630': {'uuid': '1fa46446-6681-4e25-a3ba-c4c2cc564630',\n",
" 'hostname': 'primaite_pc',\n",
" 'operating_state': 0,\n",
" 'NICs': {'09ca02eb-7733-492c-9eff-f0d6b6ebeeda': {'uuid': '09ca02eb-7733-492c-9eff-f0d6b6ebeeda',\n",
" 'ip_adress': '130.1.1.1',\n",
" 'subnet_mask': '255.255.255.0',\n",
" 'gateway': '130.1.1.255',\n",
" 'mac_address': '5c:b6:26:c0:86:61',\n",
" 'speed': 100,\n",
" 'mtu': 1500,\n",
" 'wake_on_lan': False,\n",
" 'dns_servers': [],\n",
" 'enabled': False}},\n",
" 'file_system': {'uuid': '8b533e31-04e9-4838-839d-0656ace3e57a',\n",
" 'folders': {'b450c223-872c-4fe0-90cc-9da80973eaad': {'uuid': 'b450c223-872c-4fe0-90cc-9da80973eaad',\n",
" 'name': 'downloads',\n",
" 'size': 1000.0,\n",
" 'files': {'8160e685-a76f-4171-8a12-3d6b32a9ea16': {'uuid': '8160e685-a76f-4171-8a12-3d6b32a9ea16',\n",
" 'name': 'firefox_installer.zip',\n",
" 'size': 1000.0,\n",
" 'file_type': 'ZIP'}},\n",
" 'is_quarantined': False}}},\n",
" 'applications': {'c82f1064-f35e-466b-88ae-3f61ba0e5161': {'uuid': 'c82f1064-f35e-466b-88ae-3f61ba0e5161',\n",
" 'health_state': 'GOOD',\n",
" 'health_state_red_view': 'GOOD',\n",
" 'criticality': 'MEDIUM',\n",
" 'patching_count': 0,\n",
" 'scanning_count': 0,\n",
" 'revealed_to_red': False,\n",
" 'installing_count': 0,\n",
" 'max_sessions': 1,\n",
" 'tcp': True,\n",
" 'udp': True,\n",
" 'ports': ['HTTP'],\n",
" 'opearting_state': 'RUNNING',\n",
" 'execution_control_status': 'manual',\n",
" 'num_executions': 0,\n",
" 'groups': []}},\n",
" 'services': {},\n",
" 'process': {}},\n",
" '7f637689-6f91-4026-a685-48a9067f03e8': {'uuid': '7f637689-6f91-4026-a685-48a9067f03e8',\n",
" 'hostname': 'google_server',\n",
" 'operating_state': 0,\n",
" 'NICs': {'1abc7272-c516-4463-bd07-1a3cefe39313': {'uuid': '1abc7272-c516-4463-bd07-1a3cefe39313',\n",
" 'ip_adress': '130.1.1.2',\n",
" 'subnet_mask': '255.255.255.0',\n",
" 'gateway': '130.1.1.255',\n",
" 'mac_address': 'f6:de:1e:63:8e:7f',\n",
" 'speed': 100,\n",
" 'mtu': 1500,\n",
" 'wake_on_lan': False,\n",
" 'dns_servers': [],\n",
" 'enabled': False}},\n",
" 'file_system': {'uuid': 'ac9a6643-8349-4f7a-98c7-a1a9f97ce123',\n",
" 'folders': {'befa5d92-0878-4da2-9dac-f993c0b4a554': {'uuid': 'befa5d92-0878-4da2-9dac-f993c0b4a554',\n",
" 'name': 'static',\n",
" 'size': 0,\n",
" 'files': {},\n",
" 'is_quarantined': False},\n",
" '27383b5e-8884-4ec0-bb50-a5d43e460dfa': {'uuid': '27383b5e-8884-4ec0-bb50-a5d43e460dfa',\n",
" 'name': 'root',\n",
" 'size': 40.0,\n",
" 'files': {'253e4606-0f6d-4e57-8db0-6fa7e331ecea': {'uuid': '253e4606-0f6d-4e57-8db0-6fa7e331ecea',\n",
" 'name': 'favicon.ico',\n",
" 'size': 40.0,\n",
" 'file_type': 'PNG'}},\n",
" 'is_quarantined': False}}},\n",
" 'applications': {},\n",
" 'services': {},\n",
" 'process': {}}},\n",
" 'links': {'a449b1ff-50d9-4342-861e-44f2d4dfef37': {'uuid': 'a449b1ff-50d9-4342-861e-44f2d4dfef37',\n",
" 'endpoint_a': '09ca02eb-7733-492c-9eff-f0d6b6ebeeda',\n",
" 'endpoint_b': 'ee4557d9-a309-45dd-a6e0-5b572cc70ee5',\n",
" 'bandwidth': 100.0,\n",
" 'current_load': 0.0},\n",
" 'ebd7687b-ec69-4f1b-b2ba-86669aa95723': {'uuid': 'ebd7687b-ec69-4f1b-b2ba-86669aa95723',\n",
" 'endpoint_a': '1abc7272-c516-4463-bd07-1a3cefe39313',\n",
" 'endpoint_b': 'dc26b764-a07e-486a-99a4-798c8e0c187a',\n",
" 'bandwidth': 100.0,\n",
" 'current_load': 0.0}}},\n",
" 'domain': {'uuid': '320cbb83-eb1b-4911-a4f0-fc46d8038a8a',\n",
" 'accounts': {'5fdcfb66-84f3-4f0f-a3a7-d0cb0e1a5d51': {'uuid': '5fdcfb66-84f3-4f0f-a3a7-d0cb0e1a5d51',\n",
" 'num_logons': 0,\n",
" 'num_logoffs': 0,\n",
" 'num_group_changes': 0,\n",
" 'username': 'admin',\n",
" 'password': 'admin12',\n",
" 'account_type': 'USER',\n",
" 'enabled': True}}}}"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"my_sim.describe_state()"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'{\"uuid\": \"5304ed6d-de4c-408c-ae24-ada32852d196\", \"network\": {\"uuid\": \"fa17dfe8-81a1-4c7f-8c5b-8c2d3b1e8756\", \"nodes\": {\"1fa46446-6681-4e25-a3ba-c4c2cc564630\": {\"uuid\": \"1fa46446-6681-4e25-a3ba-c4c2cc564630\", \"hostname\": \"primaite_pc\", \"operating_state\": 0, \"NICs\": {\"09ca02eb-7733-492c-9eff-f0d6b6ebeeda\": {\"uuid\": \"09ca02eb-7733-492c-9eff-f0d6b6ebeeda\", \"ip_adress\": \"130.1.1.1\", \"subnet_mask\": \"255.255.255.0\", \"gateway\": \"130.1.1.255\", \"mac_address\": \"5c:b6:26:c0:86:61\", \"speed\": 100, \"mtu\": 1500, \"wake_on_lan\": false, \"dns_servers\": [], \"enabled\": false}}, \"file_system\": {\"uuid\": \"8b533e31-04e9-4838-839d-0656ace3e57a\", \"folders\": {\"b450c223-872c-4fe0-90cc-9da80973eaad\": {\"uuid\": \"b450c223-872c-4fe0-90cc-9da80973eaad\", \"name\": \"downloads\", \"size\": 1000.0, \"files\": {\"8160e685-a76f-4171-8a12-3d6b32a9ea16\": {\"uuid\": \"8160e685-a76f-4171-8a12-3d6b32a9ea16\", \"name\": \"firefox_installer.zip\", \"size\": 1000.0, \"file_type\": \"ZIP\"}}, \"is_quarantined\": false}}}, \"applications\": {\"c82f1064-f35e-466b-88ae-3f61ba0e5161\": {\"uuid\": \"c82f1064-f35e-466b-88ae-3f61ba0e5161\", \"health_state\": \"GOOD\", \"health_state_red_view\": \"GOOD\", \"criticality\": \"MEDIUM\", \"patching_count\": 0, \"scanning_count\": 0, \"revealed_to_red\": false, \"installing_count\": 0, \"max_sessions\": 1, \"tcp\": true, \"udp\": true, \"ports\": [\"HTTP\"], \"opearting_state\": \"RUNNING\", \"execution_control_status\": \"manual\", \"num_executions\": 0, \"groups\": []}}, \"services\": {}, \"process\": {}}, \"7f637689-6f91-4026-a685-48a9067f03e8\": {\"uuid\": \"7f637689-6f91-4026-a685-48a9067f03e8\", \"hostname\": \"google_server\", \"operating_state\": 0, \"NICs\": {\"1abc7272-c516-4463-bd07-1a3cefe39313\": {\"uuid\": \"1abc7272-c516-4463-bd07-1a3cefe39313\", \"ip_adress\": \"130.1.1.2\", \"subnet_mask\": \"255.255.255.0\", \"gateway\": \"130.1.1.255\", \"mac_address\": \"f6:de:1e:63:8e:7f\", \"speed\": 100, \"mtu\": 1500, \"wake_on_lan\": false, \"dns_servers\": [], \"enabled\": false}}, \"file_system\": {\"uuid\": \"ac9a6643-8349-4f7a-98c7-a1a9f97ce123\", \"folders\": {\"befa5d92-0878-4da2-9dac-f993c0b4a554\": {\"uuid\": \"befa5d92-0878-4da2-9dac-f993c0b4a554\", \"name\": \"static\", \"size\": 0, \"files\": {}, \"is_quarantined\": false}, \"27383b5e-8884-4ec0-bb50-a5d43e460dfa\": {\"uuid\": \"27383b5e-8884-4ec0-bb50-a5d43e460dfa\", \"name\": \"root\", \"size\": 40.0, \"files\": {\"253e4606-0f6d-4e57-8db0-6fa7e331ecea\": {\"uuid\": \"253e4606-0f6d-4e57-8db0-6fa7e331ecea\", \"name\": \"favicon.ico\", \"size\": 40.0, \"file_type\": \"PNG\"}}, \"is_quarantined\": false}}}, \"applications\": {}, \"services\": {}, \"process\": {}}}, \"links\": {\"a449b1ff-50d9-4342-861e-44f2d4dfef37\": {\"uuid\": \"a449b1ff-50d9-4342-861e-44f2d4dfef37\", \"endpoint_a\": \"09ca02eb-7733-492c-9eff-f0d6b6ebeeda\", \"endpoint_b\": \"ee4557d9-a309-45dd-a6e0-5b572cc70ee5\", \"bandwidth\": 100.0, \"current_load\": 0.0}, \"ebd7687b-ec69-4f1b-b2ba-86669aa95723\": {\"uuid\": \"ebd7687b-ec69-4f1b-b2ba-86669aa95723\", \"endpoint_a\": \"1abc7272-c516-4463-bd07-1a3cefe39313\", \"endpoint_b\": \"dc26b764-a07e-486a-99a4-798c8e0c187a\", \"bandwidth\": 100.0, \"current_load\": 0.0}}}, \"domain\": {\"uuid\": \"320cbb83-eb1b-4911-a4f0-fc46d8038a8a\", \"accounts\": {\"5fdcfb66-84f3-4f0f-a3a7-d0cb0e1a5d51\": {\"uuid\": \"5fdcfb66-84f3-4f0f-a3a7-d0cb0e1a5d51\", \"num_logons\": 0, \"num_logoffs\": 0, \"num_group_changes\": 0, \"username\": \"admin\", \"password\": \"admin12\", \"account_type\": \"USER\", \"enabled\": true}}}}'"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import json\n",
"json.dumps(my_sim.describe_state())"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@@ -147,7 +147,10 @@ class SimComponent(BaseModel):
object. If there are objects referenced by this object that are owned by something else, it is not included in
this output.
"""
return {}
state = {
"uuid": self.uuid,
}
return state
def apply_action(self, action: List[str], context: Dict = {}) -> None:
"""

View File

@@ -43,8 +43,27 @@ class Account(SimComponent):
enabled: bool = True
def describe_state(self) -> Dict:
"""Describe state for agent observations."""
return super().describe_state()
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update(
{
"num_logons": self.num_logons,
"num_logoffs": self.num_logoffs,
"num_group_changes": self.num_group_changes,
"username": self.username,
"password": self.password,
"account_type": self.account_type.name,
"enabled": self.enabled,
}
)
return state
def enable(self):
"""Set the status to enabled."""

View File

@@ -1,7 +1,7 @@
from enum import Enum
from typing import Dict, Final, List, Literal, Tuple
from primaite.simulator.core import ActionPermissionValidator, SimComponent
from primaite.simulator.core import Action, ActionManager, ActionPermissionValidator, SimComponent
from primaite.simulator.domain.account import Account, AccountType
@@ -82,6 +82,33 @@ class DomainController(SimComponent):
folders: List[temp_folder] = {}
files: List[temp_file] = {}
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.action_manager = ActionManager()
# Action 'account' matches requests like:
# ['account', '<account-uuid>', *account_action]
self.action_manager.add_action(
"account",
Action(
func=lambda request, context: self.accounts[request.pop(0)].apply_action(request, context),
validator=GroupMembershipValidator([AccountGroup.DOMAIN_ADMIN]),
),
)
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update({"accounts": {uuid: acct.describe_state() for uuid, acct in self.accounts.items()}})
return state
def _register_account(self, account: Account) -> None:
"""TODO."""
...

View File

@@ -13,16 +13,21 @@ _LOGGER = getLogger(__name__)
class FileSystem(SimComponent):
"""Class that contains all the simulation File System."""
folders: Dict = {}
folders: Dict[str, FileSystemFolder] = {}
"""List containing all the folders in the file system."""
def describe_state(self) -> Dict:
"""
Get the current state of the FileSystem as a dict.
Produce a dictionary describing the current state of this object.
:return: A dict containing the current state of the FileSystemFile.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
pass
state = super().describe_state()
state.update({"folders": {uuid: folder.describe_state() for uuid, folder in self.folders.items()}})
return state
def get_folders(self) -> Dict:
"""Returns the list of folders."""

View File

@@ -38,8 +38,18 @@ class FileSystemFile(FileSystemItem):
def describe_state(self) -> Dict:
"""
Get the current state of the FileSystemFile as a dict.
Produce a dictionary describing the current state of this object.
:return: A dict containing the current state of the FileSystemFile.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
pass
state = super().describe_state()
state.update(
{
"uuid": self.uuid,
"file_type": self.file_type.name,
}
)
return state

View File

@@ -10,12 +10,30 @@ _LOGGER = getLogger(__name__)
class FileSystemFolder(FileSystemItem):
"""Simulation FileSystemFolder."""
files: Dict = {}
files: Dict[str, FileSystemFile] = {}
"""List of files stored in the folder."""
is_quarantined: bool = False
"""Flag that marks the folder as quarantined if true."""
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update(
{
"files": {uuid: file.describe_state() for uuid, file in self.files.items()},
"is_quarantined": self.is_quarantined,
}
)
return state
def get_file_by_id(self, file_id: str) -> FileSystemFile:
"""Return a FileSystemFile with the matching id."""
return self.files.get(file_id)
@@ -67,11 +85,3 @@ class FileSystemFolder(FileSystemItem):
def quarantine_status(self) -> bool:
"""Returns true if the folder is being quarantined."""
return self.is_quarantined
def describe_state(self) -> Dict:
"""
Get the current state of the FileSystemFolder as a dict.
:return: A dict containing the current state of the FileSystemFile.
"""
pass

View File

@@ -13,5 +13,19 @@ class FileSystemItem(SimComponent):
"""The size the item takes up on disk."""
def describe_state(self) -> Dict:
"""Returns the state of the FileSystemItem."""
pass
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update(
{
"name": self.name,
"size": self.size,
}
)
return state

View File

@@ -0,0 +1,42 @@
from typing import Dict
from primaite.simulator.core import Action, ActionManager, AllowAllValidator, SimComponent
from primaite.simulator.network.hardware.base import Link, Node
class NetworkContainer(SimComponent):
"""Top level container object representing the physical network."""
nodes: Dict[str, Node] = {}
links: Dict[str, Link] = {}
def __init__(self, **kwargs):
"""Initialise the network."""
super().__init__(**kwargs)
self.action_manager = ActionManager()
self.action_manager.add_action(
"node",
Action(
func=lambda request, context: self.nodes[request.pop(0)].apply_action(request, context),
validator=AllowAllValidator(),
),
)
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update(
{
"nodes": {uuid: node.describe_state() for uuid, node in self.nodes.items()},
"links": {uuid: link.describe_state() for uuid, link in self.links.items()},
}
)
return state

View File

@@ -11,15 +11,19 @@ from prettytable import PrettyTable
from primaite import getLogger
from primaite.exceptions import NetworkError
from primaite.simulator.core import SimComponent
from primaite.simulator.domain.account import Account
from primaite.simulator.file_system.file_system import FileSystem
from primaite.simulator.network.protocols.arp import ARPEntry, ARPPacket
from primaite.simulator.network.transmission.data_link_layer import EthernetHeader, Frame
from primaite.simulator.network.transmission.network_layer import ICMPPacket, ICMPType, IPPacket, IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port, TCPHeader
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.core.packet_capture import PacketCapture
from primaite.simulator.system.core.session_manager import SessionManager
from primaite.simulator.system.core.software_manager import SoftwareManager
from primaite.simulator.system.core.sys_log import SysLog
from primaite.simulator.system.processes.process import Process
from primaite.simulator.system.services.service import Service
_LOGGER = getLogger(__name__)
@@ -125,6 +129,31 @@ class NIC(SimComponent):
_LOGGER.error(msg)
raise ValueError(msg)
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update(
{
"ip_adress": str(self.ip_address),
"subnet_mask": str(self.subnet_mask),
"gateway": str(self.gateway),
"mac_address": self.mac_address,
"speed": self.speed,
"mtu": self.mtu,
"wake_on_lan": self.wake_on_lan,
"dns_servers": self.dns_servers,
"enabled": self.enabled,
}
)
return state
@property
def ip_network(self) -> IPv4Network:
"""
@@ -241,23 +270,6 @@ class NIC(SimComponent):
return True
return False
def describe_state(self) -> Dict:
"""
Get the current state of the NIC as a dict.
:return: A dict containing the current state of the NIC.
"""
pass
def apply_action(self, action: str):
"""
Apply an action to the NIC.
:param action: The action to be applied.
:type action: str
"""
pass
def __str__(self) -> str:
return f"{self.mac_address}/{self.ip_address}"
@@ -293,6 +305,26 @@ class SwitchPort(SimComponent):
kwargs["mac_address"] = generate_mac_address()
super().__init__(**kwargs)
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update(
{
"mac_address": self.mac_address,
"speed": self.speed,
"mtu": self.mtu,
"enabled": self.enabled,
}
)
return state
def enable(self):
"""Attempt to enable the SwitchPort."""
if self.enabled:
@@ -379,23 +411,6 @@ class SwitchPort(SimComponent):
return True
return False
def describe_state(self) -> Dict:
"""
Get the current state of the SwitchPort as a dict.
:return: A dict containing the current state of the SwitchPort.
"""
pass
def apply_action(self, action: str):
"""
Apply an action to the SwitchPort.
:param action: The action to be applied.
:type action: str
"""
pass
def __str__(self) -> str:
return f"{self.mac_address}"
@@ -435,6 +450,26 @@ class Link(SimComponent):
self.endpoint_b.connect_link(self)
self.endpoint_up()
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update(
{
"endpoint_a": self.endpoint_a.uuid,
"endpoint_b": self.endpoint_b.uuid,
"bandwidth": self.bandwidth,
"current_load": self.current_load,
}
)
return state
@property
def current_load_percent(self) -> str:
"""Get the current load formatted as a percentage string."""
@@ -504,23 +539,6 @@ class Link(SimComponent):
"""
self.current_load = 0
def describe_state(self) -> Dict:
"""
Get the current state of the Link as a dict.
:return: A dict containing the current state of the Link.
"""
pass
def apply_action(self, action: str):
"""
Apply an action to the Link.
:param action: The action to be applied.
:type action: str
"""
pass
def __str__(self) -> str:
return f"{self.endpoint_a}<-->{self.endpoint_b}"
@@ -789,13 +807,13 @@ class Node(SimComponent):
nics: Dict[str, NIC] = {}
"The NICs on the node."
accounts: Dict = {}
accounts: Dict[str, Account] = {}
"All accounts on the node."
applications: Dict = {}
applications: Dict[str, Application] = {}
"All applications on the node."
services: Dict = {}
services: Dict[str, Service] = {}
"All services on the node."
processes: Dict = {}
processes: Dict[str, Process] = {}
"All processes on the node."
file_system: FileSystem
"The nodes file system."
@@ -832,6 +850,30 @@ class Node(SimComponent):
super().__init__(**kwargs)
self.arp.nics = self.nics
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update(
{
"hostname": self.hostname,
"operating_state": self.operating_state.value,
"NICs": {uuid: nic.describe_state() for uuid, nic in self.nics.items()},
# "switch_ports": {uuid, sp for uuid, sp in self.switch_ports.items()},
"file_system": self.file_system.describe_state(),
"applications": {uuid: app.describe_state() for uuid, app in self.applications.items()},
"services": {uuid: svc.describe_state() for uuid, svc in self.services.items()},
"process": {uuid: proc.describe_state() for uuid, proc in self.processes.items()},
}
)
return state
def show(self):
"""Prints a table of the NICs on the Node.."""
from prettytable import PrettyTable
@@ -950,14 +992,6 @@ class Node(SimComponent):
elif frame.ip.protocol == IPProtocol.ICMP:
self.icmp.process_icmp(frame=frame)
def describe_state(self) -> Dict:
"""
Describe the state of the Node.
:return: A dictionary representing the state of the node.
"""
pass
class Switch(Node):
"""A class representing a Layer 2 network switch."""
@@ -966,9 +1000,17 @@ class Switch(Node):
"The number of ports on the switch."
switch_ports: Dict[int, SwitchPort] = {}
"The SwitchPorts on the switch."
dst_mac_table: Dict[str, SwitchPort] = {}
mac_address_table: Dict[str, SwitchPort] = {}
"A MAC address table mapping destination MAC addresses to corresponding SwitchPorts."
def __init__(self, **kwargs):
super().__init__(**kwargs)
if not self.switch_ports:
self.switch_ports = {i: SwitchPort() for i in range(1, self.num_ports + 1)}
for port_num, port in self.switch_ports.items():
port.connected_node = self
port.port_num = port_num
def show(self):
"""Prints a table of the SwitchPorts on the Switch."""
table = PrettyTable(["Port", "MAC Address", "Speed", "Status"])
@@ -978,25 +1020,29 @@ class Switch(Node):
print(table)
def describe_state(self) -> Dict:
"""TODO."""
pass
"""
Produce a dictionary describing the current state of this object.
def __init__(self, **kwargs):
super().__init__(**kwargs)
if not self.switch_ports:
self.switch_ports = {i: SwitchPort() for i in range(1, self.num_ports + 1)}
for port_num, port in self.switch_ports.items():
port.connected_node = self
port.port_num = port_num
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
return {
"uuid": self.uuid,
"num_ports": self.num_ports, # redundant?
"ports": {port_num: port.describe_state() for port_num, port in self.switch_ports.items()},
"mac_address_table": {mac: port for mac, port in self.mac_address_table.items()},
}
def _add_mac_table_entry(self, mac_address: str, switch_port: SwitchPort):
mac_table_port = self.dst_mac_table.get(mac_address)
mac_table_port = self.mac_address_table.get(mac_address)
if not mac_table_port:
self.dst_mac_table[mac_address] = switch_port
self.mac_address_table[mac_address] = switch_port
self.sys_log.info(f"Added MAC table entry: Port {switch_port.port_num} -> {mac_address}")
else:
if mac_table_port != switch_port:
self.dst_mac_table.pop(mac_address)
self.mac_address_table.pop(mac_address)
self.sys_log.info(f"Removed MAC table entry: Port {mac_table_port.port_num} -> {mac_address}")
self._add_mac_table_entry(mac_address, switch_port)
@@ -1011,7 +1057,7 @@ class Switch(Node):
dst_mac = frame.ethernet.dst_mac_addr
self._add_mac_table_entry(src_mac, incoming_port)
outgoing_port = self.dst_mac_table.get(dst_mac)
outgoing_port = self.mac_address_table.get(dst_mac)
if outgoing_port or dst_mac != "ff:ff:ff:ff:ff:ff":
outgoing_port.send_frame(frame)
else:

View File

@@ -0,0 +1,56 @@
from typing import Dict
from primaite.simulator.core import Action, ActionManager, AllowAllValidator, SimComponent
from primaite.simulator.domain.controller import DomainController
from primaite.simulator.network.container import NetworkContainer
class Simulation(SimComponent):
"""Top-level simulation object which holds a reference to all other parts of the simulation."""
network: NetworkContainer
domain: DomainController
def __init__(self, **kwargs):
"""Initialise the Simulation."""
if not kwargs.get("network"):
kwargs["network"] = NetworkContainer()
if not kwargs.get("domain"):
kwargs["domain"] = DomainController()
super().__init__(**kwargs)
self.action_manager = ActionManager()
# pass through network actions to the network objects
self.action_manager.add_action(
"network",
Action(
func=lambda request, context: self.network.apply_action(request, context), validator=AllowAllValidator()
),
)
# pass through domain actions to the domain object
self.action_manager.add_action(
"domain",
Action(
func=lambda request, context: self.domain.apply_action(request, context), validator=AllowAllValidator()
),
)
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
state = super().describe_state()
state.update(
{
"network": self.network.describe_state(),
"domain": self.domain.describe_state(),
}
)
return state

View File

@@ -8,13 +8,12 @@ from primaite.simulator.system.software import IOSoftware
class ApplicationOperatingState(Enum):
"""Enumeration of Application Operating States."""
RUNNING = 1
"The application is running."
CLOSED = 2
"The application is closed or not running."
INSTALLING = 3
"The application is being installed or updated."
RUNNING = 1
"The application is running."
CLOSED = 2
"The application is closed or not running."
INSTALLING = 3
"The application is being installed or updated."
class Application(IOSoftware):
@@ -36,15 +35,23 @@ class Application(IOSoftware):
@abstractmethod
def describe_state(self) -> Dict:
"""
Describes the current state of the software.
Produce a dictionary describing the current state of this object.
The specifics of the software's state, including its health, criticality,
and any other pertinent information, should be implemented in subclasses.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: A dictionary containing key-value pairs representing the current state of the software.
:return: Current state of this object and child objects.
:rtype: Dict
"""
pass
state = super().describe_state()
state.update(
{
"opearting_state": self.operating_state.name,
"execution_control_status": self.execution_control_status,
"num_executions": self.num_executions,
"groups": list(self.groups),
}
)
return state
def apply_action(self, action: List[str]) -> None:
"""

View File

@@ -51,9 +51,12 @@ class Session(SimComponent):
def describe_state(self) -> Dict:
"""
Describes the current state of the session as a dictionary.
Produce a dictionary describing the current state of this object.
:return: A dictionary containing the current state of the session.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
pass
@@ -77,9 +80,12 @@ class SessionManager:
def describe_state(self) -> Dict:
"""
Describes the current state of the session manager as a dictionary.
Produce a dictionary describing the current state of this object.
:return: A dictionary containing the current state of the session manager.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: Current state of this object and child objects.
:rtype: Dict
"""
pass

View File

@@ -27,12 +27,13 @@ class Process(Software):
@abstractmethod
def describe_state(self) -> Dict:
"""
Describes the current state of the software.
Produce a dictionary describing the current state of this object.
The specifics of the software's state, including its health, criticality,
and any other pertinent information, should be implemented in subclasses.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: A dictionary containing key-value pairs representing the current state of the software.
:return: Current state of this object and child objects.
:rtype: Dict
"""
pass
state = super().describe_state()
state.update({"operating_state": self.operating_state.name})
return state

View File

@@ -35,15 +35,16 @@ class Service(IOSoftware):
@abstractmethod
def describe_state(self) -> Dict:
"""
Describes the current state of the software.
Produce a dictionary describing the current state of this object.
The specifics of the software's state, including its health, criticality,
and any other pertinent information, should be implemented in subclasses.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: A dictionary containing key-value pairs representing the current state of the software.
:return: Current state of this object and child objects.
:rtype: Dict
"""
pass
state = super().describe_state()
state.update({"operating_state": self.operating_state.name})
return state
def apply_action(self, action: List[str]) -> None:
"""

View File

@@ -78,15 +78,25 @@ class Software(SimComponent):
@abstractmethod
def describe_state(self) -> Dict:
"""
Describes the current state of the software.
Produce a dictionary describing the current state of this object.
The specifics of the software's state, including its health, criticality,
and any other pertinent information, should be implemented in subclasses.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: A dictionary containing key-value pairs representing the current state of the software.
:return: Current state of this object and child objects.
:rtype: Dict
"""
pass
state = super().describe_state()
state.update(
{
"health_state": self.health_state_actual.name,
"health_state_red_view": self.health_state_visible.name,
"criticality": self.criticality.name,
"patching_count": self.patching_count,
"scanning_count": self.scanning_count,
"revealed_to_red": self.revealed_to_red,
}
)
return state
def apply_action(self, action: List[str]) -> None:
"""
@@ -134,15 +144,24 @@ class IOSoftware(Software):
@abstractmethod
def describe_state(self) -> Dict:
"""
Describes the current state of the software.
Produce a dictionary describing the current state of this object.
The specifics of the software's state, including its health, criticality,
and any other pertinent information, should be implemented in subclasses.
Please see :py:meth:`primaite.simulator.core.SimComponent.describe_state` for a more detailed explanation.
:return: A dictionary containing key-value pairs representing the current state of the software.
:return: Current state of this object and child objects.
:rtype: Dict
"""
pass
state = super().describe_state()
state.update(
{
"installing_count": self.installing_count,
"max_sessions": self.max_sessions,
"tcp": self.tcp,
"udp": self.udp,
"ports": [port.name for port in self.ports], # TODO: not sure if this should be port.name or port.value
}
)
return state
def send(self, payload: Any, session_id: str, **kwargs) -> bool:
"""

View File

@@ -0,0 +1,16 @@
from primaite.simulator.sim_container import Simulation
def test_creating_empty_simulation():
"""Check that no errors occur when trying to setup a simulation without providing parameters"""
empty_sim = Simulation()
def test_empty_sim_state():
"""Check that describe_state has the right subcomponents."""
empty_sim = Simulation()
sim_state = empty_sim.describe_state()
network_state = empty_sim.network.describe_state()
domain_state = empty_sim.domain.describe_state()
assert sim_state["network"] == network_state
assert sim_state["domain"] == domain_state