Change port and protocol to annotated validators

This commit is contained in:
Marek Wolan
2024-09-25 16:28:22 +01:00
parent 695891f55c
commit f1b911bc65
84 changed files with 380 additions and 392 deletions

View File

@@ -12,7 +12,8 @@ from primaite.game.agent.observations.nic_observations import NICObservation
from primaite.game.agent.observations.observations import AbstractObservation, WhereType
from primaite.game.agent.observations.software_observation import ApplicationObservation, ServiceObservation
from primaite.game.agent.utils import access_from_nested_dict, NOT_PRESENT_IN_STATE
from primaite.utils.validators import IPProtocol, Port
from primaite.utils.validation.ip_protocol import IPProtocol
from primaite.utils.validation.port import Port
_LOGGER = getLogger(__name__)

View File

@@ -1,16 +1,15 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from __future__ import annotations
from typing import Dict, Optional
from typing import Dict, List, Optional
from gymnasium import spaces
from gymnasium.core import ObsType
from pydantic import field_validator
from primaite.game.agent.observations.observations import AbstractObservation, WhereType
from primaite.game.agent.utils import access_from_nested_dict, NOT_PRESENT_IN_STATE
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import IPProtocol
from primaite.utils.validation.port import Port
class NICObservation(AbstractObservation, identifier="NETWORK_INTERFACE"):
@@ -23,30 +22,9 @@ class NICObservation(AbstractObservation, identifier="NETWORK_INTERFACE"):
"""Number of the network interface."""
include_nmne: Optional[bool] = None
"""Whether to include number of malicious network events (NMNE) in the observation."""
monitored_traffic: Optional[Dict] = None
monitored_traffic: Optional[Dict[IPProtocol, List[Port]]] = None
"""A dict containing which traffic types are to be included in the observation."""
@field_validator("monitored_traffic", mode="before")
def traffic_lookup(cls, val: Optional[Dict]) -> Optional[Dict]:
"""
Convert monitored_traffic by lookup against Port and Protocol dicts.
This is necessary for retaining compatiblility with configs written for PrimAITE <=3.3.
This method will be removed in PrimAITE >= 4.0
"""
if val is None:
return val
new_val = {}
for proto, port_list in val.items():
# convert protocol, for instance ICMP becomes "icmp"
proto = PROTOCOL_LOOKUP[proto] if proto in PROTOCOL_LOOKUP else proto
new_val[proto] = []
for port in port_list:
# convert ports, for instance "HTTP" becomes 80
port = PORT_LOOKUP[port] if port in PORT_LOOKUP else port
new_val[proto].append(port)
return new_val
def __init__(self, where: WhereType, include_nmne: bool, monitored_traffic: Optional[Dict] = None) -> None:
"""
Initialise a network interface observation instance.

View File

@@ -5,15 +5,15 @@ from typing import Dict, List, Optional
from gymnasium import spaces
from gymnasium.core import ObsType
from pydantic import field_validator, model_validator
from pydantic import model_validator
from primaite import getLogger
from primaite.game.agent.observations.firewall_observation import FirewallObservation
from primaite.game.agent.observations.host_observations import HostObservation
from primaite.game.agent.observations.observations import AbstractObservation, WhereType
from primaite.game.agent.observations.router_observation import RouterObservation
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import IPProtocol
from primaite.utils.validation.port import Port
_LOGGER = getLogger(__name__)
@@ -42,7 +42,7 @@ class NodesObservation(AbstractObservation, identifier="NODES"):
"""Number of network interface cards (NICs)."""
include_nmne: Optional[bool] = None
"""Flag to include nmne."""
monitored_traffic: Optional[Dict] = None
monitored_traffic: Optional[Dict[IPProtocol, List[Port]]] = None
"""A dict containing which traffic types are to be included in the observation."""
include_num_access: Optional[bool] = None
"""Flag to include the number of accesses."""
@@ -63,27 +63,6 @@ class NodesObservation(AbstractObservation, identifier="NODES"):
num_rules: Optional[int] = None
"""Number of rules ACL rules to show."""
@field_validator("monitored_traffic", mode="before")
def traffic_lookup(cls, val: Optional[Dict]) -> Optional[Dict]:
"""
Convert monitored_traffic by lookup against Port and Protocol dicts.
This is necessary for retaining compatiblility with configs written for PrimAITE <=3.3.
This method will be removed in PrimAITE >= 4.0
"""
if val is None:
return val
new_val = {}
for proto, port_list in val.items():
# convert protocol, for instance ICMP becomes "icmp"
proto = PROTOCOL_LOOKUP[proto] if proto in PROTOCOL_LOOKUP else proto
new_val[proto] = []
for port in port_list:
# convert ports, for instance "HTTP" becomes 80
port = PORT_LOOKUP[port] if port in PORT_LOOKUP else port
new_val[proto].append(port)
return new_val
@model_validator(mode="after")
def force_optional_fields(self) -> NodesObservation.ConfigSchema:
"""Check that options are specified only if they are needed for the nodes that are part of the config."""

View File

@@ -4,7 +4,7 @@ from ipaddress import IPv4Address
from typing import Dict, List, Optional, Union
import numpy as np
from pydantic import BaseModel, ConfigDict, field_validator
from pydantic import BaseModel, ConfigDict
from primaite import DEFAULT_BANDWIDTH, getLogger
from primaite.game.agent.actions import ActionManager
@@ -27,7 +27,6 @@ from primaite.simulator.network.hardware.nodes.network.router import Router
from primaite.simulator.network.hardware.nodes.network.switch import Switch
from primaite.simulator.network.hardware.nodes.network.wireless_router import WirelessRouter
from primaite.simulator.network.nmne import NMNEConfig
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.sim_container import Simulation
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.applications.database_client import DatabaseClient # noqa: F401
@@ -50,7 +49,8 @@ from primaite.simulator.system.services.service import Service
from primaite.simulator.system.services.terminal.terminal import Terminal
from primaite.simulator.system.services.web_server.web_server import WebServer
from primaite.simulator.system.software import Software
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import IPProtocol, PROTOCOL_LOOKUP
from primaite.utils.validation.port import Port, PORT_LOOKUP
_LOGGER = getLogger(__name__)
@@ -81,39 +81,13 @@ class PrimaiteGameOptions(BaseModel):
"""Random number seed for RNGs."""
max_episode_length: int = 256
"""Maximum number of episodes for the PrimAITE game."""
ports: List[int]
ports: List[Port]
"""A whitelist of available ports in the simulation."""
protocols: List[str]
protocols: List[IPProtocol]
"""A whitelist of available protocols in the simulation."""
thresholds: Optional[Dict] = {}
"""A dict containing the thresholds used for determining what is acceptable during observations."""
@field_validator("ports", mode="before")
def ports_str2int(cls, vals: Union[List[str], List[int]]) -> List[int]:
"""
Convert named port strings to port integer values. Integer ports remain unaffected.
This is necessary to retain backwards compatibility with configs written for PrimAITE<=3.3.
:warning: This will be deprecated in PrimAITE 4.0 and configs will need to be converted.
"""
for i, port_val in enumerate(vals):
if port_val in PORT_LOOKUP:
vals[i] = PORT_LOOKUP[port_val]
return vals
@field_validator("protocols", mode="before")
def protocols_str2int(cls, vals: List[str]) -> List[str]:
"""
Convert old-style named protocols to their proper values.
This is necessary to retain backwards compatibility with configs written for PrimAITE<=3.3.
:warning: This will be deprecated in PrimAITE 4.0 and configs will need to be converted.
"""
for i, proto_val in enumerate(vals):
if proto_val in PROTOCOL_LOOKUP:
vals[i] = PROTOCOL_LOOKUP[proto_val]
return vals
class PrimaiteGame:
"""

View File

@@ -6,8 +6,8 @@ from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.hardware.nodes.network.switch import Switch
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
def num_of_switches_required(num_nodes: int, max_network_interface: int = 24) -> int:

View File

@@ -21,7 +21,6 @@ from primaite.simulator.file_system.file_system import FileSystem
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.nmne import NMNEConfig
from primaite.simulator.network.transmission.data_link_layer import Frame
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.core.packet_capture import PacketCapture
from primaite.simulator.system.core.session_manager import SessionManager
@@ -32,7 +31,9 @@ from primaite.simulator.system.services.service import Service
from primaite.simulator.system.services.terminal.terminal import Terminal
from primaite.simulator.system.software import IOSoftware, Software
from primaite.utils.converters import convert_dict_enum_keys_to_enum_values
from primaite.utils.validators import IPV4Address, PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.ipv4_address import IPV4Address
from primaite.utils.validation.port import PORT_LOOKUP
IOSoftwareClass = TypeVar("IOSoftwareClass", bound=IOSoftware)

