start porting python-nostr

This commit is contained in:
Thomas Farstrike
2025-05-19 09:33:58 +02:00
parent e63c9d4040
commit 1e3e990aec
4 changed files with 316 additions and 0 deletions
@@ -0,0 +1,80 @@
# cipher.py: MicroPython compatibility layer for cryptography.hazmat.primitives.ciphers
# Implements Cipher, algorithms.AES, and modes.CBC using ucryptolib
from ucryptolib import aes
class Cipher:
"""Emulates cryptography's Cipher for AES encryption/decryption."""
def __init__(self, algorithm, mode):
self.algorithm = algorithm
self.mode = mode
self._key = algorithm.key
self._iv = mode.iv if mode.iv is not None else b'\x00' * 16
self._cipher = aes(self._key, 1) # Mode 1 = CBC
def encryptor(self):
return Encryptor(self._cipher, self._iv)
def decryptor(self):
return Decryptor(self._cipher, self._iv)
class Encryptor:
"""Handles encryption with the initialized cipher."""
def __init__(self, cipher, iv):
self._cipher = cipher
self._iv = iv
self._buffer = bytearray()
def update(self, data):
self._buffer.extend(data)
# MicroPython's ucryptolib processes full blocks
block_size = 16 # AES block size
if len(self._buffer) >= block_size:
to_process = self._buffer[:len(self._buffer) - (len(self._buffer) % block_size)]
self._buffer = self._buffer[len(to_process):]
return self._cipher.encrypt(to_process)
return b''
def finalize(self):
if self._buffer:
# Pad remaining data if needed (handled by caller with PKCS7)
return self._cipher.encrypt(self._buffer)
return b''
class Decryptor:
"""Handles decryption with the initialized cipher."""
def __init__(self, cipher, iv):
self._cipher = cipher
self._iv = iv
self._buffer = bytearray()
def update(self, data):
self._buffer.extend(data)
block_size = 16
if len(self._buffer) >= block_size:
to_process = self._buffer[:len(self._buffer) - (len(self._buffer) % block_size)]
self._buffer = self._buffer[len(to_process):]
return self._cipher.decrypt(to_process)
return b''
def finalize(self):
if self._buffer:
return self._cipher.decrypt(self._buffer)
return b''
class algorithms:
"""Namespace for cipher algorithms."""
class AES:
def __init__(self, key):
if len(key) not in (16, 24, 32): # 128, 192, 256-bit keys
raise ValueError("AES key must be 16, 24, or 32 bytes")
self.key = key
self.block_size = 128 # Bits
class modes:
"""Namespace for cipher modes."""
class CBC:
def __init__(self, iv):
if len(iv) != 16:
raise ValueError("CBC IV must be 16 bytes")
self.iv = iv
@@ -0,0 +1,76 @@
# primitives.py: MicroPython compatibility layer for cryptography.hazmat.primitives.padding
# Implements PKCS7 padding and unpadding
def _byte_padding_check(block_size):
"""Validate block size for padding."""
if not (0 <= block_size <= 2040):
raise ValueError("block_size must be in range(0, 2041).")
if block_size % 8 != 0:
raise ValueError("block_size must be a multiple of 8.")
class PKCS7PaddingContext:
"""Handles PKCS7 padding."""
def __init__(self, block_size):
_byte_padding_check(block_size)
self.block_size = block_size // 8 # Convert bits to bytes
self._buffer = bytearray()
def update(self, data):
self._buffer.extend(data)
# Return full blocks
block_size = self.block_size
if len(self._buffer) >= block_size:
to_return = self._buffer[:len(self._buffer) - (len(self._buffer) % block_size)]
self._buffer = self._buffer[len(to_return):]
return to_return
return b''
def finalize(self):
# Pad with bytes equal to padding length
pad_length = self.block_size - (len(self._buffer) % self.block_size)
padding = bytes([pad_length] * pad_length)
self._buffer.extend(padding)
result = bytes(self._buffer)
self._buffer = bytearray()
return result
class PKCS7UnpaddingContext:
"""Handles PKCS7 unpadding."""
def __init__(self, block_size):
_byte_padding_check(block_size)
self.block_size = block_size // 8
self._buffer = bytearray()
def update(self, data):
self._buffer.extend(data)
# Only process complete blocks
block_size = self.block_size
if len(self._buffer) >= block_size:
to_return = self._buffer[:len(self._buffer) - (len(self._buffer) % block_size)]
self._buffer = self._buffer[len(to_return):]
return to_return
return b''
def finalize(self):
if not self._buffer or len(self._buffer) % self.block_size != 0:
raise ValueError("Invalid padding")
pad_length = self._buffer[-1]
if pad_length > self.block_size or pad_length == 0:
raise ValueError("Invalid padding")
if self._buffer[-pad_length:] != bytes([pad_length] * pad_length):
raise ValueError("Invalid padding")
result = bytes(self._buffer[:-pad_length])
self._buffer = bytearray()
return result
class PKCS7:
"""PKCS7 padding implementation."""
def __init__(self, block_size):
_byte_padding_check(block_size)
self.block_size = block_size
def padder(self):
return PKCS7PaddingContext(self.block_size)
def unpadder(self):
return PKCS7UnpaddingContext(self.block_size)
+46
View File
@@ -0,0 +1,46 @@
# dataclasses.py: Minimal MicroPython compatibility layer for Python's dataclasses
# Implements @dataclass with __init__ and __repr__ generation
def dataclass(cls):
"""Decorator to emulate Python's @dataclass, generating __init__ and __repr__."""
# Get class annotations and defaults
annotations = getattr(cls, '__annotations__', {})
defaults = {}
for name in dir(cls):
if not name.startswith('__'):
attr = getattr(cls, name, None)
if not callable(attr) and name in annotations:
defaults[name] = attr
# Generate __init__ method
def __init__(self, *args, **kwargs):
# Positional arguments
fields = list(annotations.keys())
for i, value in enumerate(args):
if i >= len(fields):
raise TypeError(f"Too many positional arguments")
setattr(self, fields[i], value)
# Keyword arguments and defaults
for name in fields:
if name in kwargs:
setattr(self, name, kwargs[name])
elif not hasattr(self, name):
if name in defaults:
setattr(self, name, defaults[name])
else:
raise TypeError(f"Missing required argument: {name}")
# Generate __repr__ method
def __repr__(self):
fields = [
f"{name}={getattr(self, name)!r}"
for name in annotations
]
return f"{cls.__name__}({', '.join(fields)})"
# Attach generated methods to class
setattr(cls, '__init__', __init__)
setattr(cls, '__repr__', __repr__)
return cls
+114
View File
@@ -0,0 +1,114 @@
# By PiggyOS
# secrets.py: Compatibility layer for CPython's secrets module in MicroPython
# Uses urandom for cryptographically secure randomness
# Implements SystemRandom, choice, randbelow, randbits, token_bytes, token_hex,
# token_urlsafe, and compare_digest
import urandom
import ubinascii
import uhashlib
import utime
class SystemRandom:
"""Emulates random.SystemRandom using MicroPython's urandom."""
def randrange(self, start, stop=None, step=1):
"""Return a random int in range(start, stop[, step])."""
if stop is None:
stop = start
start = 0
if step != 1:
raise NotImplementedError("step != 1 not supported")
if start >= stop:
raise ValueError("empty range")
range_size = stop - start
return start + self._randbelow(range_size)
def _randbelow(self, n):
"""Return a random int in [0, n)."""
if n <= 0:
raise ValueError("exclusive_upper_bound must be positive")
k = (n.bit_length() + 7) // 8 # Bytes needed for n
r = 0
while True:
r = int.from_bytes(self._getrandbytes(k), 'big')
if r < n:
return r
def _getrandbytes(self, n):
"""Return n random bytes."""
return bytearray(urandom.getrandbits(8) for _ in range(n))
def choice(self, seq):
"""Return a randomly chosen element from a non-empty sequence."""
if not seq:
raise IndexError("cannot choose from an empty sequence")
return seq[self._randbelow(len(seq))]
def randbits(self, k):
"""Return a non-negative int with k random bits."""
if k < 0:
raise ValueError("number of bits must be non-negative")
numbytes = (k + 7) // 8
return int.from_bytes(self._getrandbytes(numbytes), 'big') >> (numbytes * 8 - k)
# Instantiate SystemRandom for module-level functions
_sysrand = SystemRandom()
def choice(seq):
"""Return a randomly chosen element from a non-empty sequence."""
return _sysrand.choice(seq)
def randbelow(exclusive_upper_bound):
"""Return a random int in [0, exclusive_upper_bound)."""
return _sysrand._randbelow(exclusive_upper_bound)
def randbits(k):
"""Return a non-negative int with k random bits."""
return _sysrand.randbits(k)
def token_bytes(nbytes=None):
"""Return a random byte string of nbytes. Default is 32 bytes."""
if nbytes is None:
nbytes = 32
if nbytes < 0:
raise ValueError("number of bytes must be non-negative")
return _sysrand._getrandbytes(nbytes)
def token_hex(nbytes=None):
"""Return a random hex string of nbytes. Default is 32 bytes."""
return ubinascii.hexlify(token_bytes(nbytes)).decode()
def token_urlsafe(nbytes=None):
"""Return a random URL-safe base64 string of nbytes. Default is 32 bytes."""
if nbytes is None:
nbytes = 32
if nbytes < 0:
raise ValueError("number of bytes must be non-negative")
# Base64 encoding: 4 chars per 3 bytes, so we need ceil(nbytes * 4/3) chars
# Generate enough bytes to ensure we have at least nbytes after encoding
raw_bytes = token_bytes(nbytes)
# Use URL-safe base64 encoding (replaces '+' with '-', '/' with '_')
encoded = ubinascii.b2a_base64(raw_bytes).decode().rstrip('\n=')
# Ensure length corresponds to nbytes (truncate if needed)
return encoded[:int(nbytes * 4 / 3)]
def compare_digest(a, b):
"""Return True if a and b are equal in constant time, else False."""
# Convert to bytes if strings
if isinstance(a, str):
a = a.encode()
if isinstance(b, str):
b = b.encode()
if not isinstance(a, (bytes, bytearray)) or not isinstance(b, (bytes, bytearray)):
raise TypeError("both inputs must be bytes-like or strings")
if len(a) != len(b):
return False
# Constant-time comparison to prevent timing attacks
result = 0
for x, y in zip(a, b):
result |= x ^ y
return result == 0