Delete set_original_state method definitions

This commit is contained in:
Marek Wolan
2024-02-20 16:29:27 +00:00
parent 64b9ba3ecf
commit f82506023b
29 changed files with 1 additions and 313 deletions

View File

@@ -386,6 +386,4 @@ class PrimaiteGame:
else:
_LOGGER.warning(f"agent type {agent_type} not found")
game.simulation.set_original_state()
return game

View File

@@ -153,8 +153,6 @@ class SimComponent(BaseModel):
uuid: str
"""The component UUID."""
_original_state: Dict = {}
def __init__(self, **kwargs):
if not kwargs.get("uuid"):
kwargs["uuid"] = str(uuid4())
@@ -162,15 +160,9 @@ class SimComponent(BaseModel):
self._request_manager: RequestManager = self._init_request_manager()
self._parent: Optional["SimComponent"] = None
# @abstractmethod
def set_original_state(self):
"""Sets the original state."""
pass
def reset_component_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""
for key, value in self._original_state.items():
self.__setattr__(key, value)
pass
def _init_request_manager(self) -> RequestManager:
"""

View File

@@ -42,19 +42,6 @@ class Account(SimComponent):
"Account Type, currently this can be service account (used by apps) or user account."
enabled: bool = True
def set_original_state(self):
"""Sets the original state."""
vals_to_include = {
"num_logons",
"num_logoffs",
"num_group_changes",
"username",
"password",
"account_type",
"enabled",
}
self._original_state = self.model_dump(include=vals_to_include)
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.

View File

@@ -73,15 +73,6 @@ class File(FileSystemItemABC):
self.sys_log.info(f"Created file /{self.path} (id: {self.uuid})")
self.set_original_state()
def set_original_state(self):
"""Sets the original state."""
_LOGGER.debug(f"Setting File ({self.path}) original state on node {self.sys_log.hostname}")
super().set_original_state()
vals_to_include = {"folder_id", "folder_name", "file_type", "sim_size", "real", "sim_path", "sim_root"}
self._original_state.update(self.model_dump(include=vals_to_include))
def reset_component_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""
_LOGGER.debug(f"Resetting File ({self.path}) state on node {self.sys_log.hostname}")

View File

@@ -34,17 +34,6 @@ class FileSystem(SimComponent):
if not self.folders:
self.create_folder("root")
def set_original_state(self):
"""Sets the original state."""
_LOGGER.debug(f"Setting FileSystem original state on node {self.sys_log.hostname}")
for folder in self.folders.values():
folder.set_original_state()
# Capture a list of all 'original' file uuids
original_keys = list(self.folders.keys())
vals_to_include = {"sim_root"}
self._original_state.update(self.model_dump(include=vals_to_include))
self._original_state["original_folder_uuids"] = original_keys
def reset_component_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""
_LOGGER.debug(f"Resetting FileSystem state on node {self.sys_log.hostname}")

View File

@@ -85,11 +85,6 @@ class FileSystemItemABC(SimComponent):
deleted: bool = False
"If true, the FileSystemItem was deleted."
def set_original_state(self):
"""Sets the original state."""
vals_to_keep = {"name", "health_status", "visible_health_status", "previous_hash", "revealed_to_red", "deleted"}
self._original_state = self.model_dump(include=vals_to_keep)
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.

View File

@@ -49,23 +49,6 @@ class Folder(FileSystemItemABC):
self.sys_log.info(f"Created file /{self.name} (id: {self.uuid})")
def set_original_state(self):
"""Sets the original state."""
_LOGGER.debug(f"Setting Folder ({self.name}) original state on node {self.sys_log.hostname}")
for file in self.files.values():
file.set_original_state()
super().set_original_state()
vals_to_include = {
"scan_duration",
"scan_countdown",
"red_scan_duration",
"red_scan_countdown",
"restore_duration",
"restore_countdown",
}
self._original_state.update(self.model_dump(include=vals_to_include))
self._original_state["original_file_uuids"] = list(self.files.keys())
def reset_component_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""
_LOGGER.debug(f"Resetting Folder ({self.name}) state on node {self.sys_log.hostname}")

View File

@@ -45,13 +45,6 @@ class Network(SimComponent):
self._nx_graph = MultiGraph()
def set_original_state(self):
"""Sets the original state."""
for node in self.nodes.values():
node.set_original_state()
for link in self.links.values():
link.set_original_state()
def reset_component_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""
for node in self.nodes.values():

View File

@@ -123,13 +123,6 @@ class NIC(SimComponent):
_LOGGER.error(msg)
raise ValueError(msg)
self.set_original_state()
def set_original_state(self):
"""Sets the original state."""
vals_to_include = {"ip_address", "subnet_mask", "mac_address", "speed", "mtu", "wake_on_lan", "enabled"}
self._original_state = self.model_dump(include=vals_to_include)
def reset_component_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""
super().reset_component_for_episode(episode)
@@ -349,14 +342,6 @@ class SwitchPort(SimComponent):
kwargs["mac_address"] = generate_mac_address()
super().__init__(**kwargs)
self.set_original_state()
def set_original_state(self):
"""Sets the original state."""
vals_to_include = {"port_num", "mac_address", "speed", "mtu", "enabled"}
self._original_state = self.model_dump(include=vals_to_include)
super().set_original_state()
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
@@ -506,14 +491,6 @@ class Link(SimComponent):
self.endpoint_b.connect_link(self)
self.endpoint_up()
self.set_original_state()
def set_original_state(self):
"""Sets the original state."""
vals_to_include = {"bandwidth", "current_load"}
self._original_state = self.model_dump(include=vals_to_include)
super().set_original_state()
def describe_state(self) -> Dict:
"""
Produce a dictionary describing the current state of this object.
@@ -1033,33 +1010,6 @@ class Node(SimComponent):
self.arp.nics = self.nics
self.session_manager.software_manager = self.software_manager
self._install_system_software()
self.set_original_state()
def set_original_state(self):
"""Sets the original state."""
for software in self.software_manager.software.values():
software.set_original_state()
self.file_system.set_original_state()
for nic in self.nics.values():
nic.set_original_state()
vals_to_include = {
"hostname",
"default_gateway",
"operating_state",
"revealed_to_red",
"start_up_duration",
"start_up_countdown",
"shut_down_duration",
"shut_down_countdown",
"is_resetting",
"node_scan_duration",
"node_scan_countdown",
"red_scan_countdown",
}
self._original_state = self.model_dump(include=vals_to_include)
def reset_component_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""

