diff --git a/internal_filesystem/lib/cryptography/hazmat/primitives/ciphers.py b/internal_filesystem/lib/cryptography/hazmat/primitives/ciphers.py index af43e941..025a7403 100644 --- a/internal_filesystem/lib/cryptography/hazmat/primitives/ciphers.py +++ b/internal_filesystem/lib/cryptography/hazmat/primitives/ciphers.py @@ -4,13 +4,20 @@ from ucryptolib import aes class Cipher: + """Emulates cryptography's Cipher for AES encryption/decryption.""" def __init__(self, algorithm, mode): self.algorithm = algorithm self.mode = mode self._key = algorithm.key self._iv = mode.iv if mode.iv is not None else b'\x00' * 16 - self._cipher = aes(self._key, 1) # Mode 1 = CBC + self.mode_cbc = 2 # CBC, include IV + #print(f"Cipher init: key length = {len(self._key)} bytes (AES-{len(self._key)*8}), IV = {self._iv.hex()}") + if len(self._key) not in (16, 24, 32): + raise ValueError(f"Invalid key length: {len(self._key)} bytes") + if len(self._iv) != 16: + raise ValueError(f"Invalid IV length: {len(self._iv)} bytes") + self._cipher = aes(self._key, self.mode_cbc, self._iv) def encryptor(self): return Encryptor(self._cipher, self._iv) @@ -27,17 +34,17 @@ class Encryptor: def update(self, data): self._buffer.extend(data) - # MicroPython's ucryptolib processes full blocks block_size = 16 # AES block size if len(self._buffer) >= block_size: to_process = self._buffer[:len(self._buffer) - (len(self._buffer) % block_size)] self._buffer = self._buffer[len(to_process):] + print(f"Encryptor.update processing: {to_process.hex()}") return self._cipher.encrypt(to_process) return b'' def finalize(self): if self._buffer: - # Pad remaining data if needed (handled by caller with PKCS7) + print(f"Encryptor.finalize processing: {self._buffer.hex()}") return self._cipher.encrypt(self._buffer) return b'' @@ -54,11 +61,13 @@ class Decryptor: if len(self._buffer) >= block_size: to_process = self._buffer[:len(self._buffer) - (len(self._buffer) % block_size)] self._buffer = self._buffer[len(to_process):] + #print(f"Decryptor.update processing: {to_process.hex()}") return self._cipher.decrypt(to_process) return b'' def finalize(self): if self._buffer: + #print(f"Decryptor.finalize processing: {self._buffer.hex()}") return self._cipher.decrypt(self._buffer) return b'' diff --git a/internal_filesystem/lib/cryptography/hazmat/primitives/padding.py b/internal_filesystem/lib/cryptography/hazmat/primitives/padding.py index ceae907f..17ae651f 100644 --- a/internal_filesystem/lib/cryptography/hazmat/primitives/padding.py +++ b/internal_filesystem/lib/cryptography/hazmat/primitives/padding.py @@ -2,40 +2,12 @@ # Implements PKCS7 padding and unpadding def _byte_padding_check(block_size): - """Validate block size for padding.""" if not (0 <= block_size <= 2040): raise ValueError("block_size must be in range(0, 2041).") if block_size % 8 != 0: raise ValueError("block_size must be a multiple of 8.") class PKCS7PaddingContext: - """Handles PKCS7 padding.""" - def __init__(self, block_size): - _byte_padding_check(block_size) - self.block_size = block_size // 8 # Convert bits to bytes - self._buffer = bytearray() - - def update(self, data): - self._buffer.extend(data) - # Return full blocks - block_size = self.block_size - if len(self._buffer) >= block_size: - to_return = self._buffer[:len(self._buffer) - (len(self._buffer) % block_size)] - self._buffer = self._buffer[len(to_return):] - return to_return - return b'' - - def finalize(self): - # Pad with bytes equal to padding length - pad_length = self.block_size - (len(self._buffer) % self.block_size) - padding = bytes([pad_length] * pad_length) - self._buffer.extend(padding) - result = bytes(self._buffer) - self._buffer = bytearray() - return result - -class PKCS7UnpaddingContext: - """Handles PKCS7 unpadding.""" def __init__(self, block_size): _byte_padding_check(block_size) self.block_size = block_size // 8 @@ -43,28 +15,59 @@ class PKCS7UnpaddingContext: def update(self, data): self._buffer.extend(data) - # Only process complete blocks block_size = self.block_size if len(self._buffer) >= block_size: to_return = self._buffer[:len(self._buffer) - (len(self._buffer) % block_size)] self._buffer = self._buffer[len(to_return):] + print(f"PKCS7PaddingContext.update returning: {to_return.hex()}") return to_return + print(f"PKCS7PaddingContext.update buffer: {self._buffer.hex()}") return b'' def finalize(self): - if not self._buffer or len(self._buffer) % self.block_size != 0: - raise ValueError("Invalid padding") - pad_length = self._buffer[-1] - if pad_length > self.block_size or pad_length == 0: - raise ValueError("Invalid padding") - if self._buffer[-pad_length:] != bytes([pad_length] * pad_length): - raise ValueError("Invalid padding") - result = bytes(self._buffer[:-pad_length]) + pad_length = self.block_size - (len(self._buffer) % self.block_size) + padding = bytes([pad_length] * pad_length) + self._buffer.extend(padding) + result = bytes(self._buffer) + print(f"PKCS7PaddingContext.finalize pad_length: {pad_length}, padding: {padding.hex()}, result: {result.hex()}") self._buffer = bytearray() return result +class PKCS7UnpaddingContext: + def __init__(self, block_size): + _byte_padding_check(block_size) + self.block_size = block_size // 8 + self._buffer = bytearray() + + def update(self, data): + self._buffer.extend(data) + print(f"unpadder self._buffer is {self._buffer.hex()}") + block_size = self.block_size + # Return all complete blocks except the last one + if len(self._buffer) >= block_size * 2: # At least two blocks + to_return = self._buffer[:len(self._buffer) - block_size] + self._buffer = self._buffer[len(to_return):] + print(f"unpadder self._buffer is now {self._buffer.hex()} and returning {to_return.hex()}") + return to_return + print(f"unpadder self._buffer retained: {self._buffer.hex()}") + return b'' + + def finalize(self): + print(f"unpadder finalize self._buffer: {self._buffer.hex()}") + if not self._buffer or len(self._buffer) % self.block_size != 0: + raise ValueError(f"Invalid padding A: buffer {self._buffer.hex()}, length {len(self._buffer)}, remainder {len(self._buffer) % self.block_size}") + pad_length = self._buffer[-1] + print(f"unpadder finalize pad_length: {pad_length}, last {pad_length} bytes: {self._buffer[-pad_length:].hex()}") + if pad_length > self.block_size or pad_length == 0: + raise ValueError(f"Invalid padding B: pad_length {pad_length}") + if self._buffer[-pad_length:] != bytes([pad_length] * pad_length): + raise ValueError(f"Invalid padding C: expected {pad_length} bytes of {pad_length:02x}, got {self._buffer[-pad_length:].hex()}") + result = bytes(self._buffer[:-pad_length]) + self._buffer = bytearray() + print(f"unpadder finalize result: {result.hex()}") + return result + class PKCS7: - """PKCS7 padding implementation.""" def __init__(self, block_size): _byte_padding_check(block_size) self.block_size = block_size