Files
MicroPythonOS/tests/manual_test_nostr_asyncio.py
2026-01-25 00:19:38 +01:00

332 lines
14 KiB
Python

import asyncio
import json
import ssl
import _thread
import time
import unittest
from mpos import App, AppManager
from nostr.relay_manager import RelayManager
from nostr.message_type import ClientMessageType
from nostr.filter import Filter, Filters
from nostr.event import EncryptedDirectMessage
from nostr.key import PrivateKey
# keeps a list of items
# The .add() method ensures the list remains unique (via __eq__)
# and sorted (via __lt__) by inserting new items in the correct position.
class UniqueSortedList:
def __init__(self):
self._items = []
def add(self, item):
#print(f"before add: {str(self)}")
# Check if item already exists (using __eq__)
if item not in self._items:
# Insert item in sorted position for descending order (using __gt__)
for i, existing_item in enumerate(self._items):
if item > existing_item:
self._items.insert(i, item)
return
# If item is smaller than all existing items, append it
self._items.append(item)
#print(f"after add: {str(self)}")
def __iter__(self):
# Return iterator for the internal list
return iter(self._items)
def get(self, index_nr):
# Retrieve item at given index, raise IndexError if invalid
try:
return self._items[index_nr]
except IndexError:
raise IndexError("Index out of range")
def __len__(self):
# Return the number of items for len() calls
return len(self._items)
def __str__(self):
#print("UniqueSortedList tostring called")
return "\n".join(str(item) for item in self._items)
def __eq__(self, other):
if len(self._items) != len(other):
return False
return all(p1 == p2 for p1, p2 in zip(self._items, other))
# Payment class remains unchanged
class Payment:
def __init__(self, epoch_time, amount_sats, comment):
self.epoch_time = epoch_time
self.amount_sats = amount_sats
self.comment = comment
def __str__(self):
sattext = "sats"
if self.amount_sats == 1:
sattext = "sat"
#return f"{self.amount_sats} {sattext} @ {self.epoch_time}: {self.comment}"
return f"{self.amount_sats} {sattext}: {self.comment}"
def __eq__(self, other):
if not isinstance(other, Payment):
return False
return self.epoch_time == other.epoch_time and self.amount_sats == other.amount_sats and self.comment == other.comment
def __lt__(self, other):
if not isinstance(other, Payment):
return NotImplemented
return (self.epoch_time, self.amount_sats, self.comment) < (other.epoch_time, other.amount_sats, other.comment)
def __le__(self, other):
if not isinstance(other, Payment):
return NotImplemented
return (self.epoch_time, self.amount_sats, self.comment) <= (other.epoch_time, other.amount_sats, other.comment)
def __gt__(self, other):
if not isinstance(other, Payment):
return NotImplemented
return (self.epoch_time, self.amount_sats, self.comment) > (other.epoch_time, other.amount_sats, other.comment)
def __ge__(self, other):
if not isinstance(other, Payment):
return NotImplemented
return (self.epoch_time, self.amount_sats, self.comment) >= (other.epoch_time, other.amount_sats, other.comment)
class TestNostr(unittest.TestCase):
PAYMENTS_TO_SHOW = 5
keep_running = None
connected = None
balance = -1
payment_list = []
transactions_welcome = False
relays = [ "ws://192.168.1.16:5000/nostrrelay/test", "ws://192.168.1.16:5000/nostrclient/api/v1/relay" ]
#relays = [ "ws://127.0.0.1:5000/nostrrelay/test", "ws://127.0.0.1:5000/nostrclient/api/v1/relay" ]
#relays = [ "wss://relay.damus.io", "wss://nostr-pub.wellorder.net" ]
#relays = [ "ws://127.0.0.1:5000/nostrrelay/test", "ws://127.0.0.1:5000/nostrclient/api/v1/relay", "wss://relay.damus.io", "wss://nostr-pub.wellorder.net" ]
#relays = [ "ws://127.0.0.1:5000/nostrclient/api/v1/relay", "wss://relay.damus.io", "wss://nostr-pub.wellorder.net" ]
secret = "fab0a9a11d4cf4b1d92e901a0b2c56634275e2fa1a7eb396ff1b942f95d59fd3" # not really a secret, just from a local fake wallet
wallet_pubkey = "e46762afab282c324278351165122345f9983ea447b47943b052100321227571"
async def fetch_balance(self):
if not self.keep_running:
return
# Create get_balance request
balance_request = {
"method": "get_balance",
"params": {}
}
print(f"DEBUG: Created balance request: {balance_request}")
print(f"DEBUG: Creating encrypted DM to wallet pubkey: {self.wallet_pubkey}")
dm = EncryptedDirectMessage(
recipient_pubkey=self.wallet_pubkey,
cleartext_content=json.dumps(balance_request),
kind=23194
)
print(f"DEBUG: Signing DM {json.dumps(dm)} with private key")
self.private_key.sign_event(dm) # sign also does encryption if it's a encrypted dm
print(f"DEBUG: Publishing encrypted DM")
self.relay_manager.publish_event(dm)
def handle_new_balance(self, new_balance, fetchPaymentsIfChanged=True):
if not self.keep_running or new_balance is None:
return
if fetchPaymentsIfChanged: # Fetching *all* payments isn't necessary if balance was changed by a payment notification
print("Refreshing payments...")
self.fetch_payments() # if the balance changed, then re-list transactions
def fetch_payments(self):
if not self.keep_running:
return
# Create get_balance request
list_transactions = {
"method": "list_transactions",
"params": {
"limit": self.PAYMENTS_TO_SHOW
}
}
dm = EncryptedDirectMessage(
recipient_pubkey=self.wallet_pubkey,
cleartext_content=json.dumps(list_transactions),
kind=23194
)
self.private_key.sign_event(dm) # sign also does encryption if it's a encrypted dm
print("\nPublishing DM to fetch payments...")
self.relay_manager.publish_event(dm)
self.transactions_welcome = True
def handle_new_payments(self, new_payments):
if not self.keep_running or not self.transactions_welcome:
return
print("handle_new_payments")
if self.payment_list != new_payments:
print("new list of payments")
self.payment_list = new_payments
self.payments_updated_cb()
def payments_updated_cb(self):
print("payments_updated_cb called, now closing everything!")
self.keep_running = False
def getCommentFromTransaction(self, transaction):
comment = ""
try:
comment = transaction["description"]
json_comment = json.loads(comment)
for field in json_comment:
if field[0] == "text/plain":
comment = field[1]
break
else:
print("text/plain field is missing from JSON description")
except Exception as e:
print(f"Info: could not parse comment as JSON, this is fine, using as-is ({e})")
return comment
async def NOmainHERE(self):
self.keep_running = True
self.private_key = PrivateKey(bytes.fromhex(self.secret))
self.relay_manager = RelayManager()
for relay in self.relays:
self.relay_manager.add_relay(relay)
print(f"DEBUG: Opening relay connections")
await self.relay_manager.open_connections({"cert_reqs": ssl.CERT_NONE})
self.allconnected = False
for _ in range(20):
print("Waiting for relay connection...")
await asyncio.sleep(0.5)
nrconnected = 0
for index, relay in enumerate(self.relays):
try:
relay = self.relay_manager.relays[self.relays[index]]
if relay.connected is True:
print(f"connected: {self.relays[index]}")
nrconnected += 1
else:
print(f"not connected: {self.relays[index]}")
except Exception as e:
print(f"could not find relay: {e}")
break # not all of them have been initialized, skip...
self.allconnected = ( nrconnected == len(self.relays) )
if self.allconnected:
print("All relays connected!")
break
if not self.allconnected or not self.keep_running:
print(f"ERROR: could not connect to relay or not self.keep_running, aborting...")
return
# Set up subscription to receive response
self.subscription_id = "micropython_nwc_" + str(round(time.time()))
print(f"DEBUG: Setting up subscription with ID: {self.subscription_id}")
self.filters = Filters([Filter(
#event_ids=[self.subscription_id], # would be nice to filter, but not like this
kinds=[23195, 23196], # NWC reponses and notifications
authors=[self.wallet_pubkey],
pubkey_refs=[self.private_key.public_key.hex()]
)])
print(f"DEBUG: Subscription filters: {self.filters.to_json_array()}")
self.relay_manager.add_subscription(self.subscription_id, self.filters)
print(f"DEBUG: Creating subscription request")
request_message = [ClientMessageType.REQUEST, self.subscription_id]
request_message.extend(self.filters.to_json_array())
print(f"DEBUG: Publishing subscription request")
self.relay_manager.publish_message(json.dumps(request_message))
print(f"DEBUG: Published subscription request")
for _ in range(4):
if not self.keep_running:
return
print("Waiting a bit before self.fetch_balance()")
await asyncio.sleep(0.5)
await self.fetch_balance()
while True:
print(f"checking for incoming events...")
await asyncio.sleep(1)
if not self.keep_running:
print("NWCWallet: not keep_running, closing connections...")
await self.relay_manager.close_connections()
break
start_time = time.ticks_ms()
if self.relay_manager.message_pool.has_events():
print(f"DEBUG: Event received from message pool after {time.ticks_ms()-start_time}ms")
event_msg = self.relay_manager.message_pool.get_event()
event_created_at = event_msg.event.created_at
print(f"Received at {time.localtime()} a message with timestamp {event_created_at} after {time.ticks_ms()-start_time}ms")
try:
# This takes a very long time, even for short messages:
decrypted_content = self.private_key.decrypt_message(
event_msg.event.content,
event_msg.event.public_key,
)
print(f"DEBUG: Decrypted content: {decrypted_content} after {time.ticks_ms()-start_time}ms")
response = json.loads(decrypted_content)
print(f"DEBUG: Parsed response: {response}")
result = response.get("result")
if result:
if result.get("balance") is not None:
new_balance = round(int(result["balance"]) / 1000)
print(f"Got balance: {new_balance}")
self.handle_new_balance(new_balance)
elif result.get("transactions") is not None:
print("Response contains transactions!")
new_payment_list = UniqueSortedList()
for transaction in result["transactions"]:
amount = transaction["amount"]
amount = round(amount / 1000)
comment = self.getCommentFromTransaction(transaction)
epoch_time = transaction["created_at"]
paymentObj = Payment(epoch_time, amount, comment)
new_payment_list.add(paymentObj)
if len(new_payment_list) > 0:
# do them all in one shot instead of one-by-one because the lv_async() isn't always chronological,
# so when a long list of payments is added, it may be overwritten by a short list
self.handle_new_payments(new_payment_list)
else:
notification = response.get("notification")
if notification:
amount = notification["amount"]
amount = round(amount / 1000)
type = notification["type"]
if type == "outgoing":
amount = -amount
elif type == "incoming":
new_balance = self.last_known_balance + amount
self.handle_new_balance(new_balance, False) # don't trigger full fetch because payment info is in notification
epoch_time = notification["created_at"]
comment = self.getCommentFromTransaction(notification)
paymentObj = Payment(epoch_time, amount, comment)
self.handle_new_payment(paymentObj)
else:
print(f"WARNING: invalid notification type {type}, ignoring.")
else:
print("Unsupported response, ignoring.")
except Exception as e:
print(f"DEBUG: Error processing response: {e}")
else:
#print(f"pool has no events after {time.ticks_ms()-start_time}ms") # completes in 0-1ms
pass
def test_it(self):
print("before do_two")
asyncio.run(self.do_two())
print("after do_two")
def do_two(self):
print("before await self.NOmainHERE()")
await self.NOmainHERE()
print("after await self.NOmainHERE()")