View File

@@ -22,7 +22,7 @@ from primaite.simulator.system.services.dns.dns_client import DNSClient
from primaite.simulator.system.services.icmp.icmp import ICMP
from primaite.simulator.system.services.ntp.ntp_client import NTPClient
from primaite.simulator.system.services.terminal.terminal import Terminal
from primaite.utils.validators import IPV4Address
from primaite.utils.validation.ipv4_address import IPV4Address
_LOGGER = getLogger(__name__)

View File

@@ -14,9 +14,10 @@ from primaite.simulator.network.hardware.nodes.network.router import (
RouterInterface,
)
from primaite.simulator.network.transmission.data_link_layer import Frame
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.core.sys_log import SysLog
from primaite.utils.validators import IPV4Address, PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.ipv4_address import IPV4Address
from primaite.utils.validation.port import PORT_LOOKUP
EXTERNAL_PORT_ID: Final[int] = 1
"""The Firewall port ID of the external port."""

View File

@@ -7,7 +7,7 @@ from ipaddress import IPv4Address, IPv4Network
from typing import Any, ClassVar, Dict, List, Optional, Tuple, Union
from prettytable import MARKDOWN, PrettyTable
from pydantic import field_validator, validate_call
from pydantic import validate_call
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager, RequestType, SimComponent
@@ -17,14 +17,15 @@ from primaite.simulator.network.hardware.nodes.network.network_node import Netwo
from primaite.simulator.network.protocols.arp import ARPPacket
from primaite.simulator.network.protocols.icmp import ICMPPacket, ICMPType
from primaite.simulator.network.transmission.data_link_layer import Frame
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.nmap import NMAP
from primaite.simulator.system.core.session_manager import SessionManager
from primaite.simulator.system.core.sys_log import SysLog
from primaite.simulator.system.services.arp.arp import ARP
from primaite.simulator.system.services.icmp.icmp import ICMP
from primaite.simulator.system.services.terminal.terminal import Terminal
from primaite.utils.validators import IPV4Address, PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import IPProtocol, PROTOCOL_LOOKUP
from primaite.utils.validation.ipv4_address import IPV4Address
from primaite.utils.validation.port import Port, PORT_LOOKUP
@validate_call()
@@ -120,29 +121,15 @@ class ACLRule(SimComponent):
"""
action: ACLAction = ACLAction.DENY
protocol: Optional[str] = None
protocol: Optional[IPProtocol] = None
src_ip_address: Optional[IPV4Address] = None
src_wildcard_mask: Optional[IPV4Address] = None
dst_ip_address: Optional[IPV4Address] = None
dst_wildcard_mask: Optional[IPV4Address] = None
src_port: Optional[int] = None
dst_port: Optional[int] = None
src_port: Optional[Port] = None
dst_port: Optional[Port] = None
match_count: int = 0
@field_validator("protocol", mode="before")
def protocol_valid(cls, val: Optional[str]) -> Optional[str]:
"""Assert that the protocol for the rule is predefined in the IPProtocol lookup."""
if val is not None:
assert val in PROTOCOL_LOOKUP.values(), f"Cannot create ACL rule with invalid protocol {val}"
return val
@field_validator("src_port", "dst_port", mode="before")
def ports_valid(cls, val: Optional[int]) -> Optional[int]:
"""Assert that the port for the rule is predefined in the Port lookup."""
if val is not None:
assert val in PORT_LOOKUP.values(), f"Cannot create ACL rule with invalid port {val}"
return val
def __str__(self) -> str:
rule_strings = []
for key, value in self.model_dump(exclude={"uuid", "request_manager"}).items():
@@ -390,13 +377,13 @@ class AccessControlList(SimComponent):
def add_rule(
self,
action: ACLAction = ACLAction.DENY,
protocol: Optional[str] = None,
protocol: Optional[IPProtocol] = None,
src_ip_address: Optional[IPV4Address] = None,
src_wildcard_mask: Optional[IPV4Address] = None,
dst_ip_address: Optional[IPV4Address] = None,
dst_wildcard_mask: Optional[IPV4Address] = None,
src_port: Optional[int] = None,
dst_port: Optional[int] = None,
src_port: Optional[Port] = None,
dst_port: Optional[Port] = None,
position: int = 0,
) -> bool:
"""
@@ -498,11 +485,11 @@ class AccessControlList(SimComponent):
def get_relevant_rules(
self,
protocol: str,
protocol: IPProtocol,
src_ip_address: Union[str, IPv4Address],
src_port: int,
src_port: Port,
dst_ip_address: Union[str, IPv4Address],
dst_port: int,
dst_port: Port,
) -> List[ACLRule]:
"""
Get the list of relevant rules for a packet with given properties.
@@ -1101,17 +1088,17 @@ class RouterSessionManager(SessionManager):
def resolve_outbound_transmission_details(
self,
dst_ip_address: Optional[Union[IPv4Address, IPv4Network]] = None,
src_port: Optional[int] = None,
dst_port: Optional[int] = None,
protocol: Optional[str] = None,
src_port: Optional[Port] = None,
dst_port: Optional[Port] = None,
protocol: Optional[IPProtocol] = None,
session_id: Optional[str] = None,
) -> Tuple[
Optional[RouterInterface],
Optional[str],
IPv4Address,
Optional[int],
Optional[int],
Optional[str],
Optional[Port],
Optional[Port],
Optional[IPProtocol],
bool,
]:
"""
@@ -1131,19 +1118,19 @@ class RouterSessionManager(SessionManager):
treats the transmission as a broadcast to that network. Optional.
:type dst_ip_address: Optional[Union[IPv4Address, IPv4Network]]
:param src_port: The source port number for the transmission. Optional.
:type src_port: Optional[int]
:type src_port: Optional[Port]
:param dst_port: The destination port number for the transmission. Optional.
:type dst_port: Optional[int]
:type dst_port: Optional[Port]
:param protocol: The IP protocol to be used for the transmission. Optional.
:type protocol: Optional[str]
:type protocol: Optional[IPProtocol]
:param session_id: The session ID associated with the transmission. If provided, the session details override
other parameters. Optional.
:type session_id: Optional[str]
:return: A tuple containing the resolved outbound network interface, destination MAC address, destination IP
address, source port, destination port, protocol, and a boolean indicating whether the transmission is a
broadcast.
:rtype: Tuple[Optional[RouterInterface], Optional[str], IPv4Address, Optional[int], Optional[int],
Optional[str], bool]
:rtype: Tuple[Optional[RouterInterface], Optional[str], IPv4Address, Optional[Port], Optional[Port],
Optional[IPProtocol], bool]
"""
if dst_ip_address and not isinstance(dst_ip_address, (IPv4Address, IPv4Network)):
dst_ip_address = IPv4Address(dst_ip_address)

View File

@@ -8,8 +8,9 @@ from primaite.simulator.network.airspace import AirSpace, IPWirelessNetworkInter
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router, RouterInterface
from primaite.simulator.network.transmission.data_link_layer import Frame
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.utils.validators import IPV4Address, PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.ipv4_address import IPV4Address
from primaite.utils.validation.port import PORT_LOOKUP
class WirelessAccessPoint(IPWirelessNetworkInterface):

View File

@@ -12,14 +12,14 @@ from primaite.simulator.network.hardware.nodes.host.host_node import NIC
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.hardware.nodes.network.switch import Switch
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.applications.red_applications.data_manipulation_bot import DataManipulationBot
from primaite.simulator.system.services.database.database_service import DatabaseService
from primaite.simulator.system.services.dns.dns_server import DNSServer
from primaite.simulator.system.services.ftp.ftp_server import FTPServer
from primaite.simulator.system.services.web_server.web_server import WebServer
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
_LOGGER = getLogger(__name__)

View File

@@ -3,14 +3,16 @@ from enum import Enum
from typing import Optional
from primaite.simulator.network.protocols.packet import DataPacket
from primaite.utils.validation.ip_protocol import IPProtocol
from primaite.utils.validation.port import Port
class MasqueradePacket(DataPacket):
"""Represents an generic malicious packet that is masquerading as another protocol."""
masquerade_protocol: str # The 'Masquerade' protocol that is currently in use
masquerade_protocol: IPProtocol # The 'Masquerade' protocol that is currently in use
masquerade_port: int # The 'Masquerade' port that is currently in use
masquerade_port: Port # The 'Masquerade' port that is currently in use
class C2Packet(MasqueradePacket):

View File

@@ -9,9 +9,10 @@ from primaite.simulator.network.protocols.icmp import ICMPPacket
from primaite.simulator.network.protocols.packet import DataPacket
from primaite.simulator.network.transmission.network_layer import IPPacket
from primaite.simulator.network.transmission.primaite_layer import PrimaiteHeader
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP, TCPHeader, UDPHeader
from primaite.simulator.network.transmission.transport_layer import TCPHeader, UDPHeader
from primaite.simulator.network.utils import convert_bytes_to_megabits
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
_LOGGER = getLogger(__name__)

View File

@@ -4,7 +4,8 @@ from enum import Enum
from pydantic import BaseModel
from primaite import getLogger
from primaite.utils.validators import IPProtocol, IPV4Address, PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import IPProtocol, PROTOCOL_LOOKUP
from primaite.utils.validation.ipv4_address import IPV4Address
_LOGGER = getLogger(__name__)

View File

@@ -4,38 +4,6 @@ from typing import List
from pydantic import BaseModel
PORT_LOOKUP: dict[str, int] = dict(
UNUSED=-1,
NONE=0,
WOL=9,
FTP_DATA=20,
FTP=21,
SSH=22,
SMTP=25,
DNS=53,
HTTP=80,
POP3=110,
SFTP=115,
NTP=123,
IMAP=143,
SNMP=161,
SNMP_TRAP=162,
ARP=219,
LDAP=389,
HTTPS=443,
SMB=445,
IPP=631,
SQL_SERVER=1433,
MYSQL=3306,
RDP=3389,
RTP=5004,
RTP_ALT=5005,
DNS_ALT=5353,
HTTP_ALT=8080,
HTTPS_ALT=8443,
POSTGRES_SERVER=5432,
)
class UDPHeader(BaseModel):
"""

