fix type hints and describe state functions

This commit is contained in:
Marek Wolan
2023-08-20 18:38:02 +01:00
parent 6ca53803cd
commit 01c912c094
10 changed files with 459 additions and 49 deletions

View File

@@ -59,7 +59,7 @@ class Account(SimComponent):
"num_group_changes": self.num_group_changes,
"username": self.username,
"password": self.password,
"account_type": self.account_type,
"account_type": self.account_type.name,
"enabled": self.enabled,
}
)

View File

@@ -13,7 +13,7 @@ _LOGGER = getLogger(__name__)
class FileSystem(SimComponent):
"""Class that contains all the simulation File System."""
folders: Dict = {}
folders: Dict[str, FileSystemFolder] = {}
"""List containing all the folders in the file system."""
def describe_state(self) -> Dict:
@@ -26,7 +26,7 @@ class FileSystem(SimComponent):
:rtype: Dict
"""
state = super().describe_state()
state.update({"folders": {uuid: folder for uuid, folder in self.folders.items()}})
state.update({"folders": {uuid: folder.describe_state() for uuid, folder in self.folders.items()}})
return state
def get_folders(self) -> Dict:

View File

@@ -45,9 +45,11 @@ class FileSystemFile(FileSystemItem):
:return: Current state of this object and child objects.
:rtype: Dict
"""
return {
"uuid": self.uuid,
"name": self.name,
"size": self.size,
"file_type": self.file_type,
}
state = super().describe_state()
state.update(
{
"uuid": self.uuid,
"file_type": self.file_type.name,
}
)
return state

View File