View File

@@ -53,11 +53,6 @@ class ACLRule(SimComponent):
rule_strings.append(f"{key}={value}")
return ", ".join(rule_strings)
def set_original_state(self):
"""Sets the original state."""
vals_to_keep = {"action", "protocol", "src_ip_address", "src_port", "dst_ip_address", "dst_port"}
self._original_state = self.model_dump(include=vals_to_keep, exclude_none=True)
def describe_state(self) -> Dict:
"""
Describes the current state of the ACLRule.
@@ -101,28 +96,6 @@ class AccessControlList(SimComponent):
super().__init__(**kwargs)
self._acl = [None] * (self.max_acl_rules - 1)
self.set_original_state()
def set_original_state(self):
"""Sets the original state."""
self.implicit_rule.set_original_state()
vals_to_keep = {"implicit_action", "max_acl_rules", "acl"}
self._original_state = self.model_dump(include=vals_to_keep, exclude_none=True)
for i, rule in enumerate(self._acl):
if not rule:
continue
self._default_config[i] = {"action": rule.action.name}
if rule.src_ip_address:
self._default_config[i]["src_ip"] = str(rule.src_ip_address)
if rule.dst_ip_address:
self._default_config[i]["dst_ip"] = str(rule.dst_ip_address)
if rule.src_port:
self._default_config[i]["src_port"] = rule.src_port.name
if rule.dst_port:
self._default_config[i]["dst_port"] = rule.dst_port.name
if rule.protocol:
self._default_config[i]["protocol"] = rule.protocol.name
def reset_component_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""
@@ -389,11 +362,6 @@ class RouteEntry(SimComponent):
metric: float = 0.0
"The cost metric for this route. Default is 0.0."
def set_original_state(self):
"""Sets the original state."""
vals_to_include = {"address", "subnet_mask", "next_hop_ip_address", "metric"}
self._original_values = self.model_dump(include=vals_to_include)
def describe_state(self) -> Dict:
"""
Describes the current state of the RouteEntry.
@@ -426,11 +394,6 @@ class RouteTable(SimComponent):
default_route: Optional[RouteEntry] = None
sys_log: SysLog
def set_original_state(self):
"""Sets the original state."""
super().set_original_state()
self._original_state["routes_orig"] = self.routes
def reset_component_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""
self.routes.clear()
@@ -808,16 +771,6 @@ class Router(Node):
self.arp.nics = self.nics
self.icmp.arp = self.arp
self.set_original_state()
def set_original_state(self):
"""Sets the original state."""
self.acl.set_original_state()
self.route_table.set_original_state()
super().set_original_state()
vals_to_include = {"num_ports"}
self._original_state.update(self.model_dump(include=vals_to_include))
def reset_component_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""
self.arp.clear()
@@ -987,7 +940,6 @@ class Router(Node):
nic.ip_address = ip_address
nic.subnet_mask = subnet_mask
self.sys_log.info(f"Configured port {port} with ip_address={ip_address}/{nic.ip_network.prefixlen}")
self.set_original_state()
def enable_port(self, port: int):
"""

View File

@@ -21,10 +21,6 @@ class Simulation(SimComponent):
super().__init__(**kwargs)
def set_original_state(self):
"""Sets the original state."""
self.network.set_original_state()
def reset_component_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""
self.network.reset_component_for_episode(episode)

View File

@@ -38,12 +38,6 @@ class Application(IOSoftware):
def __init__(self, **kwargs):
super().__init__(**kwargs)
def set_original_state(self):
"""Sets the original state."""
super().set_original_state()
vals_to_include = {"operating_state", "execution_control_status", "num_executions", "groups"}
self._original_state.update(self.model_dump(include=vals_to_include))
@abstractmethod
def describe_state(self) -> Dict:
"""