View File

@@ -11,10 +11,11 @@ from pydantic import BaseModel
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.network.hardware.nodes.host.host_node import HostNode
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.core.software_manager import SoftwareManager
from primaite.utils.validators import IPV4Address, PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.ipv4_address import IPV4Address
from primaite.utils.validation.port import PORT_LOOKUP
class DatabaseClientConnection(BaseModel):

View File

@@ -7,9 +7,10 @@ from pydantic import validate_call
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager, RequestType, SimComponent
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.application import Application
from primaite.utils.validators import IPV4Address, PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import IPProtocol, is_valid_protocol, PROTOCOL_LOOKUP
from primaite.utils.validation.ipv4_address import IPV4Address
from primaite.utils.validation.port import is_valid_port, Port, PORT_LOOKUP
class PortScanPayload(SimComponent):
@@ -23,8 +24,8 @@ class PortScanPayload(SimComponent):
"""
ip_address: IPV4Address
port: int
protocol: str
port: Port
protocol: IPProtocol
request: bool = True
def describe_state(self) -> Dict:
@@ -217,14 +218,14 @@ class NMAP(Application, identifier="NMAP"):
print(table.get_string(sortby="IP Address"))
return active_nodes
def _determine_port_scan_type(self, target_ip_addresses: List[IPV4Address], target_ports: List[int]) -> str:
def _determine_port_scan_type(self, target_ip_addresses: List[IPV4Address], target_ports: List[Port]) -> str:
"""
Determine the type of port scan based on the number of target IP addresses and ports.
:param target_ip_addresses: The list of target IP addresses.
:type target_ip_addresses: List[IPV4Address]
:param target_ports: The list of target ports.
:type target_ports: List[int]
:type target_ports: List[Port]
:return: The type of port scan.
:rtype: str
@@ -237,8 +238,8 @@ class NMAP(Application, identifier="NMAP"):
def _check_port_open_on_ip_address(
self,
ip_address: IPv4Address,
port: int,
protocol: str,
port: Port,
protocol: IPProtocol,
is_re_attempt: bool = False,
port_scan_uuid: Optional[str] = None,
) -> bool:
@@ -250,7 +251,7 @@ class NMAP(Application, identifier="NMAP"):
:param port: The target port.
:type port: Port
:param protocol: The protocol used for the port scan.
:type protocol: str
:type protocol: IPProtocol
:param is_re_attempt: Flag indicating if this is a reattempt. Defaults to False.
:type is_re_attempt: bool
:param port_scan_uuid: The UUID of the port scan payload. Defaults to None.
@@ -319,20 +320,20 @@ class NMAP(Application, identifier="NMAP"):
def port_scan(
self,
target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]],
target_protocol: Optional[Union[str, List[str]]] = None,
target_port: Optional[Union[int, List[int]]] = None,
target_protocol: Optional[Union[IPProtocol, List[IPProtocol]]] = None,
target_port: Optional[Union[Port, List[Port]]] = None,
show: bool = True,
json_serializable: bool = False,
) -> Dict[IPv4Address, Dict[str, List[int]]]:
) -> Dict[IPv4Address, Dict[IPProtocol, List[Port]]]:
"""
Perform a port scan on the target IP address(es).
:param target_ip_address: The target IP address(es) or network(s) for the port scan.
:type target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]]
:param target_protocol: The protocol(s) to use for the port scan. Defaults to None, which includes TCP and UDP.
:type target_protocol: Optional[Union[str, List[str]]]
:type target_protocol: Optional[Union[IPProtocol, List[IPProtocol]]]
:param target_port: The port(s) to scan. Defaults to None, which includes all valid ports.
:type target_port: Optional[Union[int, List[int]]]
:type target_port: Optional[Union[Port, List[Port]]]
:param show: Flag indicating whether to display the scan results. Defaults to True.
:type show: bool
:param json_serializable: Flag indicating whether the return value should be JSON serializable. Defaults to
@@ -340,16 +341,16 @@ class NMAP(Application, identifier="NMAP"):
:type json_serializable: bool
:return: A dictionary mapping IP addresses to protocols and lists of open ports.
:rtype: Dict[IPv4Address, Dict[str, List[int]]]
:rtype: Dict[IPv4Address, Dict[IPProtocol, List[Port]]]
"""
ip_addresses = self._explode_ip_address_network_array(target_ip_address)
if isinstance(target_port, int):
if is_valid_port(target_port):
target_port = [target_port]
elif target_port is None:
target_port = [port for port in PORT_LOOKUP if port not in {PORT_LOOKUP["NONE"], PORT_LOOKUP["UNUSED"]}]
if isinstance(target_protocol, str):
if is_valid_protocol(target_protocol):
target_protocol = [target_protocol]
elif target_protocol is None:
target_protocol = [PROTOCOL_LOOKUP["TCP"], PROTOCOL_LOOKUP["UDP"]]
@@ -389,12 +390,12 @@ class NMAP(Application, identifier="NMAP"):
def network_service_recon(
self,
target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]],
target_protocol: Optional[Union[str, List[str]]] = None,
target_port: Optional[Union[int, List[int]]] = None,
target_protocol: Optional[Union[IPProtocol, List[IPProtocol]]] = None,
target_port: Optional[Union[Port, List[Port]]] = None,
show: bool = True,
show_online_only: bool = True,
json_serializable: bool = False,
) -> Dict[IPv4Address, Dict[str, List[int]]]:
) -> Dict[IPv4Address, Dict[IPProtocol, List[Port]]]:
"""
Perform a network service reconnaissance which includes a ping scan followed by a port scan.
@@ -407,9 +408,9 @@ class NMAP(Application, identifier="NMAP"):
:param target_ip_address: The target IP address(es) or network(s) for the port scan.
:type target_ip_address: Union[IPV4Address, List[IPV4Address], IPv4Network, List[IPv4Network]]
:param target_protocol: The protocol(s) to use for the port scan. Defaults to None, which includes TCP and UDP.
:type target_protocol: Optional[Union[str, List[str]]]
:type target_protocol: Optional[Union[IPProtocol, List[IPProtocol]]]
:param target_port: The port(s) to scan. Defaults to None, which includes all valid ports.
:type target_port: Optional[Union[int, List[int]]]
:type target_port: Optional[Union[Port, List[Port]]]
:param show: Flag indicating whether to display the scan results. Defaults to True.
:type show: bool
:param show_online_only: Flag indicating whether to show only the online hosts. Defaults to True.
@@ -419,7 +420,7 @@ class NMAP(Application, identifier="NMAP"):
:type json_serializable: bool
:return: A dictionary mapping IP addresses to protocols and lists of open ports.
:rtype: Dict[IPv4Address, Dict[str, List[int]]]
:rtype: Dict[IPv4Address, Dict[IPProtocol, List[Port]]]
"""
ping_scan_results = self.ping_scan(
target_ip_address=target_ip_address, show=show, show_online_only=show_online_only, json_serializable=False

View File

@@ -9,14 +9,14 @@ from pydantic import BaseModel, Field, validate_call
from primaite.interface.request import RequestResponse
from primaite.simulator.file_system.file_system import FileSystem, Folder
from primaite.simulator.network.protocols.masquerade import C2Packet
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.application import Application, ApplicationOperatingState
from primaite.simulator.system.core.session_manager import Session
from primaite.simulator.system.services.ftp.ftp_client import FTPClient
from primaite.simulator.system.services.ftp.ftp_server import FTPServer
from primaite.simulator.system.services.service import ServiceOperatingState
from primaite.simulator.system.software import SoftwareHealthState
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import IPProtocol, is_valid_protocol, PROTOCOL_LOOKUP
from primaite.utils.validation.port import is_valid_port, Port, PORT_LOOKUP
class C2Command(Enum):
@@ -81,10 +81,10 @@ class AbstractC2(Application, identifier="AbstractC2"):
keep_alive_frequency: int = Field(default=5, ge=1)
"""The frequency at which ``Keep Alive`` packets are sent to the C2 Server from the C2 Beacon."""
masquerade_protocol: str = Field(default=PROTOCOL_LOOKUP["TCP"])
masquerade_protocol: IPProtocol = Field(default=PROTOCOL_LOOKUP["TCP"])
"""The currently chosen protocol that the C2 traffic is masquerading as. Defaults as TCP."""
masquerade_port: int = Field(default=PORT_LOOKUP["HTTP"])
masquerade_port: Port = Field(default=PORT_LOOKUP["HTTP"])
"""The currently chosen port that the C2 traffic is masquerading as. Defaults at HTTP."""
c2_config: _C2Opts = _C2Opts()
@@ -367,7 +367,7 @@ class AbstractC2(Application, identifier="AbstractC2"):
:rtype: bool
"""
# Validating that they are valid Enums.
if not isinstance(payload.masquerade_port, int) or not isinstance(payload.masquerade_protocol, str):
if not is_valid_port(payload.masquerade_port) or not is_valid_protocol(payload.masquerade_protocol):
self.sys_log.warning(
f"{self.name}: Received invalid Masquerade Values within Keep Alive."
f"Port: {payload.masquerade_port} Protocol: {payload.masquerade_protocol}."

View File

@@ -8,12 +8,12 @@ from pydantic import validate_call
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.network.protocols.masquerade import C2Packet
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.red_applications.c2 import ExfilOpts, RansomwareOpts, TerminalOpts
from primaite.simulator.system.applications.red_applications.c2.abstract_c2 import AbstractC2, C2Command, C2Payload
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript
from primaite.simulator.system.services.terminal.terminal import Terminal, TerminalClientConnection
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
class C2Beacon(AbstractC2, identifier="C2Beacon"):

View File

@@ -7,10 +7,10 @@ from primaite import getLogger
from primaite.game.science import simulate_trial
from primaite.interface.request import RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.applications.database_client import DatabaseClient, DatabaseClientConnection
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
_LOGGER = getLogger(__name__)

View File

@@ -7,8 +7,8 @@ from primaite import getLogger
from primaite.game.science import simulate_trial
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.utils.validation.port import Port, PORT_LOOKUP
_LOGGER = getLogger(__name__)
@@ -35,7 +35,7 @@ class DoSBot(DatabaseClient, identifier="DoSBot"):
target_ip_address: Optional[IPv4Address] = None
"""IP address of the target service."""
target_port: Optional[int] = None
target_port: Optional[Port] = None
"""Port of the target service."""
payload: Optional[str] = None

View File

@@ -6,10 +6,10 @@ from prettytable import MARKDOWN, PrettyTable
from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.applications.database_client import DatabaseClient, DatabaseClientConnection
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
class RansomwareScript(Application, identifier="RansomwareScript"):

View File

@@ -15,10 +15,10 @@ from primaite.simulator.network.protocols.http import (
HttpResponsePacket,
HttpStatusCode,
)
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.services.dns.dns_client import DNSClient
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import Port, PORT_LOOKUP
_LOGGER = getLogger(__name__)
@@ -154,7 +154,7 @@ class WebBrowser(Application, identifier="WebBrowser"):
self,
payload: HttpRequestPacket,
dest_ip_address: Optional[IPv4Address] = None,
dest_port: Optional[int] = PORT_LOOKUP["HTTP"],
dest_port: Optional[Port] = PORT_LOOKUP["HTTP"],
session_id: Optional[str] = None,
**kwargs,
) -> bool:

View File

@@ -11,8 +11,9 @@ from primaite.simulator.network.protocols.arp import ARPPacket
from primaite.simulator.network.protocols.icmp import ICMPPacket
from primaite.simulator.network.transmission.data_link_layer import EthernetHeader, Frame
from primaite.simulator.network.transmission.network_layer import IPPacket
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP, TCPHeader, UDPHeader
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.simulator.network.transmission.transport_layer import TCPHeader, UDPHeader
from primaite.utils.validation.ip_protocol import IPProtocol, PROTOCOL_LOOKUP
from primaite.utils.validation.port import Port, PORT_LOOKUP
if TYPE_CHECKING:
from primaite.simulator.network.hardware.base import NetworkInterface
@@ -37,12 +38,12 @@ class Session(SimComponent):
protocol: str
with_ip_address: IPv4Address
src_port: Optional[int]
dst_port: Optional[int]
src_port: Optional[Port]
dst_port: Optional[Port]
connected: bool = False
@classmethod
def from_session_key(cls, session_key: Tuple[str, IPv4Address, Optional[int], Optional[int]]) -> Session:
def from_session_key(cls, session_key: Tuple[IPProtocol, IPv4Address, Optional[Port], Optional[Port]]) -> Session:
"""
Create a Session instance from a session key tuple.
@@ -77,7 +78,9 @@ class SessionManager:
"""
def __init__(self, sys_log: SysLog):
self.sessions_by_key: Dict[Tuple[str, IPv4Address, IPv4Address, Optional[int], Optional[int]], Session] = {}
self.sessions_by_key: Dict[
Tuple[IPProtocol, IPv4Address, IPv4Address, Optional[Port], Optional[Port]], Session
] = {}
self.sessions_by_uuid: Dict[str, Session] = {}
self.sys_log: SysLog = sys_log
self.software_manager: SoftwareManager = None # Noqa
@@ -102,7 +105,7 @@ class SessionManager:
@staticmethod
def _get_session_key(
frame: Frame, inbound_frame: bool = True
) -> Tuple[str, IPv4Address, Optional[int], Optional[int]]:
) -> Tuple[IPProtocol, IPv4Address, Optional[Port], Optional[Port]]:
"""
Extracts the session key from the given frame.
@@ -110,8 +113,8 @@ class SessionManager:
- IPProtocol: The transport protocol (e.g. TCP, UDP, ICMP).
- IPv4Address: The source IP address.
- IPv4Address: The destination IP address.
- Optional[int]: The source port number (if applicable).
- Optional[int]: The destination port number (if applicable).
- Optional[Port]: The source port number (if applicable).
- Optional[Port]: The destination port number (if applicable).
:param frame: The network frame from which to extract the session key.
:return: A tuple containing the session key.
@@ -166,17 +169,17 @@ class SessionManager:
def resolve_outbound_transmission_details(
self,
dst_ip_address: Optional[Union[IPv4Address, IPv4Network]] = None,
src_port: Optional[int] = None,
dst_port: Optional[int] = None,
protocol: Optional[str] = None,
src_port: Optional[Port] = None,
dst_port: Optional[Port] = None,
protocol: Optional[IPProtocol] = None,
session_id: Optional[str] = None,
) -> Tuple[
Optional["NetworkInterface"],
Optional[str],
IPv4Address,
Optional[int],
Optional[int],
Optional[str],
Optional[Port],
Optional[Port],
Optional[IPProtocol],
bool,
]:
"""
@@ -195,19 +198,19 @@ class SessionManager:
treats the transmission as a broadcast to that network. Optional.
:type dst_ip_address: Optional[Union[IPv4Address, IPv4Network]]
:param src_port: The source port number for the transmission. Optional.
:type src_port: Optional[int]
:type src_port: Optional[Port]
:param dst_port: The destination port number for the transmission. Optional.
:type dst_port: Optional[int]
:type dst_port: Optional[Port]
:param protocol: The IP protocol to be used for the transmission. Optional.
:type protocol: Optional[str]
:type protocol: Optional[IPProtocol]
:param session_id: The session ID associated with the transmission. If provided, the session details override
other parameters. Optional.
:type session_id: Optional[str]
:return: A tuple containing the resolved outbound network interface, destination MAC address, destination IP
address, source port, destination port, protocol, and a boolean indicating whether the transmission is a
broadcast.
:rtype: Tuple[Optional["NetworkInterface"], Optional[str], IPv4Address, Optional[int], Optional[int],
Optional[str], bool]
:rtype: Tuple[Optional["NetworkInterface"], Optional[str], IPv4Address, Optional[Port], Optional[Port],
Optional[IPProtocol], bool]
"""
if dst_ip_address and not isinstance(dst_ip_address, (IPv4Address, IPv4Network)):
dst_ip_address = IPv4Address(dst_ip_address)
@@ -258,10 +261,10 @@ class SessionManager:
self,
payload: Any,
dst_ip_address: Optional[Union[IPv4Address, IPv4Network]] = None,
src_port: Optional[int] = None,
dst_port: Optional[int] = None,
src_port: Optional[Port] = None,
dst_port: Optional[Port] = None,
session_id: Optional[str] = None,
ip_protocol: str = PROTOCOL_LOOKUP["TCP"],
ip_protocol: IPProtocol = PROTOCOL_LOOKUP["TCP"],
icmp_packet: Optional[ICMPPacket] = None,
) -> Union[Any, None]:
"""

View File

@@ -8,12 +8,12 @@ from prettytable import MARKDOWN, PrettyTable
from primaite.simulator.core import RequestType
from primaite.simulator.file_system.file_system import FileSystem
from primaite.simulator.network.transmission.data_link_layer import Frame
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.application import Application, ApplicationOperatingState
from primaite.simulator.system.core.sys_log import SysLog
from primaite.simulator.system.services.service import Service, ServiceOperatingState
from primaite.simulator.system.software import IOSoftware
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import IPProtocol, PROTOCOL_LOOKUP
from primaite.utils.validation.port import Port, PORT_LOOKUP
if TYPE_CHECKING:
from primaite.simulator.system.core.session_manager import SessionManager
@@ -52,7 +52,7 @@ class SoftwareManager:
self.session_manager = session_manager
self.software: Dict[str, Union[Service, Application]] = {}
self._software_class_to_name_map: Dict[Type[IOSoftware], str] = {}
self.port_protocol_mapping: Dict[Tuple[int, str], Union[Service, Application]] = {}
self.port_protocol_mapping: Dict[Tuple[Port, IPProtocol], Union[Service, Application]] = {}
self.sys_log: SysLog = sys_log
self.file_system: FileSystem = file_system
self.dns_server: Optional[IPv4Address] = dns_server
@@ -67,7 +67,7 @@ class SoftwareManager:
"""Provides access to the ICMP service instance, if installed."""
return self.software.get("ICMP") # noqa
def get_open_ports(self) -> List[int]:
def get_open_ports(self) -> List[Port]:
"""
Get a list of open ports.
@@ -81,7 +81,7 @@ class SoftwareManager:
open_ports += list(software.listen_on_ports)
return open_ports
def check_port_is_open(self, port: int, protocol: str) -> bool:
def check_port_is_open(self, port: Port, protocol: IPProtocol) -> bool:
"""
Check if a specific port is open and running a service using the specified protocol.
@@ -93,7 +93,7 @@ class SoftwareManager:
:param port: The port to check.
:type port: Port
:param protocol: The protocol to check (e.g., TCP, UDP).
:type protocol: str
:type protocol: IPProtocol
:return: True if the port is open and a service is running on it using the specified protocol, False otherwise.
:rtype: bool
"""
@@ -189,9 +189,9 @@ class SoftwareManager:
self,
payload: Any,
dest_ip_address: Optional[Union[IPv4Address, IPv4Network]] = None,
src_port: Optional[int] = None,
dest_port: Optional[int] = None,
ip_protocol: str = PROTOCOL_LOOKUP["TCP"],
src_port: Optional[Port] = None,
dest_port: Optional[Port] = None,
ip_protocol: IPProtocol = PROTOCOL_LOOKUP["TCP"],
session_id: Optional[str] = None,
) -> bool:
"""
@@ -219,8 +219,8 @@ class SoftwareManager:
def receive_payload_from_session_manager(
self,
payload: Any,
port: int,
protocol: str,
port: Port,
protocol: IPProtocol,
session_id: str,
from_network_interface: "NIC",
frame: Frame,

View File

@@ -8,9 +8,10 @@ from prettytable import MARKDOWN, PrettyTable
from primaite.simulator.network.hardware.base import NetworkInterface
from primaite.simulator.network.protocols.arp import ARPEntry, ARPPacket
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.services.service import Service
from primaite.utils.validators import IPV4Address, PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.ipv4_address import IPV4Address
from primaite.utils.validation.port import PORT_LOOKUP
class ARP(Service):

View File

@@ -7,12 +7,12 @@ from primaite import getLogger
from primaite.simulator.file_system.file_system import File
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.file_system.folder import Folder
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.core.software_manager import SoftwareManager
from primaite.simulator.system.services.ftp.ftp_client import FTPClient
from primaite.simulator.system.services.service import Service, ServiceOperatingState
from primaite.simulator.system.software import SoftwareHealthState
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
_LOGGER = getLogger(__name__)

View File

@@ -4,10 +4,10 @@ from typing import Dict, Optional
from primaite import getLogger
from primaite.simulator.network.protocols.dns import DNSPacket, DNSRequest
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.core.software_manager import SoftwareManager
from primaite.simulator.system.services.service import Service
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import Port, PORT_LOOKUP
_LOGGER = getLogger(__name__)
@@ -110,7 +110,7 @@ class DNSClient(Service):
payload: DNSPacket,
session_id: Optional[str] = None,
dest_ip_address: Optional[IPv4Address] = None,
dest_port: Optional[int] = None,
dest_port: Optional[Port] = None,
**kwargs,
) -> bool:
"""

