#1706 - Applied some final changes from PR. Fixed the PCAP log name on SwitchPort so that a pcap file is generated for each port.
This commit is contained in:
@@ -17,6 +17,7 @@ with open(Path(__file__).parent.resolve() / "VERSION", "r") as file:
|
||||
__version__ = file.readline().strip()
|
||||
|
||||
_PRIMAITE_ROOT: Path = Path(__file__).parent
|
||||
# TODO: Remove once we integrate the simulation into PrimAITE and it uses the primaite session path
|
||||
|
||||
|
||||
class _PrimaitePaths:
|
||||
|
||||
@@ -2,3 +2,4 @@ from primaite import _PRIMAITE_ROOT
|
||||
|
||||
TEMP_SIM_OUTPUT = _PRIMAITE_ROOT.parent.parent / "simulation_output"
|
||||
"A path at the repo root dir to use temporarily for sim output testing while in dev."
|
||||
# TODO: Remove once we integrate the simulation into PrimAITE and it uses the primaite session path
|
||||
|
||||
@@ -9,15 +9,15 @@ from pydantic import BaseModel, ConfigDict
|
||||
class SimComponent(BaseModel):
|
||||
"""Extension of pydantic BaseModel with additional methods that must be defined by all classes in the simulator."""
|
||||
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
uuid: str
|
||||
"The component UUID."
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
if not kwargs.get("uuid"):
|
||||
kwargs["uuid"] = str(uuid4())
|
||||
super().__init__(**kwargs)
|
||||
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
uuid: str
|
||||
"The component UUID."
|
||||
|
||||
@abstractmethod
|
||||
def describe_state(self) -> Dict:
|
||||
"""
|
||||
|
||||
@@ -308,7 +308,7 @@ class SwitchPort(SimComponent):
|
||||
|
||||
self.enabled = True
|
||||
self.connected_node.sys_log.info(f"SwitchPort {self} enabled")
|
||||
self.pcap = PacketCapture(hostname=self.connected_node.hostname)
|
||||
self.pcap = PacketCapture(hostname=self.connected_node.hostname, switch_port_number=self.port_num)
|
||||
if self.connected_link:
|
||||
self.connected_link.endpoint_up()
|
||||
|
||||
|
||||
@@ -8,12 +8,13 @@ from primaite.simulator.system.software import IOSoftware
|
||||
class ApplicationOperatingState(Enum):
|
||||
"""Enumeration of Application Operating States."""
|
||||
|
||||
CLOSED = 0
|
||||
"The application is closed or not running."
|
||||
RUNNING = 1
|
||||
"The application is running."
|
||||
INSTALLING = 3
|
||||
"The application is being installed or updated."
|
||||
|
||||
RUNNING = 1
|
||||
"The application is running."
|
||||
CLOSED = 2
|
||||
"The application is closed or not running."
|
||||
INSTALLING = 3
|
||||
"The application is being installed or updated."
|
||||
|
||||
|
||||
class Application(IOSoftware):
|
||||
|
||||
@@ -20,7 +20,7 @@ class PacketCapture:
|
||||
The PCAPs are logged to: <simulation output directory>/<hostname>/<hostname>_<ip address>_pcap.log
|
||||
"""
|
||||
|
||||
def __init__(self, hostname: str, ip_address: Optional[str] = None):
|
||||
def __init__(self, hostname: str, ip_address: Optional[str] = None, switch_port_number: Optional[int] = None):
|
||||
"""
|
||||
Initialize the PacketCapture process.
|
||||
|
||||
@@ -31,6 +31,8 @@ class PacketCapture:
|
||||
"The hostname for which PCAP logs are being recorded."
|
||||
self.ip_address: str = ip_address
|
||||
"The IP address associated with the PCAP logs."
|
||||
self.switch_port_number = switch_port_number
|
||||
"The SwitchPort number."
|
||||
self._setup_logger()
|
||||
|
||||
def _setup_logger(self):
|
||||
@@ -43,20 +45,26 @@ class PacketCapture:
|
||||
log_format = "%(message)s"
|
||||
file_handler.setFormatter(logging.Formatter(log_format))
|
||||
|
||||
logger_name = f"{self.hostname}_{self.ip_address}_pcap" if self.ip_address else f"{self.hostname}_pcap"
|
||||
self.logger = logging.getLogger(logger_name)
|
||||
self.logger = logging.getLogger(self._logger_name)
|
||||
self.logger.setLevel(60) # Custom log level > CRITICAL to prevent any unwanted standard DEBUG-CRITICAL logs
|
||||
self.logger.addHandler(file_handler)
|
||||
|
||||
self.logger.addFilter(_JSONFilter())
|
||||
|
||||
@property
|
||||
def _logger_name(self) -> str:
|
||||
"""Get PCAP the logger name."""
|
||||
if self.ip_address:
|
||||
return f"{self.hostname}_{self.ip_address}_pcap"
|
||||
if self.switch_port_number:
|
||||
return f"{self.hostname}_port-{self.switch_port_number}_pcap"
|
||||
return f"{self.hostname}_pcap"
|
||||
|
||||
def _get_log_path(self) -> Path:
|
||||
"""Get the path for the log file."""
|
||||
root = TEMP_SIM_OUTPUT / self.hostname
|
||||
root.mkdir(exist_ok=True, parents=True)
|
||||
if self.ip_address:
|
||||
return root / f"{self.hostname}_{self.ip_address}_pcap.log"
|
||||
return root / f"{self.hostname}_pcap.log"
|
||||
return root / f"{self._logger_name}.log"
|
||||
|
||||
def capture(self, frame): # noqa - I'll have a circular import and cant use if TYPE_CHECKING ;(
|
||||
"""
|
||||
|
||||
@@ -8,17 +8,17 @@ from primaite.simulator.system.software import IOSoftware
|
||||
class ServiceOperatingState(Enum):
|
||||
"""Enumeration of Service Operating States."""
|
||||
|
||||
STOPPED = 0
|
||||
"The service is not running."
|
||||
RUNNING = 1
|
||||
"The service is currently running."
|
||||
RESTARTING = 2
|
||||
"The service is in the process of restarting."
|
||||
STOPPED = 2
|
||||
"The service is not running."
|
||||
INSTALLING = 3
|
||||
"The service is being installed or updated."
|
||||
PAUSED = 4
|
||||
RESTARTING = 4
|
||||
"The service is in the process of restarting."
|
||||
PAUSED = 5
|
||||
"The service is temporarily paused."
|
||||
DISABLED = 5
|
||||
DISABLED = 6
|
||||
"The service is disabled and cannot be started."
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user