2024-06-05 09:11:37 +01:00
|
|
|
# © Crown-owned copyright 2024, Defence Science and Technology Laboratory UK
|
2023-11-29 01:28:40 +00:00
|
|
|
from typing import Tuple
|
|
|
|
|
|
|
|
|
|
import pytest
|
2023-09-20 16:23:35 +01:00
|
|
|
|
2024-08-16 11:51:38 +01:00
|
|
|
from primaite.interface.request import RequestResponse
|
2024-02-05 08:44:10 +00:00
|
|
|
from primaite.simulator.network.hardware.nodes.host.computer import Computer
|
|
|
|
|
from primaite.simulator.network.hardware.nodes.host.server import Server
|
2023-09-20 16:23:35 +01:00
|
|
|
from primaite.simulator.system.services.ftp.ftp_client import FTPClient
|
|
|
|
|
from primaite.simulator.system.services.ftp.ftp_server import FTPServer
|
|
|
|
|
from primaite.simulator.system.services.service import ServiceOperatingState
|
|
|
|
|
|
|
|
|
|
|
2023-11-29 01:28:40 +00:00
|
|
|
@pytest.fixture(scope="function")
def ftp_client_and_ftp_server(client_server) -> Tuple[FTPClient, Computer, FTPServer, Server]:
    """
    Install and start an FTP client and an FTP server on the ``client_server`` nodes.

    :param client_server: Fixture value that unpacks into a ``(computer, server)`` pair.
    :return: ``(ftp_client, computer, ftp_server, server)`` with both services started.
    """
    client_node, server_node = client_server

    # Client side: install the service on the computer, fetch the installed instance by name, start it.
    client_node.software_manager.install(FTPClient)
    client_service: FTPClient = client_node.software_manager.software.get("FTPClient")
    client_service.start()

    # Server side: same sequence for the FTP server service on the server node.
    server_node.software_manager.install(FTPServer)
    server_service: FTPServer = server_node.software_manager.software.get("FTPServer")
    server_service.start()

    return client_service, client_node, server_service, server_node
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_ftp_client_store_file_in_server(ftp_client_and_ftp_server):
    """Test that the client can store a file on the backup server using ``send_file``."""
    ftp_client, _computer, ftp_server, server = ftp_client_and_ftp_server

    # Both services must be up before attempting a transfer.
    assert ftp_client.operating_state == ServiceOperatingState.RUNNING
    assert ftp_server.operating_state == ServiceOperatingState.RUNNING

    # Create the file to upload on the client's file system.
    ftp_client.file_system.create_file(file_name="test_file.txt")

    # Address the server via the first network interface it reports.
    first_nic_key = next(iter(server.network_interfaces))
    server_ip = server.network_interfaces.get(first_nic_key).ip_address

    sent = ftp_client.send_file(
        src_folder_name="root",
        src_file_name="test_file.txt",
        dest_folder_name="client_1_backup",
        dest_file_name="test_file.txt",
        dest_ip_address=server_ip,
    )
    assert sent

    # The uploaded file should now exist in the server's backup folder.
    stored_file = ftp_server.file_system.get_file(folder_name="client_1_backup", file_name="test_file.txt")
    assert stored_file
|
2023-09-20 16:23:35 +01:00
|
|
|
|
|
|
|
|
|
2023-11-29 01:28:40 +00:00
|
|
|
def test_ftp_client_retrieve_file_from_server(ftp_client_and_ftp_server):
    """Test that the client can retrieve a file from the backup server using ``request_file``."""
    ftp_client, _computer, ftp_server, server = ftp_client_and_ftp_server

    # Both services must be up before attempting a transfer.
    assert ftp_client.operating_state == ServiceOperatingState.RUNNING
    assert ftp_server.operating_state == ServiceOperatingState.RUNNING

    # Create the file to download on the server's file system.
    ftp_server.file_system.create_file(file_name="test_file.txt", folder_name="file_share")

    # Address the server via the first network interface it reports.
    first_nic_key = next(iter(server.network_interfaces))
    server_ip = server.network_interfaces.get(first_nic_key).ip_address

    retrieved = ftp_client.request_file(
        src_folder_name="file_share",
        src_file_name="test_file.txt",
        dest_folder_name="downloads",
        dest_file_name="test_file.txt",
        dest_ip_address=server_ip,
    )
    assert retrieved

    # The client should now hold the file in its downloads folder.
    downloaded_file = ftp_client.file_system.get_file(folder_name="downloads", file_name="test_file.txt")
    assert downloaded_file