View File

@@ -6,9 +6,9 @@ from prettytable import MARKDOWN, PrettyTable
from primaite import getLogger
from primaite.simulator.network.protocols.dns import DNSPacket
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.services.service import Service
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
_LOGGER = getLogger(__name__)

View File

@@ -7,10 +7,10 @@ from primaite.interface.request import RequestFormat, RequestResponse
from primaite.simulator.core import RequestManager, RequestType
from primaite.simulator.file_system.file_system import File
from primaite.simulator.network.protocols.ftp import FTPCommand, FTPPacket, FTPStatusCode
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.core.software_manager import SoftwareManager
from primaite.simulator.system.services.ftp.ftp_service import FTPServiceABC
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import Port, PORT_LOOKUP
_LOGGER = getLogger(__name__)
@@ -104,7 +104,7 @@ class FTPClient(FTPServiceABC):
def _connect_to_server(
self,
dest_ip_address: Optional[IPv4Address] = None,
dest_port: Optional[int] = PORT_LOOKUP["FTP"],
dest_port: Optional[Port] = PORT_LOOKUP["FTP"],
session_id: Optional[str] = None,
is_reattempt: Optional[bool] = False,
) -> bool:
@@ -114,7 +114,7 @@ class FTPClient(FTPServiceABC):
:param: dest_ip_address: IP address of the FTP server the client needs to connect to. Optional.
:type: dest_ip_address: Optional[IPv4Address]
:param: dest_port: Port of the FTP server the client needs to connect to. Optional.
:type: dest_port: Optional[int]
:type: dest_port: Optional[Port]
:param: is_reattempt: Set to True if attempt to connect to FTP Server has been attempted. Default False.
:type: is_reattempt: Optional[bool]
"""
@@ -152,7 +152,7 @@ class FTPClient(FTPServiceABC):
return False
def _disconnect_from_server(
self, dest_ip_address: Optional[IPv4Address] = None, dest_port: Optional[int] = PORT_LOOKUP["FTP"]
self, dest_ip_address: Optional[IPv4Address] = None, dest_port: Optional[Port] = PORT_LOOKUP["FTP"]
) -> bool:
"""
Connects the client from a given FTP server.
@@ -160,7 +160,7 @@ class FTPClient(FTPServiceABC):
:param: dest_ip_address: IP address of the FTP server the client needs to disconnect from. Optional.
:type: dest_ip_address: Optional[IPv4Address]
:param: dest_port: Port of the FTP server the client needs to disconnect from. Optional.
:type: dest_port: Optional[int]
:type: dest_port: Optional[Port]
:param: is_reattempt: Set to True if attempt to disconnect from FTP Server has been attempted. Default False.
:type: is_reattempt: Optional[bool]
"""
@@ -179,7 +179,7 @@ class FTPClient(FTPServiceABC):
src_file_name: str,
dest_folder_name: str,
dest_file_name: str,
dest_port: Optional[int] = PORT_LOOKUP["FTP"],
dest_port: Optional[Port] = PORT_LOOKUP["FTP"],
session_id: Optional[str] = None,
) -> bool:
"""
@@ -204,7 +204,7 @@ class FTPClient(FTPServiceABC):
:type: dest_file_name: str
:param: dest_port: The open port of the machine that hosts the FTP Server. Default is Port["FTP"].
:type: dest_port: Optional[int]
:type: dest_port: Optional[Port]
:param: session_id: The id of the session
:type: session_id: Optional[str]
@@ -241,7 +241,7 @@ class FTPClient(FTPServiceABC):
src_file_name: str,
dest_folder_name: str,
dest_file_name: str,
dest_port: Optional[int] = PORT_LOOKUP["FTP"],
dest_port: Optional[Port] = PORT_LOOKUP["FTP"],
) -> bool:
"""
Request a file from a target IP address.

View File

@@ -3,9 +3,9 @@ from typing import Any, Optional
from primaite import getLogger
from primaite.simulator.network.protocols.ftp import FTPCommand, FTPPacket, FTPStatusCode
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.services.ftp.ftp_service import FTPServiceABC
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import is_valid_port, PORT_LOOKUP
_LOGGER = getLogger(__name__)
@@ -52,7 +52,7 @@ class FTPServer(FTPServiceABC):
# process server specific commands, otherwise call super
if payload.ftp_command == FTPCommand.PORT:
# check that the port is valid
if isinstance(payload.ftp_command_args, int) and (0 <= payload.ftp_command_args < 65535):
if is_valid_port(payload.ftp_command_args):
# return successful connection
self.add_connection(connection_id=session_id, session_id=session_id)
payload.status_code = FTPStatusCode.OK

View File

@@ -6,6 +6,7 @@ from typing import Dict, Optional
from primaite.simulator.file_system.file_system import File
from primaite.simulator.network.protocols.ftp import FTPCommand, FTPPacket, FTPStatusCode
from primaite.simulator.system.services.service import Service
from primaite.utils.validation.port import Port
class FTPServiceABC(Service, ABC):
@@ -77,7 +78,7 @@ class FTPServiceABC(Service, ABC):
dest_folder_name: str,
dest_file_name: str,
dest_ip_address: Optional[IPv4Address] = None,
dest_port: Optional[int] = None,
dest_port: Optional[Port] = None,
session_id: Optional[str] = None,
is_response: bool = False,
) -> bool:
@@ -97,7 +98,7 @@ class FTPServiceABC(Service, ABC):
:type: dest_ip_address: Optional[IPv4Address]
:param: dest_port: The open port of the machine that hosts the FTP Server. Default is Port["FTP"].
:type: dest_port: Optional[int]
:type: dest_port: Optional[Port]
:param: session_id: session ID linked to the FTP Packet. Optional.
:type: session_id: Optional[str]
@@ -167,7 +168,7 @@ class FTPServiceABC(Service, ABC):
payload: FTPPacket,
session_id: Optional[str] = None,
dest_ip_address: Optional[IPv4Address] = None,
dest_port: Optional[int] = None,
dest_port: Optional[Port] = None,
**kwargs,
) -> bool:
"""