View File

@@ -30,14 +30,6 @@ class DatabaseClient(Application):
kwargs["port"] = Port.POSTGRES_SERVER
kwargs["protocol"] = IPProtocol.TCP
super().__init__(**kwargs)
self.set_original_state()
def set_original_state(self):
"""Sets the original state."""
_LOGGER.debug(f"Setting DatabaseClient WebServer original state on node {self.software_manager.node.hostname}")
super().set_original_state()
vals_to_include = {"server_ip_address", "server_password", "connected", "_query_success_tracker"}
self._original_state.update(self.model_dump(include=vals_to_include))
def reset_component_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""

View File

@@ -49,21 +49,6 @@ class DataManipulationBot(DatabaseClient):
super().__init__(**kwargs)
self.name = "DataManipulationBot"
def set_original_state(self):
"""Sets the original state."""
_LOGGER.debug(f"Setting DataManipulationBot original state on node {self.software_manager.node.hostname}")
super().set_original_state()
vals_to_include = {
"server_ip_address",
"payload",
"server_password",
"port_scan_p_of_success",
"data_manipulation_p_of_success",
"attack_stage",
"repeat",
}
self._original_state.update(self.model_dump(include=vals_to_include))
def reset_component_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""
_LOGGER.debug(f"Resetting DataManipulationBot state on node {self.software_manager.node.hostname}")

View File

