From 1e3e990aecb579bcca5d36c5a9e21c390a43f147 Mon Sep 17 00:00:00 2001 From: Thomas Farstrike Date: Mon, 19 May 2025 09:33:58 +0200 Subject: [PATCH] start porting python-nostr --- .../cryptography/hazmat/primitives/ciphers.py | 80 ++++++++++++ .../cryptography/hazmat/primitives/padding.py | 76 ++++++++++++ internal_filesystem/lib/dataclasses.py | 46 +++++++ internal_filesystem/lib/secrets.py | 114 ++++++++++++++++++ 4 files changed, 316 insertions(+) create mode 100644 internal_filesystem/lib/cryptography/hazmat/primitives/ciphers.py create mode 100644 internal_filesystem/lib/cryptography/hazmat/primitives/padding.py create mode 100644 internal_filesystem/lib/dataclasses.py create mode 100644 internal_filesystem/lib/secrets.py diff --git a/internal_filesystem/lib/cryptography/hazmat/primitives/ciphers.py b/internal_filesystem/lib/cryptography/hazmat/primitives/ciphers.py new file mode 100644 index 00000000..af43e941 --- /dev/null +++ b/internal_filesystem/lib/cryptography/hazmat/primitives/ciphers.py @@ -0,0 +1,80 @@ +# cipher.py: MicroPython compatibility layer for cryptography.hazmat.primitives.ciphers +# Implements Cipher, algorithms.AES, and modes.CBC using ucryptolib + +from ucryptolib import aes + +class Cipher: + """Emulates cryptography's Cipher for AES encryption/decryption.""" + def __init__(self, algorithm, mode): + self.algorithm = algorithm + self.mode = mode + self._key = algorithm.key + self._iv = mode.iv if mode.iv is not None else b'\x00' * 16 + self._cipher = aes(self._key, 1) # Mode 1 = CBC + + def encryptor(self): + return Encryptor(self._cipher, self._iv) + + def decryptor(self): + return Decryptor(self._cipher, self._iv) + +class Encryptor: + """Handles encryption with the initialized cipher.""" + def __init__(self, cipher, iv): + self._cipher = cipher + self._iv = iv + self._buffer = bytearray() + + def update(self, data): + self._buffer.extend(data) + # MicroPython's ucryptolib processes full blocks + block_size = 16 # AES block size + if len(self._buffer) >= block_size: + to_process = self._buffer[:len(self._buffer) - (len(self._buffer) % block_size)] + self._buffer = self._buffer[len(to_process):] + return self._cipher.encrypt(to_process) + return b'' + + def finalize(self): + if self._buffer: + # Pad remaining data if needed (handled by caller with PKCS7) + return self._cipher.encrypt(self._buffer) + return b'' + +class Decryptor: + """Handles decryption with the initialized cipher.""" + def __init__(self, cipher, iv): + self._cipher = cipher + self._iv = iv + self._buffer = bytearray() + + def update(self, data): + self._buffer.extend(data) + block_size = 16 + if len(self._buffer) >= block_size: + to_process = self._buffer[:len(self._buffer) - (len(self._buffer) % block_size)] + self._buffer = self._buffer[len(to_process):] + return self._cipher.decrypt(to_process) + return b'' + + def finalize(self): + if self._buffer: + return self._cipher.decrypt(self._buffer) + return b'' + +class algorithms: + """Namespace for cipher algorithms.""" + class AES: + def __init__(self, key): + if len(key) not in (16, 24, 32): # 128, 192, 256-bit keys + raise ValueError("AES key must be 16, 24, or 32 bytes") + self.key = key + self.block_size = 128 # Bits + +class modes: + """Namespace for cipher modes.""" + class CBC: + def __init__(self, iv): + if len(iv) != 16: + raise ValueError("CBC IV must be 16 bytes") + self.iv = iv diff --git a/internal_filesystem/lib/cryptography/hazmat/primitives/padding.py b/internal_filesystem/lib/cryptography/hazmat/primitives/padding.py new file mode 100644 index 00000000..ceae907f --- /dev/null +++ b/internal_filesystem/lib/cryptography/hazmat/primitives/padding.py @@ -0,0 +1,76 @@ +# primitives.py: MicroPython compatibility layer for cryptography.hazmat.primitives.padding +# Implements PKCS7 padding and unpadding + +def _byte_padding_check(block_size): + """Validate block size for padding.""" + if not (0 <= block_size <= 2040): + raise ValueError("block_size must be in range(0, 2041).") + if block_size % 8 != 0: + raise ValueError("block_size must be a multiple of 8.") + +class PKCS7PaddingContext: + """Handles PKCS7 padding.""" + def __init__(self, block_size): + _byte_padding_check(block_size) + self.block_size = block_size // 8 # Convert bits to bytes + self._buffer = bytearray() + + def update(self, data): + self._buffer.extend(data) + # Return full blocks + block_size = self.block_size + if len(self._buffer) >= block_size: + to_return = self._buffer[:len(self._buffer) - (len(self._buffer) % block_size)] + self._buffer = self._buffer[len(to_return):] + return to_return + return b'' + + def finalize(self): + # Pad with bytes equal to padding length + pad_length = self.block_size - (len(self._buffer) % self.block_size) + padding = bytes([pad_length] * pad_length) + self._buffer.extend(padding) + result = bytes(self._buffer) + self._buffer = bytearray() + return result + +class PKCS7UnpaddingContext: + """Handles PKCS7 unpadding.""" + def __init__(self, block_size): + _byte_padding_check(block_size) + self.block_size = block_size // 8 + self._buffer = bytearray() + + def update(self, data): + self._buffer.extend(data) + # Only process complete blocks + block_size = self.block_size + if len(self._buffer) >= block_size: + to_return = self._buffer[:len(self._buffer) - (len(self._buffer) % block_size)] + self._buffer = self._buffer[len(to_return):] + return to_return + return b'' + + def finalize(self): + if not self._buffer or len(self._buffer) % self.block_size != 0: + raise ValueError("Invalid padding") + pad_length = self._buffer[-1] + if pad_length > self.block_size or pad_length == 0: + raise ValueError("Invalid padding") + if self._buffer[-pad_length:] != bytes([pad_length] * pad_length): + raise ValueError("Invalid padding") + result = bytes(self._buffer[:-pad_length]) + self._buffer = bytearray() + return result + +class PKCS7: + """PKCS7 padding implementation.""" + def __init__(self, block_size): + _byte_padding_check(block_size) + self.block_size = block_size + + def padder(self): + return PKCS7PaddingContext(self.block_size) + + def unpadder(self): + return PKCS7UnpaddingContext(self.block_size) diff --git a/internal_filesystem/lib/dataclasses.py b/internal_filesystem/lib/dataclasses.py new file mode 100644 index 00000000..04cf6e0d --- /dev/null +++ b/internal_filesystem/lib/dataclasses.py @@ -0,0 +1,46 @@ +# dataclasses.py: Minimal MicroPython compatibility layer for Python's dataclasses +# Implements @dataclass with __init__ and __repr__ generation + +def dataclass(cls): + """Decorator to emulate Python's @dataclass, generating __init__ and __repr__.""" + # Get class annotations and defaults + annotations = getattr(cls, '__annotations__', {}) + defaults = {} + for name in dir(cls): + if not name.startswith('__'): + attr = getattr(cls, name, None) + if not callable(attr) and name in annotations: + defaults[name] = attr + + # Generate __init__ method + def __init__(self, *args, **kwargs): + # Positional arguments + fields = list(annotations.keys()) + for i, value in enumerate(args): + if i >= len(fields): + raise TypeError(f"Too many positional arguments") + setattr(self, fields[i], value) + + # Keyword arguments and defaults + for name in fields: + if name in kwargs: + setattr(self, name, kwargs[name]) + elif not hasattr(self, name): + if name in defaults: + setattr(self, name, defaults[name]) + else: + raise TypeError(f"Missing required argument: {name}") + + # Generate __repr__ method + def __repr__(self): + fields = [ + f"{name}={getattr(self, name)!r}" + for name in annotations + ] + return f"{cls.__name__}({', '.join(fields)})" + + # Attach generated methods to class + setattr(cls, '__init__', __init__) + setattr(cls, '__repr__', __repr__) + + return cls diff --git a/internal_filesystem/lib/secrets.py b/internal_filesystem/lib/secrets.py new file mode 100644 index 00000000..29cf7e11 --- /dev/null +++ b/internal_filesystem/lib/secrets.py @@ -0,0 +1,114 @@ +# By PiggyOS + +# secrets.py: Compatibility layer for CPython's secrets module in MicroPython +# Uses urandom for cryptographically secure randomness +# Implements SystemRandom, choice, randbelow, randbits, token_bytes, token_hex, +# token_urlsafe, and compare_digest + +import urandom +import ubinascii +import uhashlib +import utime + +class SystemRandom: + """Emulates random.SystemRandom using MicroPython's urandom.""" + + def randrange(self, start, stop=None, step=1): + """Return a random int in range(start, stop[, step]).""" + if stop is None: + stop = start + start = 0 + if step != 1: + raise NotImplementedError("step != 1 not supported") + if start >= stop: + raise ValueError("empty range") + range_size = stop - start + return start + self._randbelow(range_size) + + def _randbelow(self, n): + """Return a random int in [0, n).""" + if n <= 0: + raise ValueError("exclusive_upper_bound must be positive") + k = (n.bit_length() + 7) // 8 # Bytes needed for n + r = 0 + while True: + r = int.from_bytes(self._getrandbytes(k), 'big') + if r < n: + return r + + def _getrandbytes(self, n): + """Return n random bytes.""" + return bytearray(urandom.getrandbits(8) for _ in range(n)) + + def choice(self, seq): + """Return a randomly chosen element from a non-empty sequence.""" + if not seq: + raise IndexError("cannot choose from an empty sequence") + return seq[self._randbelow(len(seq))] + + def randbits(self, k): + """Return a non-negative int with k random bits.""" + if k < 0: + raise ValueError("number of bits must be non-negative") + numbytes = (k + 7) // 8 + return int.from_bytes(self._getrandbytes(numbytes), 'big') >> (numbytes * 8 - k) + +# Instantiate SystemRandom for module-level functions +_sysrand = SystemRandom() + +def choice(seq): + """Return a randomly chosen element from a non-empty sequence.""" + return _sysrand.choice(seq) + +def randbelow(exclusive_upper_bound): + """Return a random int in [0, exclusive_upper_bound).""" + return _sysrand._randbelow(exclusive_upper_bound) + +def randbits(k): + """Return a non-negative int with k random bits.""" + return _sysrand.randbits(k) + +def token_bytes(nbytes=None): + """Return a random byte string of nbytes. Default is 32 bytes.""" + if nbytes is None: + nbytes = 32 + if nbytes < 0: + raise ValueError("number of bytes must be non-negative") + return _sysrand._getrandbytes(nbytes) + +def token_hex(nbytes=None): + """Return a random hex string of nbytes. Default is 32 bytes.""" + return ubinascii.hexlify(token_bytes(nbytes)).decode() + +def token_urlsafe(nbytes=None): + """Return a random URL-safe base64 string of nbytes. Default is 32 bytes.""" + if nbytes is None: + nbytes = 32 + if nbytes < 0: + raise ValueError("number of bytes must be non-negative") + # Base64 encoding: 4 chars per 3 bytes, so we need ceil(nbytes * 4/3) chars + # Generate enough bytes to ensure we have at least nbytes after encoding + raw_bytes = token_bytes(nbytes) + # Use URL-safe base64 encoding (replaces '+' with '-', '/' with '_') + encoded = ubinascii.b2a_base64(raw_bytes).decode().rstrip('\n=') + # Ensure length corresponds to nbytes (truncate if needed) + return encoded[:int(nbytes * 4 / 3)] + +def compare_digest(a, b): + """Return True if a and b are equal in constant time, else False.""" + # Convert to bytes if strings + if isinstance(a, str): + a = a.encode() + if isinstance(b, str): + b = b.encode() + if not isinstance(a, (bytes, bytearray)) or not isinstance(b, (bytes, bytearray)): + raise TypeError("both inputs must be bytes-like or strings") + if len(a) != len(b): + return False + # Constant-time comparison to prevent timing attacks + result = 0 + for x, y in zip(a, b): + result |= x ^ y + return result == 0 + +