View File

@@ -7,9 +7,9 @@ from primaite import getLogger
from primaite.simulator.network.hardware.base import NetworkInterface
from primaite.simulator.network.protocols.icmp import ICMPPacket, ICMPType
from primaite.simulator.network.transmission.data_link_layer import Frame
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.services.service import Service
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
_LOGGER = getLogger(__name__)

View File

@@ -5,9 +5,9 @@ from typing import Dict, Optional
from primaite import getLogger
from primaite.simulator.network.protocols.ntp import NTPPacket
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.services.service import Service, ServiceOperatingState
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import Port, PORT_LOOKUP
_LOGGER = getLogger(__name__)
@@ -55,7 +55,7 @@ class NTPClient(Service):
payload: NTPPacket,
session_id: Optional[str] = None,
dest_ip_address: IPv4Address = None,
dest_port: int = PORT_LOOKUP["NTP"],
dest_port: Port = PORT_LOOKUP["NTP"],
**kwargs,
) -> bool:
"""Requests NTP data from NTP server.

View File

@@ -4,9 +4,9 @@ from typing import Dict, Optional
from primaite import getLogger
from primaite.simulator.network.protocols.ntp import NTPPacket
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.services.service import Service
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
_LOGGER = getLogger(__name__)

View File

@@ -17,10 +17,10 @@ from primaite.simulator.network.protocols.ssh import (
SSHTransportMessage,
SSHUserCredentials,
)
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.core.software_manager import SoftwareManager
from primaite.simulator.system.services.service import Service, ServiceOperatingState
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
# TODO 2824: Since remote terminal connections and remote user sessions are the same thing, we could refactor

View File

@@ -10,11 +10,11 @@ from primaite.simulator.network.protocols.http import (
HttpResponsePacket,
HttpStatusCode,
)
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.database_client import DatabaseClientConnection
from primaite.simulator.system.services.service import Service
from primaite.simulator.system.software import SoftwareHealthState
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import Port, PORT_LOOKUP
_LOGGER = getLogger(__name__)
@@ -145,7 +145,7 @@ class WebServer(Service):
payload: HttpResponsePacket,
session_id: Optional[str] = None,
dest_ip_address: Optional[IPv4Address] = None,
dest_port: Optional[int] = None,
dest_port: Optional[Port] = None,
**kwargs,
) -> bool:
"""