@@ -57,22 +57,6 @@ class DoSBot(DatabaseClient, Application):
self.name = "DoSBot"
self.max_sessions = 1000 # override normal max sessions
def set_original_state(self):
"""Set the original state of the Denial of Service Bot."""
_LOGGER.debug(f"Setting {self.name} original state on node {self.software_manager.node.hostname}")
super().set_original_state()
vals_to_include = {
"target_ip_address",
"target_port",
"payload",
"repeat",
"attack_stage",
"max_sessions",
"port_scan_p_of_success",
"dos_intensity",
}
self._original_state.update(self.model_dump(include=vals_to_include))
def reset_component_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""
_LOGGER.debug(f"Resetting {self.name} state on node {self.software_manager.node.hostname}")

View File

@@ -47,16 +47,8 @@ class WebBrowser(Application):
kwargs["port"] = Port.HTTP
super().__init__(**kwargs)
self.set_original_state()
self.run()
def set_original_state(self):
"""Sets the original state."""
_LOGGER.debug(f"Setting WebBrowser original state on node {self.software_manager.node.hostname}")
super().set_original_state()
vals_to_include = {"target_url", "domain_name_ip_address", "latest_response"}
self._original_state.update(self.model_dump(include=vals_to_include))
def reset_component_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""
_LOGGER.debug(f"Resetting WebBrowser state on node {self.software_manager.node.hostname}")

View File

@@ -24,12 +24,6 @@ class Process(Software):
operating_state: ProcessOperatingState
"The current operating state of the Process."
def set_original_state(self):
"""Sets the original state."""
super().set_original_state()
vals_to_include = {"operating_state"}
self._original_state.update(self.model_dump(include=vals_to_include))
@abstractmethod
def describe_state(self) -> Dict:
"""

View File

@@ -40,19 +40,6 @@ class DatabaseService(Service):
super().__init__(**kwargs)
self._create_db_file()
def set_original_state(self):
"""Sets the original state."""
_LOGGER.debug(f"Setting DatabaseService original state on node {self.software_manager.node.hostname}")
super().set_original_state()
vals_to_include = {
"password",
"connections",
"backup_server_ip",
"latest_backup_directory",
"latest_backup_file_name",
}
self._original_state.update(self.model_dump(include=vals_to_include))
def reset_component_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""
_LOGGER.debug("Resetting DatabaseService original state on node {self.software_manager.node.hostname}")

View File

@@ -29,13 +29,6 @@ class DNSClient(Service):
super().__init__(**kwargs)
self.start()
def set_original_state(self):
"""Sets the original state."""
_LOGGER.debug(f"Setting DNSClient original state on node {self.software_manager.node.hostname}")
super().set_original_state()
vals_to_include = {"dns_server"}
self._original_state.update(self.model_dump(include=vals_to_include))
def reset_component_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""
self.dns_cache.clear()

View File

@@ -28,13 +28,6 @@ class DNSServer(Service):
super().__init__(**kwargs)
self.start()
def set_original_state(self):
"""Sets the original state."""
_LOGGER.debug(f"Setting DNSServer original state on node {self.software_manager.node.hostname}")
super().set_original_state()
vals_to_include = {"dns_table"}
self._original_state["dns_table_orig"] = self.model_dump(include=vals_to_include)["dns_table"]
def reset_component_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""
self.dns_table.clear()

View File

@@ -27,13 +27,6 @@ class FTPClient(FTPServiceABC):
super().__init__(**kwargs)
self.start()
def set_original_state(self):
"""Sets the original state."""
_LOGGER.debug(f"Setting FTPClient original state on node {self.software_manager.node.hostname}")
super().set_original_state()
vals_to_include = {"connected"}
self._original_state.update(self.model_dump(include=vals_to_include))
def reset_component_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""
_LOGGER.debug(f"Resetting FTPClient state on node {self.software_manager.node.hostname}")

View File

@@ -27,13 +27,6 @@ class FTPServer(FTPServiceABC):
super().__init__(**kwargs)
self.start()
def set_original_state(self):
"""Sets the original state."""
_LOGGER.debug(f"Setting FTPServer original state on node {self.software_manager.node.hostname}")
super().set_original_state()
vals_to_include = {"server_password"}
self._original_state.update(self.model_dump(include=vals_to_include))
def reset_component_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""
_LOGGER.debug(f"Resetting FTPServer state on node {self.software_manager.node.hostname}")

View File

@@ -78,12 +78,6 @@ class Service(IOSoftware):
"""
return super().receive(payload=payload, session_id=session_id, **kwargs)
def set_original_state(self):
"""Sets the original state."""
super().set_original_state()
vals_to_include = {"operating_state", "restart_duration", "restart_countdown"}
self._original_state.update(self.model_dump(include=vals_to_include))
def _init_request_manager(self) -> RequestManager:
rm = super()._init_request_manager()
rm.add_request("scan", RequestType(func=lambda request, context: self.scan()))