@@ -10,7 +10,7 @@ _LOGGER = getLogger(__name__)
class FileSystemFolder(FileSystemItem):
"""Simulation FileSystemFolder."""
files: Dict = {}
files: Dict[str, FileSystemFile] = {}
"""List of files stored in the folder."""
is_quarantined: bool = False
@@ -25,13 +25,14 @@ class FileSystemFolder(FileSystemItem):
:return: Current state of this object and child objects.
:rtype: Dict
"""
return {
"uuid": self.uuid,
"name": self.name,
"size": self.size,
"files": {uuid: file for uuid, file in self.files.items()},
"is_quarantined": self.is_quarantined,
}
state = super().describe_state()
state.update(
{
"files": {uuid: file.describe_state() for uuid, file in self.files.items()},
"is_quarantined": self.is_quarantined,
}
)
return state
def get_file_by_id(self, file_id: str) -> FileSystemFile:
"""Return a FileSystemFile with the matching id."""

View File

@@ -11,15 +11,19 @@ from prettytable import PrettyTable
from primaite import getLogger
from primaite.exceptions import NetworkError
from primaite.simulator.core import SimComponent
from primaite.simulator.domain.account import Account
from primaite.simulator.file_system.file_system import FileSystem
from primaite.simulator.network.protocols.arp import ARPEntry, ARPPacket
from primaite.simulator.network.transmission.data_link_layer import EthernetHeader, Frame
from primaite.simulator.network.transmission.network_layer import ICMPPacket, ICMPType, IPPacket, IPProtocol
from primaite.simulator.network.transmission.transport_layer import Port, TCPHeader
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.core.packet_capture import PacketCapture
from primaite.simulator.system.core.session_manager import SessionManager
from primaite.simulator.system.core.software_manager import SoftwareManager
from primaite.simulator.system.core.sys_log import SysLog
from primaite.simulator.system.processes.process import Process
from primaite.simulator.system.services.service import Service
_LOGGER = getLogger(__name__)
@@ -137,9 +141,9 @@ class NIC(SimComponent):
state = super().describe_state()
state.update(
{
"ip_adress": self.ip_address,
"subnet_mask": self.subnet_mask,
"gateway": self.gateway,
"ip_adress": str(self.ip_address),
"subnet_mask": str(self.subnet_mask),
"gateway": str(self.gateway),
"mac_address": self.mac_address,
"speed": self.speed,
"mtu": self.mtu,
@@ -319,6 +323,7 @@ class SwitchPort(SimComponent):
"enabled": self.enabled,
}
)
return state
def enable(self):
"""Attempt to enable the SwitchPort."""
@@ -802,13 +807,13 @@ class Node(SimComponent):
nics: Dict[str, NIC] = {}
"The NICs on the node."
accounts: Dict = {}
accounts: Dict[str, Account] = {}
"All accounts on the node."
applications: Dict = {}
applications: Dict[str, Application] = {}
"All applications on the node."
services: Dict = {}
services: Dict[str, Service] = {}
"All services on the node."
processes: Dict = {}
processes: Dict[str, Process] = {}
"All processes on the node."
file_system: FileSystem
"The nodes file system."
@@ -862,9 +867,9 @@ class Node(SimComponent):
"NICs": {uuid: nic.describe_state() for uuid, nic in self.nics.items()},
# "switch_ports": {uuid, sp for uuid, sp in self.switch_ports.items()},
"file_system": self.file_system.describe_state(),
"applications": {uuid: app for uuid, app in self.applications.items()},
"services": {uuid: svc for uuid, svc in self.services.items()},
"process": {uuid: proc for uuid, proc in self.processes.items()},
"applications": {uuid: app.describe_state() for uuid, app in self.applications.items()},
"services": {uuid: svc.describe_state() for uuid, svc in self.services.items()},
"process": {uuid: proc.describe_state() for uuid, proc in self.processes.items()},
}
)
return state
@@ -1026,7 +1031,7 @@ class Switch(Node):
return {
"uuid": self.uuid,
"num_ports": self.num_ports, # redundant?
"ports": {port_num: port for port_num, port in self.switch_ports.items()},
"ports": {port_num: port.describe_state() for port_num, port in self.switch_ports.items()},
"mac_address_table": {mac: port for mac, port in self.mac_address_table.items()},
}

View File

@@ -8,13 +8,12 @@ from primaite.simulator.system.software import IOSoftware
class ApplicationOperatingState(Enum):
"""Enumeration of Application Operating States."""
RUNNING = 1
"The application is running."
CLOSED = 2
"The application is closed or not running."
INSTALLING = 3
"The application is being installed or updated."
RUNNING = 1
"The application is running."
CLOSED = 2
"The application is closed or not running."
INSTALLING = 3
"The application is being installed or updated."
class Application(IOSoftware):
@@ -43,7 +42,16 @@ class Application(IOSoftware):
:return: Current state of this object and child objects.
:rtype: Dict
"""
pass
state = super().describe_state()
state.update(
{
"opearting_state": self.operating_state.name,
"execution_control_status": self.execution_control_status,
"num_executions": self.num_executions,
"groups": list(self.groups),
}
)
return state
def apply_action(self, action: List[str]) -> None:
"""

View File

@@ -34,4 +34,6 @@ class Process(Software):
:return: Current state of this object and child objects.
:rtype: Dict
"""
pass
state = super().describe_state()
state.update({"operating_state": self.operating_state.name})
return state

View File

@@ -42,7 +42,9 @@ class Service(IOSoftware):
:return: Current state of this object and child objects.
:rtype: Dict
"""
pass
state = super().describe_state()
state.update({"operating_state": self.operating_state.name})
return state
def apply_action(self, action: List[str]) -> None:
"""

View File

@@ -151,7 +151,17 @@ class IOSoftware(Software):
:return: Current state of this object and child objects.
:rtype: Dict
"""
pass
state = super().describe_state()
state.update(
{
"installing_count": self.installing_count,
"max_sessions": self.max_sessions,
"tcp": self.tcp,
"udp": self.udp,
"ports": [port.name for port in self.ports], # TODO: not sure if this should be port.name or port.value
}
)
return state
def send(self, payload: Any, session_id: str, **kwargs) -> bool:
"""