View File

@@ -15,7 +15,8 @@ from primaite.simulator.file_system.file_system import FileSystem, Folder
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.system.core.session_manager import Session
from primaite.simulator.system.core.sys_log import SysLog
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import IPProtocol, PROTOCOL_LOOKUP
from primaite.utils.validation.port import Port
if TYPE_CHECKING:
from primaite.simulator.system.core.software_manager import SoftwareManager
@@ -250,11 +251,11 @@ class IOSoftware(Software):
"Indicates if the software uses TCP protocol for communication. Default is True."
udp: bool = True
"Indicates if the software uses UDP protocol for communication. Default is True."
port: int
port: Port
"The port to which the software is connected."
listen_on_ports: Set[int] = Field(default_factory=set)
listen_on_ports: Set[Port] = Field(default_factory=set)
"The set of ports to listen on."
protocol: str
protocol: IPProtocol
"The IP Protocol the Software operates on."
_connections: Dict[str, Dict] = {}
"Active connections."
@@ -386,7 +387,7 @@ class IOSoftware(Software):
session_id: Optional[str] = None,
dest_ip_address: Optional[Union[IPv4Address, IPv4Network]] = None,
dest_port: Optional[int] = None,
ip_protocol: str = PROTOCOL_LOOKUP["TCP"],
ip_protocol: IPProtocol = PROTOCOL_LOOKUP["TCP"],
**kwargs,
) -> bool:
"""

View File

@@ -0,0 +1 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK

View File

@@ -0,0 +1,47 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
# Define a custom IP protocol validator
from typing import Any
from pydantic import BeforeValidator, TypeAdapter, ValidationError
from typing_extensions import Annotated, Final
PROTOCOL_LOOKUP: dict[str, str] = dict(
NONE="none",
TCP="tcp",
UDP="udp",
ICMP="icmp",
)
"""
Lookup table used for compatibility with PrimAITE <= 3.3. Configs with the capitalised protocol names are converted
to lowercase at runtime.
"""
VALID_PROTOCOLS = ["none", "tcp", "udp", "icmp"]
"""Supported protocols."""
def protocol_validator(v: Any) -> str:
"""
Validate that IP Protocols are chosen from the list of supported IP Protocols.
The protocol list is dynamic because plugins are able to extend it, therefore it is necessary to use this custom
validator instead of being able to specify a union of string literals.
"""
if isinstance(v, str) and v in PROTOCOL_LOOKUP:
return PROTOCOL_LOOKUP[v]
if v in VALID_PROTOCOLS:
return v
raise ValueError(f"{v} is not a valid IP Protocol. It must be one of the following: {VALID_PROTOCOLS}")
IPProtocol: Final[Annotated] = Annotated[str, BeforeValidator(protocol_validator)]
"""Validates that IP Protocols used in the simulation belong to the list of supported protocols."""
_IPProtocolTypeAdapter = TypeAdapter(IPProtocol)
def is_valid_protocol(v: Any) -> bool:
"""Convenience method to return true if the value matches the schema, and false otherwise."""
try:
_IPProtocolTypeAdapter.validate_python(v)
return True
except ValidationError:
return False

View File

@@ -1,4 +1,6 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from ipaddress import IPv4Address
from typing import Any, Final
@@ -37,39 +39,3 @@ will automatically check and convert the input value to an instance of IPv4Addre
any Pydantic model uses it. This ensures that any field marked with this type is not just
an IPv4Address in form, but also valid according to the rules defined in ipv4_validator.
"""
# Define a custom port validator
Port: Final[Annotated] = Annotated[int, BeforeValidator(lambda n: 0 <= n <= 65535)]
"""Validates that network ports lie in the appropriate range of [0,65535]."""
# Define a custom IP protocol validator
PROTOCOL_LOOKUP: dict[str, str] = dict(
NONE="none",
TCP="tcp",
UDP="udp",
ICMP="icmp",
)
"""
Lookup table used for compatibility with PrimAITE <= 3.3. Configs with the capitalised protocol names are converted
to lowercase at runtime.
"""
VALID_PROTOCOLS = ["none", "tcp", "udp", "icmp"]
"""Supported protocols."""
def protocol_validator(v: Any) -> str:
"""
Validate that IP Protocols are chosen from the list of supported IP Protocols.
The protocol list is dynamic because plugins are able to extend it, therefore it is necessary to use this custom
validator instead of being able to specify a union of string literals.
"""
if v in PROTOCOL_LOOKUP:
return PROTOCOL_LOOKUP(v)
if v in VALID_PROTOCOLS:
return v
raise ValueError(f"{v} is not a valid IP Protocol. It must be one of the following: {VALID_PROTOCOLS}")
IPProtocol: Final[Annotated] = Annotated[str, BeforeValidator(protocol_validator)]
"""Validates that IP Protocols used in the simulation belong to the list of supported protocols."""

