You've already forked MicroPythonOS
mirror of
https://github.com/m5stack/MicroPythonOS.git
synced 2026-05-20 11:51:27 -07:00
wallet: add payment list
This commit is contained in:
@@ -0,0 +1,64 @@
|
||||
import uasyncio
|
||||
from collections import deque
|
||||
|
||||
class AsyncQueue:
|
||||
def __init__(self, maxlen=10): # Set a default maximum length
|
||||
self._queue = deque((), maxlen, True) # Initialize deque with specified maxlen
|
||||
self._event = uasyncio.Event() # Event for signaling when items are added
|
||||
|
||||
async def get(self, timeout=None):
|
||||
"""Get an item from the queue, waiting if empty until an item is available or timeout expires."""
|
||||
while not self._queue:
|
||||
if timeout is not None:
|
||||
try:
|
||||
await uasyncio.wait_for(self._event.wait(), timeout) # Wait for item or timeout
|
||||
except uasyncio.TimeoutError:
|
||||
raise Empty("Queue is empty and timed out")
|
||||
else:
|
||||
await self._event.wait() # Wait indefinitely for an item
|
||||
self._event.clear() # Clear event after waking up
|
||||
return self._queue.popleft() # Return the item
|
||||
|
||||
async def put(self, item):
|
||||
"""Put an item in the queue and signal waiting coroutines."""
|
||||
self._queue.append(item) # This will now work with proper maxlen
|
||||
self._event.set() # Signal that an item is available
|
||||
|
||||
def qsize(self):
|
||||
"""Return the current size of the queue."""
|
||||
return len(self._queue)
|
||||
|
||||
def empty(self):
|
||||
"""Return True if the queue is empty."""
|
||||
return len(self._queue) == 0
|
||||
|
||||
class Empty(Exception):
|
||||
"""Exception raised when queue is empty and non-blocking or timeout occurs."""
|
||||
pass
|
||||
|
||||
|
||||
import uasyncio
|
||||
#from async_queue import AsyncQueue, Empty # Assuming the above code is in async_queue.py
|
||||
|
||||
async def producer(queue):
|
||||
for i in range(5):
|
||||
print(f"Producing {i}")
|
||||
await queue.put(i)
|
||||
await uasyncio.sleep(1) # Simulate some delay
|
||||
|
||||
async def consumer(queue):
|
||||
while True:
|
||||
try:
|
||||
item = await queue.get(timeout=2.0) # Wait up to 2 seconds
|
||||
print(f"Consumed {item}")
|
||||
except Empty:
|
||||
print("Consumer timed out waiting for item")
|
||||
break
|
||||
|
||||
async def main():
|
||||
queue = AsyncQueue()
|
||||
# Run producer and consumer concurrently in the event loop
|
||||
await uasyncio.gather(producer(queue), consumer(queue))
|
||||
|
||||
# Run the event loop
|
||||
uasyncio.run(main())
|
||||
@@ -0,0 +1,25 @@
|
||||
import ucryptolib
|
||||
import os
|
||||
|
||||
key = os.urandom(32)
|
||||
iv = bytes.fromhex("cafc34a94307c35f8c8f736831713467")
|
||||
#iv = bytes.fromhex("cafc34a94307c35f8c8f736831713468") # changing the IV doesn't change the output!
|
||||
#iv = os.urandom(16)
|
||||
data = b'{"method":"get_balance","params":{}}'
|
||||
pad_length = 16 - (len(data) % 16)
|
||||
padded_data = data + bytes([pad_length] * pad_length)
|
||||
print(f"Test padded_data: {padded_data.hex()}")
|
||||
|
||||
cipher = ucryptolib.aes(key, 1, iv)
|
||||
ciphertext = cipher.encrypt(padded_data)
|
||||
print(f"Test ciphertext: {ciphertext.hex()}")
|
||||
|
||||
cipher = ucryptolib.aes(key, 1, iv)
|
||||
decrypted = cipher.decrypt(ciphertext)
|
||||
print(f"Test decrypted: {decrypted.hex()}")
|
||||
|
||||
pad_length = decrypted[-1]
|
||||
if decrypted[-pad_length:] != bytes([pad_length] * pad_length):
|
||||
print(f"Test failed: invalid padding, got {decrypted[-pad_length:].hex()}")
|
||||
else:
|
||||
print(f"Test passed: valid padding")
|
||||
@@ -0,0 +1,46 @@
|
||||
import ucryptolib
|
||||
import os
|
||||
import sys
|
||||
|
||||
print(f"MicroPython version: {sys.version}")
|
||||
print(f"Platform: {sys.platform}")
|
||||
|
||||
key = os.urandom(32)
|
||||
iv1 = bytes.fromhex("cafc34a94307c35f8c8f736831713467")
|
||||
iv2 = bytes.fromhex("cafc34a94307c35f8c8f736831713468")
|
||||
iv3 = os.urandom(16) # Random IV
|
||||
data = b'{"method":"get_balance","params":{}}'
|
||||
pad_length = 16 - (len(data) % 16)
|
||||
padded_data = data + bytes([pad_length] * pad_length)
|
||||
print(f"Test key: {key.hex()}")
|
||||
print(f"Test padded_data: {padded_data.hex()} (length: {len(padded_data)})")
|
||||
|
||||
mode_cbc = 2
|
||||
|
||||
# Test with IV1
|
||||
cipher1 = ucryptolib.aes(key, mode_cbc, iv1)
|
||||
ciphertext1 = cipher1.encrypt(padded_data)
|
||||
print(f"IV1: {iv1.hex()}, Ciphertext1: {ciphertext1.hex()}")
|
||||
|
||||
# Test with IV2
|
||||
cipher2 = ucryptolib.aes(key, mode_cbc, iv2)
|
||||
ciphertext2 = cipher2.encrypt(padded_data)
|
||||
print(f"IV2: {iv2.hex()}, Ciphertext2: {ciphertext2.hex()}")
|
||||
|
||||
# Test with IV3
|
||||
cipher3 = ucryptolib.aes(key, mode_cbc, iv3)
|
||||
ciphertext3 = cipher3.encrypt(padded_data)
|
||||
print(f"IV3: {iv3.hex()}, Ciphertext3: {ciphertext3.hex()}")
|
||||
|
||||
# Compare ciphertexts
|
||||
print(f"Ciphertext1 == Ciphertext2: {ciphertext1 == ciphertext2}")
|
||||
print(f"Ciphertext1 == Ciphertext3: {ciphertext1 == ciphertext3}")
|
||||
|
||||
# Verify decryption
|
||||
cipher_decrypt = ucryptolib.aes(key, 1, iv1)
|
||||
decrypted = cipher_decrypt.decrypt(ciphertext1)
|
||||
print(f"Decrypted with IV1: {decrypted.hex()}")
|
||||
if decrypted[-pad_length:] != bytes([pad_length] * pad_length):
|
||||
print(f"Test failed: invalid padding, got {decrypted[-pad_length:].hex()}")
|
||||
else:
|
||||
print(f"Test passed: valid padding")
|
||||
Reference in New Issue
Block a user