View File

@@ -23,13 +23,6 @@ class WebServer(Service):
last_response_status_code: Optional[HttpStatusCode] = None
def set_original_state(self):
"""Sets the original state."""
_LOGGER.debug(f"Setting WebServer original state on node {self.software_manager.node.hostname}")
super().set_original_state()
vals_to_include = {"last_response_status_code"}
self._original_state.update(self.model_dump(include=vals_to_include))
def reset_component_for_episode(self, episode: int):
"""Reset the original state of the SimComponent."""
_LOGGER.debug(f"Resetting WebServer state on node {self.software_manager.node.hostname}")

View File

@@ -96,19 +96,6 @@ class Software(SimComponent):
_patching_countdown: Optional[int] = None
"Current number of ticks left to patch the software."
def set_original_state(self):
"""Sets the original state."""
vals_to_include = {
"name",
"health_state_actual",
"health_state_visible",
"criticality",
"patching_count",
"scanning_count",
"revealed_to_red",
}
self._original_state = self.model_dump(include=vals_to_include)
def _init_request_manager(self) -> RequestManager:
rm = super()._init_request_manager()
rm.add_request(
@@ -245,12 +232,6 @@ class IOSoftware(Software):
_connections: Dict[str, Dict] = {}
"Active connections."
def set_original_state(self):
"""Sets the original state."""
super().set_original_state()
vals_to_include = {"installing_count", "max_sessions", "tcp", "udp", "port"}
self._original_state.update(self.model_dump(include=vals_to_include))
@abstractmethod
def describe_state(self) -> Dict:
"""

View File

@@ -7,7 +7,6 @@ from primaite.simulator.domain.account import Account, AccountType
@pytest.fixture(scope="function")
def account() -> Account:
acct = Account(username="Jake", password="totally_hashed_password", account_type=AccountType.USER)
acct.set_original_state()
return acct
@@ -39,7 +38,6 @@ def test_original_state(account):
account.log_on()
account.log_off()
account.disable()
account.set_original_state()
account.log_on()
state = account.describe_state()

View File

@@ -189,7 +189,6 @@ def test_reset_file_system(file_system):
# file and folder that existed originally
file_system.create_file(file_name="test_file.zip")
file_system.create_folder(folder_name="test_folder")
file_system.set_original_state()
# create a new file
file_system.create_file(file_name="new_file.txt")

View File

@@ -33,7 +33,6 @@ def network(example_network) -> Network:
assert len(example_network.computers) is 2
assert len(example_network.servers) is 2
example_network.set_original_state()
example_network.show()
return example_network

View File

@@ -22,7 +22,6 @@ def dos_bot() -> DoSBot:
dos_bot: DoSBot = computer.software_manager.software.get("DoSBot")
dos_bot.configure(target_ip_address=IPv4Address("192.168.0.1"))
dos_bot.set_original_state()
return dos_bot
@@ -51,7 +50,6 @@ def test_dos_bot_reset(dos_bot):
dos_bot.configure(
target_ip_address=IPv4Address("192.168.1.1"), target_port=Port.HTTP, payload="payload", repeat=True
)
dos_bot.set_original_state()
dos_bot.reset_component_for_episode(episode=1)
# should reset to the configured value
assert dos_bot.target_ip_address == IPv4Address("192.168.1.1")