View File

@@ -0,0 +1,70 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
# Define a custom port validator
from typing import Any
from pydantic import BeforeValidator, TypeAdapter, ValidationError
from typing_extensions import Annotated, Final
PORT_LOOKUP: dict[str, int] = dict(
UNUSED=-1,
NONE=0,
WOL=9,
FTP_DATA=20,
FTP=21,
SSH=22,
SMTP=25,
DNS=53,
HTTP=80,
POP3=110,
SFTP=115,
NTP=123,
IMAP=143,
SNMP=161,
SNMP_TRAP=162,
ARP=219,
LDAP=389,
HTTPS=443,
SMB=445,
IPP=631,
SQL_SERVER=1433,
MYSQL=3306,
RDP=3389,
RTP=5004,
RTP_ALT=5005,
DNS_ALT=5353,
HTTP_ALT=8080,
HTTPS_ALT=8443,
POSTGRES_SERVER=5432,
)
"""
Lookup table used for compatibility with PrimAITE <= 3.3. Configs with named ports names are converted
to port integers at runtime.
"""
def port_validator(v: Any) -> int:
"""
Validate that Ports are chosen from the list of supported Ports.
The protocol list is dynamic because plugins are able to extend it, therefore it is necessary to use this custom
validator instead of being able to specify a union of string literals.
"""
if isinstance(v, str) and v in PORT_LOOKUP:
v = PORT_LOOKUP[v]
if isinstance(v, int) and (0 <= v <= 65535):
return v
raise ValueError(f"{v} is not a valid Port. It must be an integer in the range [0,65535] or ")
Port: Final[Annotated] = Annotated[int, BeforeValidator(port_validator)]
"""Validates that network ports lie in the appropriate range of [0,65535]."""
_PortTypeAdapter = TypeAdapter(Port)
def is_valid_port(v: Any) -> bool:
"""Convenience method to return true if the value matches the schema, and false otherwise."""
try:
_PortTypeAdapter.validate_python(v)
return True
except ValidationError:
return False

View File

@@ -18,7 +18,6 @@ from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.hardware.nodes.network.switch import Switch
from primaite.simulator.network.networks import arcd_uc2_network
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.sim_container import Simulation
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.applications.web_browser import WebBrowser
@@ -27,7 +26,8 @@ from primaite.simulator.system.services.dns.dns_client import DNSClient
from primaite.simulator.system.services.dns.dns_server import DNSServer
from primaite.simulator.system.services.service import Service
from primaite.simulator.system.services.web_server.web_server import WebServer
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
from tests import TEST_ASSETS_ROOT
rayinit()

View File

@@ -9,8 +9,8 @@ from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.firewall import Firewall
from primaite.simulator.network.hardware.nodes.network.router import ACLAction
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
from tests.integration_tests.configuration_file_parsing import BASIC_FIREWALL, DMZ_NETWORK, load_config

View File

@@ -6,8 +6,8 @@ from primaite.simulator.network.hardware.node_operating_state import NodeOperati
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
from tests.integration_tests.configuration_file_parsing import DMZ_NETWORK, load_config

View File

@@ -15,11 +15,11 @@ from primaite.simulator.network.protocols.http import (
HttpResponsePacket,
HttpStatusCode,
)
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.applications.web_browser import WebBrowser
from primaite.simulator.system.services.dns.dns_client import DNSClient
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
_LOGGER = getLogger(__name__)

View File

@@ -3,7 +3,7 @@ from typing import ClassVar, Dict
from primaite.simulator.network.hardware.nodes.host.host_node import HostNode, NIC
from primaite.simulator.system.services.ftp.ftp_client import FTPClient
from primaite.utils.validators import IPV4Address
from primaite.utils.validation.ipv4_address import IPV4Address
class SuperComputer(HostNode, identifier="supercomputer"):

View File

@@ -7,12 +7,12 @@ from primaite import getLogger
from primaite.simulator.file_system.file_system import File
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.file_system.folder import Folder
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.core.software_manager import SoftwareManager
from primaite.simulator.system.services.ftp.ftp_client import FTPClient
from primaite.simulator.system.services.service import Service, ServiceOperatingState
from primaite.simulator.system.software import SoftwareHealthState
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
_LOGGER = getLogger(__name__)

View File

@@ -11,13 +11,13 @@ from primaite.simulator.network.hardware.base import UserManager
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import ACLAction
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon
from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Command, C2Server
from primaite.simulator.system.services.database.database_service import DatabaseService
from primaite.simulator.system.services.ftp.ftp_client import FTPClient
from primaite.simulator.system.services.ftp.ftp_server import FTPServer
from primaite.simulator.system.services.service import ServiceOperatingState
from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture

View File

@@ -11,12 +11,12 @@ from primaite.game.agent.actions import (
)
from primaite.session.environment import PrimaiteGymEnv
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.applications.red_applications.dos_bot import DoSBot
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript
from primaite.simulator.system.services.database.database_service import DatabaseService
from primaite.utils.validation.port import PORT_LOOKUP
from tests import TEST_ASSETS_ROOT
from tests.conftest import ControlledAgent

View File

@@ -9,9 +9,9 @@ from primaite.simulator.network.hardware.base import UserManager
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import ACLAction
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.services.service import ServiceOperatingState
from primaite.simulator.system.services.terminal.terminal import RemoteTerminalConnection
from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture

View File

@@ -4,10 +4,10 @@ import pytest
from primaite.game.agent.observations.acl_observation import ACLObservation
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.sim_container import Simulation
from primaite.simulator.system.services.ntp.ntp_client import NTPClient
from primaite.simulator.system.services.ntp.ntp_server import NTPServer
from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")

View File

@@ -5,8 +5,8 @@ from primaite.simulator.network.hardware.node_operating_state import NodeOperati
from primaite.simulator.network.hardware.nodes.network.firewall import Firewall
from primaite.simulator.network.hardware.nodes.network.router import ACLAction
from primaite.simulator.network.hardware.nodes.network.switch import Switch
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
def check_default_rules(acl_obs):

View File

@@ -8,9 +8,9 @@ from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.hardware.nodes.network.switch import Switch
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.sim_container import Simulation
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
def test_router_observation():

View File

@@ -3,7 +3,7 @@ import pytest
from primaite.session.environment import PrimaiteGymEnv
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
from tests import TEST_ASSETS_ROOT
DATA_MANIPULATION_CONFIG = TEST_ASSETS_ROOT / "configs" / "data_manipulation.yaml"

View File

@@ -21,11 +21,11 @@ from primaite.game.agent.interface import ProxyAgent
from primaite.game.game import PrimaiteGame
from primaite.session.environment import PrimaiteGymEnv
from primaite.simulator.file_system.file_system_item_abc import FileSystemItemHealthStatus
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.web_browser import WebBrowser
from primaite.simulator.system.software import SoftwareHealthState
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
from tests import TEST_ASSETS_ROOT
FIREWALL_ACTIONS_NETWORK = TEST_ASSETS_ROOT / "configs/firewall_actions_network.yaml"

View File

@@ -9,11 +9,11 @@ from primaite.interface.request import RequestResponse
from primaite.session.environment import PrimaiteGymEnv
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.applications.web_browser import WebBrowser
from primaite.simulator.system.services.database.database_service import DatabaseService
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
from tests import TEST_ASSETS_ROOT
from tests.conftest import ControlledAgent

