From c9e4ba3c7d95c7a8b4208f13e56bb93b7ba9b0c9 Mon Sep 17 00:00:00 2001 From: "Czar.Echavez" Date: Thu, 12 Oct 2023 11:16:25 +0100 Subject: [PATCH] #1947: File and Folder hash checks --- .../simulator/file_system/file_system.py | 75 ++++++++++++++++++- .../_file_system/test_file_system.py | 54 +++++++++++++ 2 files changed, 128 insertions(+), 1 deletion(-) diff --git a/src/primaite/simulator/file_system/file_system.py b/src/primaite/simulator/file_system/file_system.py index 5653234d..5a089c9b 100644 --- a/src/primaite/simulator/file_system/file_system.py +++ b/src/primaite/simulator/file_system/file_system.py @@ -1,5 +1,7 @@ from __future__ import annotations +import hashlib +import json import math import os.path import shutil @@ -73,6 +75,9 @@ class FileSystemItemABC(SimComponent): visible_status: FileSystemItemStatus = FileSystemItemStatus.GOOD "Visible status of the current FileSystemItem" + previous_hash: Optional[str] = None + "Hash of the file contents or the description state" + def describe_state(self) -> Dict: """ Produce a dictionary describing the current state of this object. @@ -102,7 +107,7 @@ class FileSystemItemABC(SimComponent): def _init_request_manager(self) -> RequestManager: am = super()._init_request_manager() am.add_request("scan", RequestType(func=lambda request, context: self.scan())) # TODO implement request - am.add_request("checkhash", RequestType(func=lambda request, context: ...)) # TODO implement request + am.add_request("checkhash", RequestType(func=lambda request, context: self.checkhash())) am.add_request("delete", RequestType(func=lambda request, context: ...)) # TODO implement request am.add_request("restore", RequestType(func=lambda request, context: ...)) # TODO implement request am.add_request("repair", RequestType(func=lambda request, context: self.repair())) @@ -115,6 +120,17 @@ class FileSystemItemABC(SimComponent): self.visible_status = self.status + @abstractmethod + def check_hash(self) -> bool: + """ + Checks the has of the file to detect any changes. + + For current implementation, any change in file hash means it is compromised. + + Return False if corruption is detected, otherwise True + """ + pass + @abstractmethod def repair(self) -> None: """Repair the FileSystemItem.""" @@ -517,6 +533,30 @@ class Folder(FileSystemItemABC): self.fs.sys_log.info(f"Corrupted folder {self.name}") + def check_hash(self) -> bool: + """ + Runs a :func:`check_hash` on all files in the folder. + + If a file under the folder is corrupted, the whole folder is considered corrupted. + + TODO: For now this will just iterate through the files and run :func:`check_hash` and ignores + any other changes to the folder + + Return False if corruption is detected, otherwise True + """ + # iterate through the files and run a check hash + no_corrupted_files = True + + for file_id in self.files: + file = self.get_file_by_id(file_uuid=file_id) + no_corrupted_files = file.check_hash() + + # if one file in the folder is corrupted, set the folder status to corrupted + if not no_corrupted_files: + self.corrupt() + + return no_corrupted_files + class File(FileSystemItemABC): """ @@ -629,3 +669,36 @@ class File(FileSystemItemABC): path = self.folder.name + "/" + self.name self.folder.fs.sys_log.info(f"Corrupted file {self.sim_path if self.sim_path else path}") + + def check_hash(self) -> bool: + """ + Check if the file has been changed. + + If changed, the file is considered corrupted. + + Return False if corruption is detected, otherwise True + """ + current_hash = None + + # if file is real, read the file contents + if self.real: + with open(self.sim_path, "rb") as f: + file_hash = hashlib.blake2b() + while chunk := f.read(8192): + file_hash.update(chunk) + + current_hash = file_hash.hexdigest() + else: + # otherwise get describe_state dict and hash that + current_hash = hashlib.blake2b(json.dumps(self.describe_state(), sort_keys=True).encode()).hexdigest() + + # if the previous hash is None, set the current hash to previous + if self.previous_hash is None: + self.previous_hash = current_hash + + # if the previous hash and current hash do not match, mark file as corrupted + if self.previous_hash is not current_hash: + self.corrupt() + return False + + return True diff --git a/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system.py b/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system.py index 539f2874..5bb4ceda 100644 --- a/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system.py +++ b/tests/unit_tests/_primaite/_simulator/_file_system/test_file_system.py @@ -169,6 +169,60 @@ def test_folder_corrupt_repair(file_system): assert file.status == FileSystemItemStatus.GOOD +def test_simulated_file_check_hash(file_system): + file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder") + + assert file.check_hash() is True + + # change simulated file size + file.sim_size = 0 + assert file.check_hash() is False + assert file.status == FileSystemItemStatus.CORRUPTED + + +def test_real_file_check_hash(file_system): + file: File = file_system.create_file(file_name="test_file.txt", folder_name="test_folder", real=True) + + assert file.check_hash() is True + + # change file content + with open(file.sim_path, "a") as f: + f.write("get hacked scrub lol xD\n") + + assert file.check_hash() is False + assert file.status == FileSystemItemStatus.CORRUPTED + + +def test_simulated_folder_check_hash(file_system): + folder: Folder = file_system.create_folder(folder_name="test_folder") + file_system.create_file(file_name="test_file.txt", folder_name="test_folder") + + assert folder.check_hash() is True + + # change simulated file size + file = folder.get_file(file_name="test_file.txt") + file.sim_size = 0 + assert folder.check_hash() is False + assert folder.status == FileSystemItemStatus.CORRUPTED + + +def test_real_folder_check_hash(file_system): + folder: Folder = file_system.create_folder(folder_name="test_folder") + file_system.create_file(file_name="test_file.txt", folder_name="test_folder", real=True) + + assert folder.check_hash() is True + + # change simulated file size + file = folder.get_file(file_name="test_file.txt") + + # change file content + with open(file.sim_path, "a") as f: + f.write("get hacked scrub lol xD\n") + + assert folder.check_hash() is False + assert folder.status == FileSystemItemStatus.CORRUPTED + + @pytest.mark.skip(reason="Skipping until we tackle serialisation") def test_serialisation(file_system): """Test to check that the object serialisation works correctly."""