|
2023-11-23 19:49:03 +00:00
|
|
|
|
|
|
|
|
|
2023-11-29 01:28:40 +00:00
|
|
|
def test_ftp_client_tries_to_connect_to_offline_server(ftp_client_and_ftp_server):
    """Test that a file request fails and nothing is downloaded once the server is powered off."""
    ftp_client, _computer, ftp_server, server = ftp_client_and_ftp_server

    assert ftp_client.operating_state == ServiceOperatingState.RUNNING
    assert ftp_server.operating_state == ServiceOperatingState.RUNNING

    # Create the file on the server while it is still online.
    ftp_server.file_system.create_file(file_name="test_file.txt", folder_name="file_share")

    # Power the server off, then step it through shut_down_duration + 1 timesteps;
    # the assertions below confirm the FTP server service has stopped by then.
    server.power_off()
    for step in range(server.shut_down_duration + 1):
        server.apply_timestep(timestep=step)

    assert ftp_client.operating_state == ServiceOperatingState.RUNNING
    assert ftp_server.operating_state == ServiceOperatingState.STOPPED

    # Address the (now offline) server via the first network interface it reports.
    first_nic_key = next(iter(server.network_interfaces))
    server_ip = server.network_interfaces.get(first_nic_key).ip_address

    request_result = ftp_client.request_file(
        src_folder_name="file_share",
        src_file_name="test_file.txt",
        dest_folder_name="downloads",
        dest_file_name="test_file.txt",
        dest_ip_address=server_ip,
    )
    assert request_result is False

    # The client should NOT have retrieved the file.
    downloaded_file = ftp_client.file_system.get_file(folder_name="downloads", file_name="test_file.txt")
    assert downloaded_file is None
|
2024-08-16 11:51:38 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_ftp_client_store_file_in_server_via_request_success(ftp_client_and_ftp_server):
    """Test that a ``send`` request applied to the client stores the file on the backup server."""
    ftp_client, _computer, ftp_server, server = ftp_client_and_ftp_server

    assert ftp_client.operating_state == ServiceOperatingState.RUNNING
    assert ftp_server.operating_state == ServiceOperatingState.RUNNING

    # Create the file to upload on the client's file system.
    ftp_client.file_system.create_file(file_name="test_file.txt")

    # Address the server via the first network interface it reports.
    first_nic_key = next(iter(server.network_interfaces))
    server_ip = server.network_interfaces.get(first_nic_key).ip_address

    # Complete set of options for the "send" request.
    send_options = {
        "src_folder_name": "root",
        "src_file_name": "test_file.txt",
        "dest_folder_name": "client_1_backup",
        "dest_file_name": "test_file.txt",
        "dest_ip_address": server_ip,
    }
    ftp_client.apply_request(["send", send_options])

    # The uploaded file should now exist in the server's backup folder.
    stored_file = ftp_server.file_system.get_file(folder_name="client_1_backup", file_name="test_file.txt")
    assert stored_file
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_ftp_client_store_file_in_server_via_request_failure(ftp_client_and_ftp_server):
    """Test that misconfigured ``send`` requests applied to the client report a failure status."""
    ftp_client, _computer, ftp_server, server = ftp_client_and_ftp_server

    assert ftp_client.operating_state == ServiceOperatingState.RUNNING
    assert ftp_server.operating_state == ServiceOperatingState.RUNNING

    # Create a file on the client's file system.
    ftp_client.file_system.create_file(file_name="test_file.txt")

    # Case 1: purposefully misconfigured options - source file and destination names are omitted.
    first_nic_key = next(iter(server.network_interfaces))
    incomplete_options = {
        "src_folder_name": "root",
        "dest_ip_address": server.network_interfaces.get(first_nic_key).ip_address,
    }
    incomplete_response: RequestResponse = ftp_client.apply_request(["send", incomplete_options])
    assert incomplete_response.status == "failure"

    # Case 2: purposefully misconfigured options - every key is present, but the
    # source file name differs from the file created above.
    first_nic_key = next(iter(server.network_interfaces))
    wrong_file_options = {
        "src_folder_name": "root",
        "src_file_name": "not_a_real_file.txt",
        "dest_folder_name": "client_1_backup",
        "dest_file_name": "test_file.txt",
        "dest_ip_address": server.network_interfaces.get(first_nic_key).ip_address,
    }
    wrong_file_response: RequestResponse = ftp_client.apply_request(["send", wrong_file_options])
    assert wrong_file_response.status == "failure"
|