View File

@@ -8,10 +8,10 @@ from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.switch import Switch
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.application import Application
from primaite.simulator.system.services.service import Service
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
class BroadcastTestService(Service):

View File

@@ -7,10 +7,10 @@ from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.network.firewall import Firewall
from primaite.simulator.network.hardware.nodes.network.router import ACLAction
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.services.ntp.ntp_client import NTPClient
from primaite.simulator.system.services.ntp.ntp_server import NTPServer
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")

View File

@@ -6,10 +6,10 @@ import pytest
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.services.ntp.ntp_client import NTPClient
from primaite.simulator.system.services.ntp.ntp_server import NTPServer
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")

View File

@@ -7,8 +7,8 @@ from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.network.router import ACLAction
from primaite.simulator.network.hardware.nodes.network.wireless_router import WirelessRouter
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
from tests import TEST_ASSETS_ROOT

View File

@@ -13,7 +13,6 @@ from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import AccessControlList, ACLAction, Router
from primaite.simulator.network.hardware.nodes.network.switch import Switch
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon
@@ -24,7 +23,8 @@ from primaite.simulator.system.services.dns.dns_server import DNSServer
from primaite.simulator.system.services.ftp.ftp_client import FTPClient
from primaite.simulator.system.services.ftp.ftp_server import FTPServer
from primaite.simulator.system.services.web_server.web_server import WebServer
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
from tests import TEST_ASSETS_ROOT

View File

@@ -9,7 +9,6 @@ from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.database_client import DatabaseClient, DatabaseClientConnection
from primaite.simulator.system.applications.red_applications.data_manipulation_bot import (
@@ -19,6 +18,7 @@ from primaite.simulator.system.applications.red_applications.data_manipulation_b
from primaite.simulator.system.applications.red_applications.dos_bot import DoSAttackStage, DoSBot
from primaite.simulator.system.services.database.database_service import DatabaseService
from primaite.simulator.system.software import SoftwareHealthState
from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")

View File

@@ -8,12 +8,12 @@ from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.applications.red_applications.dos_bot import DoSAttackStage, DoSBot
from primaite.simulator.system.services.database.database_service import DatabaseService
from primaite.simulator.system.software import SoftwareHealthState
from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")

View File

@@ -9,11 +9,11 @@ from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.database_client import DatabaseClient, DatabaseClientConnection
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript
from primaite.simulator.system.services.database.database_service import DatabaseService
from primaite.simulator.system.software import SoftwareHealthState
from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")

View File

@@ -5,9 +5,9 @@ from ipaddress import IPv4Address, IPv4Network
import yaml
from primaite.game.game import PrimaiteGame
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.nmap import NMAP
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
from tests import TEST_ASSETS_ROOT

View File

@@ -6,11 +6,11 @@ from pydantic import Field
from primaite.game.game import PrimaiteGame
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.services.database.database_service import DatabaseService
from primaite.simulator.system.services.service import Service
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
from tests import TEST_ASSETS_ROOT

View File

@@ -9,7 +9,6 @@ from primaite.simulator.network.hardware.base import Link
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.database_client import DatabaseClient
from primaite.simulator.system.applications.web_browser import WebBrowser
from primaite.simulator.system.services.database.database_service import DatabaseService
@@ -17,6 +16,7 @@ from primaite.simulator.system.services.dns.dns_client import DNSClient
from primaite.simulator.system.services.dns.dns_server import DNSServer
from primaite.simulator.system.services.web_server.web_server import WebServer
from primaite.simulator.system.software import SoftwareHealthState
from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")

View File

@@ -12,7 +12,7 @@ from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.host.host_node import HostNode
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
from tests.conftest import DummyApplication, DummyService

View File

@@ -8,8 +8,9 @@ from primaite.simulator.network.hardware.nodes.network.router import ACLAction,
from primaite.simulator.network.protocols.icmp import ICMPPacket
from primaite.simulator.network.transmission.data_link_layer import EthernetHeader, Frame
from primaite.simulator.network.transmission.network_layer import IPPacket
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP, TCPHeader, UDPHeader
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.simulator.network.transmission.transport_layer import TCPHeader, UDPHeader
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")

View File

@@ -2,8 +2,8 @@
from ipaddress import IPv4Address
from primaite.simulator.network.hardware.nodes.network.router import ACLAction, Router
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
def test_wireless_router_from_config():

View File

@@ -5,8 +5,9 @@ from primaite.simulator.network.protocols.icmp import ICMPPacket
from primaite.simulator.network.transmission.data_link_layer import EthernetHeader, Frame
from primaite.simulator.network.transmission.network_layer import IPPacket, Precedence
from primaite.simulator.network.transmission.primaite_layer import AgentSource, DataStatus
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP, TCPFlags, TCPHeader, UDPHeader
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.simulator.network.transmission.transport_layer import TCPFlags, TCPHeader, UDPHeader
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
def test_frame_minimal_instantiation():

View File

@@ -4,11 +4,11 @@ import pytest
from primaite.simulator.network.container import Network
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.red_applications.c2.c2_beacon import C2Beacon
from primaite.simulator.system.applications.red_applications.c2.c2_server import C2Command, C2Server
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")

View File

@@ -3,13 +3,13 @@ import pytest
from primaite.simulator.network.hardware.base import Node
from primaite.simulator.network.networks import arcd_uc2_network
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.red_applications.data_manipulation_bot import (
DataManipulationAttackStage,
DataManipulationBot,
)
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")

View File

@@ -5,9 +5,9 @@ import pytest
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.red_applications.dos_bot import DoSAttackStage, DoSBot
from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")

View File

@@ -4,10 +4,10 @@ import pytest
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.protocols.http import HttpResponsePacket, HttpStatusCode
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.application import ApplicationOperatingState
from primaite.simulator.system.applications.web_browser import WebBrowser
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")

View File

@@ -6,10 +6,10 @@ import pytest
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.protocols.dns import DNSPacket, DNSReply, DNSRequest
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.services.dns.dns_client import DNSClient
from primaite.simulator.system.services.service import ServiceOperatingState
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")

View File

@@ -8,10 +8,10 @@ from primaite.simulator.network.hardware.base import Node
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.services.dns.dns_client import DNSClient
from primaite.simulator.system.services.dns.dns_server import DNSServer
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")

View File

@@ -8,10 +8,10 @@ from primaite.simulator.network.hardware.base import Node
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.host.computer import Computer
from primaite.simulator.network.protocols.ftp import FTPCommand, FTPPacket, FTPStatusCode
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.services.ftp.ftp_client import FTPClient
from primaite.simulator.system.services.service import ServiceOperatingState
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")

View File

@@ -6,10 +6,10 @@ from primaite.simulator.network.hardware.base import Node
from primaite.simulator.network.hardware.node_operating_state import NodeOperatingState
from primaite.simulator.network.hardware.nodes.host.server import Server
from primaite.simulator.network.protocols.ftp import FTPCommand, FTPPacket, FTPStatusCode
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.services.ftp.ftp_server import FTPServer
from primaite.simulator.system.services.service import ServiceOperatingState
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")

View File

@@ -18,13 +18,13 @@ from primaite.simulator.network.protocols.ssh import (
SSHTransportMessage,
SSHUserCredentials,
)
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.applications.red_applications.ransomware_script import RansomwareScript
from primaite.simulator.system.services.dns.dns_server import DNSServer
from primaite.simulator.system.services.service import ServiceOperatingState
from primaite.simulator.system.services.terminal.terminal import RemoteTerminalConnection, Terminal
from primaite.simulator.system.services.web_server.web_server import WebServer
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")

View File

@@ -9,9 +9,9 @@ from primaite.simulator.network.protocols.http import (
HttpResponsePacket,
HttpStatusCode,
)
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.services.web_server.web_server import WebServer
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
@pytest.fixture(scope="function")

View File

@@ -3,11 +3,11 @@ from typing import Dict
import pytest
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.simulator.system.core.sys_log import SysLog
from primaite.simulator.system.services.service import Service
from primaite.simulator.system.software import IOSoftware, SoftwareHealthState
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
class TestSoftware(Service):

View File

@@ -1,7 +1,7 @@
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
from primaite.simulator.network.transmission.transport_layer import PORT_LOOKUP
from primaite.utils.converters import convert_dict_enum_keys_to_enum_values
from primaite.utils.validators import PROTOCOL_LOOKUP
from primaite.utils.validation.ip_protocol import PROTOCOL_LOOKUP
from primaite.utils.validation.port import PORT_LOOKUP
def test_simple_conversion():