Add com.micropythonos.nostr

Initial commit, not ready for release.
This commit is contained in:
Thomas Farstrike
2026-01-16 18:39:53 +01:00
parent 7fd86daeda
commit 36cc20bf45
14 changed files with 814 additions and 0 deletions
@@ -0,0 +1,23 @@
{
"name": "Nostr",
"publisher": "MicroPythonOS",
"short_description": "Nostr",
"long_description": "Notest and Other Stuff Transmitted by Relays",
"icon_url": "https://apps.micropythonos.com/apps/com.micropythonos.nostr/icons/com.micropythonos.nostr_0.1.0_64x64.png",
"download_url": "https://apps.micropythonos.com/apps/com.micropythonos.nostr/mpks/com.micropythonos.nostr_0.1.0.mpk",
"fullname": "com.micropythonos.nostr",
"version": "0.1.0",
"category": "communication",
"activities": [
{
"entrypoint": "assets/nostr.py",
"classname": "Nostr",
"intent_filters": [
{
"action": "main",
"category": "launcher"
}
]
}
]
}
@@ -0,0 +1,182 @@
import time
import random
import lvgl as lv
import mpos.ui
class Confetti:
"""Manages confetti animation with physics simulation."""
def __init__(self, screen, icon_path, asset_path, duration=10000):
"""
Initialize the Confetti system.
Args:
screen: The LVGL screen/display object
icon_path: Path to icon assets (e.g., "M:apps/com.lightningpiggy.displaywallet/res/mipmap-mdpi/")
asset_path: Path to confetti assets (e.g., "M:apps/com.lightningpiggy.displaywallet/res/drawable-mdpi/")
max_confetti: Maximum number of confetti pieces to display
"""
self.screen = screen
self.icon_path = icon_path
self.asset_path = asset_path
self.duration = duration
self.max_confetti = 21
# Physics constants
self.GRAVITY = 100 # pixels/sec²
# Screen dimensions
self.screen_width = screen.get_display().get_horizontal_resolution()
self.screen_height = screen.get_display().get_vertical_resolution()
# State
self.is_running = False
self.last_time = time.ticks_ms()
self.confetti_pieces = []
self.confetti_images = []
self.used_img_indices = set()
# Spawn control
self.spawn_timer = 0
self.spawn_interval = 0.15 # seconds
self.animation_start = 0
# Pre-create LVGL image objects
self._init_images()
def _init_images(self):
"""Pre-create LVGL image objects for confetti."""
iconimages = 2
for _ in range(iconimages):
img = lv.image(lv.layer_top())
img.set_src(f"{self.icon_path}icon_64x64.png")
img.add_flag(lv.obj.FLAG.HIDDEN)
self.confetti_images.append(img)
for i in range(self.max_confetti - iconimages):
img = lv.image(lv.layer_top())
img.set_src(f"{self.asset_path}confetti{random.randint(0, 4)}.png")
img.add_flag(lv.obj.FLAG.HIDDEN)
self.confetti_images.append(img)
def start(self):
"""Start the confetti animation."""
if self.is_running:
return
self.is_running = True
self.last_time = time.ticks_ms()
self._clear_confetti()
# Staggered spawn control
self.spawn_timer = 0
self.animation_start = time.ticks_ms() / 1000.0
# Initial burst
for _ in range(10):
self._spawn_one()
# Register update callback
mpos.ui.task_handler.add_event_cb(self._update_frame, 1)
# Stop spawning after 15 seconds
lv.timer_create(self.stop, self.duration, None).set_repeat_count(1)
def stop(self, timer=None):
"""Stop the confetti animation."""
self.is_running = False
def _clear_confetti(self):
"""Clear all confetti pieces from the screen."""
for img in self.confetti_images:
img.add_flag(lv.obj.FLAG.HIDDEN)
self.confetti_pieces = []
self.used_img_indices.clear()
def _update_frame(self, a, b):
"""Update frame for confetti animation. Called by task handler."""
current_time = time.ticks_ms()
delta_time = time.ticks_diff(current_time, self.last_time) / 1000.0
self.last_time = current_time
# === STAGGERED SPAWNING ===
if self.is_running:
self.spawn_timer += delta_time
if self.spawn_timer >= self.spawn_interval:
self.spawn_timer = 0
for _ in range(random.randint(1, 2)):
if len(self.confetti_pieces) < self.max_confetti:
self._spawn_one()
# === UPDATE ALL PIECES ===
new_pieces = []
for piece in self.confetti_pieces:
# Physics
piece['age'] += delta_time
piece['x'] += piece['vx'] * delta_time
piece['y'] += piece['vy'] * delta_time
piece['vy'] += self.GRAVITY * delta_time
piece['rotation'] += piece['spin'] * delta_time
piece['scale'] = max(0.3, 1.0 - (piece['age'] / piece['lifetime']) * 0.7)
# Render
img = self.confetti_images[piece['img_idx']]
img.remove_flag(lv.obj.FLAG.HIDDEN)
img.set_pos(int(piece['x']), int(piece['y']))
img.set_rotation(int(piece['rotation'] * 10))
orig = img.get_width()
if orig >= 64:
img.set_scale(int(256 * piece['scale'] / 1.5))
elif orig < 32:
img.set_scale(int(256 * piece['scale'] * 1.5))
else:
img.set_scale(int(256 * piece['scale']))
# Death check
dead = (
piece['x'] < -60 or piece['x'] > self.screen_width + 60 or
piece['y'] > self.screen_height + 60 or
piece['age'] > piece['lifetime']
)
if dead:
img.add_flag(lv.obj.FLAG.HIDDEN)
self.used_img_indices.discard(piece['img_idx'])
else:
new_pieces.append(piece)
self.confetti_pieces = new_pieces
# Full stop when empty and paused
if not self.confetti_pieces and not self.is_running:
print("Confetti finished")
mpos.ui.task_handler.remove_event_cb(self._update_frame)
def _spawn_one(self):
"""Spawn a single confetti piece."""
if not self.is_running:
return
# Find a free image slot
for idx, img in enumerate(self.confetti_images):
if img.has_flag(lv.obj.FLAG.HIDDEN) and idx not in self.used_img_indices:
break
else:
return # No free slot
piece = {
'img_idx': idx,
'x': random.uniform(-50, self.screen_width + 50),
'y': random.uniform(50, 100), # Start above screen
'vx': random.uniform(-80, 80),
'vy': random.uniform(-150, 0),
'spin': random.uniform(-500, 500),
'age': 0.0,
'lifetime': random.uniform(5.0, 10.0), # Long enough to fill 10s
'rotation': random.uniform(0, 360),
'scale': 1.0
}
self.confetti_pieces.append(piece)
self.used_img_indices.add(idx)
@@ -0,0 +1,22 @@
import lvgl as lv
from mpos import Activity, min_resolution
class FullscreenQR(Activity):
# No __init__() so super.__init__() will be called automatically
def onCreate(self):
receive_qr_data = self.getIntent().extras.get("receive_qr_data")
qr_screen = lv.obj()
qr_screen.set_scrollbar_mode(lv.SCROLLBAR_MODE.OFF)
qr_screen.set_scroll_dir(lv.DIR.NONE)
qr_screen.add_event_cb(lambda e: self.finish(),lv.EVENT.CLICKED,None)
big_receive_qr = lv.qrcode(qr_screen)
big_receive_qr.set_size(min_resolution())
big_receive_qr.set_dark_color(lv.color_black())
big_receive_qr.set_light_color(lv.color_white())
big_receive_qr.center()
big_receive_qr.set_style_border_color(lv.color_white(), 0)
big_receive_qr.set_style_border_width(0, 0);
big_receive_qr.update(receive_qr_data, len(receive_qr_data))
self.setContentView(qr_screen)
@@ -0,0 +1,219 @@
import lvgl as lv
from mpos import Activity, Intent, ConnectivityManager, MposKeyboard, pct_of_display_width, pct_of_display_height, SharedPreferences, SettingsActivity
from mpos.ui.anim import WidgetAnimator
from fullscreen_qr import FullscreenQR
class Nostr(Activity):
wallet = None
receive_qr_data = None
destination = None
balance_mode = 0 # 0=sats, 1=bits, 2=μBTC, 3=mBTC, 4=BTC
payments_label_current_font = 2
payments_label_fonts = [ lv.font_montserrat_10, lv.font_unscii_8, lv.font_montserrat_16, lv.font_montserrat_24, lv.font_unscii_16, lv.font_montserrat_28_compressed, lv.font_montserrat_40]
# screens:
main_screen = None
# widgets
balance_label = None
receive_qr = None
payments_label = None
# activities
fullscreenqr = FullscreenQR() # need a reference to be able to finish() it
def onCreate(self):
self.prefs = SharedPreferences("com.micropythonos.nostr")
self.main_screen = lv.obj()
self.main_screen.set_style_pad_all(10, 0)
# This line needs to be drawn first, otherwise it's over the balance label and steals all the clicks!
balance_line = lv.line(self.main_screen)
balance_line.set_points([{'x':0,'y':35},{'x':200,'y':35}],2)
balance_line.add_flag(lv.obj.FLAG.CLICKABLE)
self.balance_label = lv.label(self.main_screen)
self.balance_label.set_text("")
self.balance_label.align(lv.ALIGN.TOP_LEFT, 0, 0)
self.balance_label.set_style_text_font(lv.font_montserrat_24, 0)
self.balance_label.add_flag(lv.obj.FLAG.CLICKABLE)
self.balance_label.set_width(pct_of_display_width(75)) # 100 - receive_qr
self.balance_label.add_event_cb(self.balance_label_clicked_cb,lv.EVENT.CLICKED,None)
self.receive_qr = lv.qrcode(self.main_screen)
self.receive_qr.set_size(pct_of_display_width(20)) # bigger QR results in simpler code (less error correction?)
self.receive_qr.set_dark_color(lv.color_black())
self.receive_qr.set_light_color(lv.color_white())
self.receive_qr.align(lv.ALIGN.TOP_RIGHT,0,0)
self.receive_qr.set_style_border_color(lv.color_white(), 0)
self.receive_qr.set_style_border_width(1, 0);
self.receive_qr.add_flag(lv.obj.FLAG.CLICKABLE)
self.receive_qr.add_event_cb(self.qr_clicked_cb,lv.EVENT.CLICKED,None)
self.payments_label = lv.label(self.main_screen)
self.payments_label.set_text("")
self.payments_label.align_to(balance_line,lv.ALIGN.OUT_BOTTOM_LEFT,0,10)
self.update_payments_label_font()
self.payments_label.set_width(pct_of_display_width(75)) # 100 - receive_qr
self.payments_label.add_flag(lv.obj.FLAG.CLICKABLE)
self.payments_label.add_event_cb(self.payments_label_clicked,lv.EVENT.CLICKED,None)
settings_button = lv.button(self.main_screen)
settings_button.set_size(lv.pct(20), lv.pct(25))
settings_button.align(lv.ALIGN.BOTTOM_RIGHT, 0, 0)
settings_button.add_event_cb(self.settings_button_tap,lv.EVENT.CLICKED,None)
settings_label = lv.label(settings_button)
settings_label.set_text(lv.SYMBOL.SETTINGS)
settings_label.set_style_text_font(lv.font_montserrat_24, 0)
settings_label.center()
if False: # send button disabled for now, not implemented
send_button = lv.button(self.main_screen)
send_button.set_size(lv.pct(20), lv.pct(25))
send_button.align_to(settings_button, lv.ALIGN.OUT_TOP_MID, 0, -pct_of_display_height(2))
send_button.add_event_cb(self.send_button_tap,lv.EVENT.CLICKED,None)
send_label = lv.label(send_button)
send_label.set_text(lv.SYMBOL.UPLOAD)
send_label.set_style_text_font(lv.font_montserrat_24, 0)
send_label.center()
self.setContentView(self.main_screen)
def onStart(self, main_screen):
self.main_ui_set_defaults()
def onResume(self, main_screen):
super().onResume(main_screen)
cm = ConnectivityManager.get()
cm.register_callback(self.network_changed)
self.network_changed(cm.is_online())
def onPause(self, main_screen):
if self.wallet and self.destination != FullscreenQR:
self.wallet.stop() # don't stop the wallet for the fullscreen QR activity
self.destination = None
cm = ConnectivityManager.get()
cm.unregister_callback(self.network_changed)
def network_changed(self, online):
print("displaywallet.py network_changed, now:", "ONLINE" if online else "OFFLINE")
if online:
self.went_online()
else:
self.went_offline()
def went_online(self):
if self.wallet and self.wallet.is_running():
print("wallet is already running, nothing to do") # might have come from the QR activity
return
try:
from nwc_wallet import NWCWallet
self.wallet = NWCWallet(self.prefs.get_string("nwc_url"))
self.wallet.static_receive_code = self.prefs.get_string("nwc_static_receive_code")
self.redraw_static_receive_code_cb()
except Exception as e:
self.error_cb(f"Couldn't initialize NWC Wallet because: {e}")
return
self.balance_label.set_text(lv.SYMBOL.REFRESH)
self.payments_label.set_text(f"\nConnecting to {wallet_type} backend.\n\nIf this takes too long, it might be down or something's wrong with the settings.")
# by now, self.wallet can be assumed
self.wallet.start(self.balance_updated_cb, self.redraw_payments_cb, self.redraw_static_receive_code_cb, self.error_cb)
def went_offline(self):
if self.wallet:
self.wallet.stop() # don't stop the wallet for the fullscreen QR activity
self.payments_label.set_text(f"WiFi is not connected, can't talk to wallet...")
def update_payments_label_font(self):
self.payments_label.set_style_text_font(self.payments_label_fonts[self.payments_label_current_font], 0)
def payments_label_clicked(self, event):
self.payments_label_current_font = (self.payments_label_current_font + 1) % len(self.payments_label_fonts)
self.update_payments_label_font()
def float_to_string(self, value):
# Format float to string with fixed-point notation, up to 6 decimal places
s = "{:.8f}".format(value)
# Remove trailing zeros and decimal point if no decimals remain
return s.rstrip("0").rstrip(".")
def display_balance(self, balance):
#print(f"displaying balance {balance}")
if self.balance_mode == 0: # sats
#balance_text = "丰 " + str(balance) # font doesnt support it
balance_text = str(balance) + " sat"
if balance > 1:
balance_text += "s"
elif self.balance_mode == 1: # bits (1 bit = 100 sats)
balance_bits = balance / 100
balance_text = self.float_to_string(balance_bits) + " bit"
if balance_bits != 1:
balance_text += "s"
elif self.balance_mode == 2: # micro-BTC (1 μBTC = 100 sats)
balance_ubtc = balance / 100
balance_text = self.float_to_string(balance_ubtc) + " micro-BTC"
elif self.balance_mode == 3: # milli-BTC (1 mBTC = 100000 sats)
balance_mbtc = balance / 100000
balance_text = self.float_to_string(balance_mbtc) + " milli-BTC"
elif self.balance_mode == 4: # BTC (1 BTC = 100000000 sats)
balance_btc = balance / 100000000
#balance_text = "₿ " + str(balance) # font doesnt support it - although it should https://fonts.google.com/specimen/Montserrat
balance_text = self.float_to_string(balance_btc) + " BTC"
self.balance_label.set_text(balance_text)
#print("done displaying balance")
def balance_updated_cb(self, sats_added=0):
print(f"balance_updated_cb(sats_added={sats_added})")
if self.fullscreenqr.has_foreground():
self.fullscreenqr.finish()
balance = self.wallet.last_known_balance
print(f"balance: {balance}")
def redraw_payments_cb(self):
# this gets called from another thread (the wallet) so make sure it happens in the LVGL thread using lv.async_call():
self.payments_label.set_text(str(self.wallet.payment_list))
def redraw_static_receive_code_cb(self):
# this gets called from another thread (the wallet) so make sure it happens in the LVGL thread using lv.async_call():
self.receive_qr_data = self.wallet.static_receive_code
if self.receive_qr_data:
self.receive_qr.update(self.receive_qr_data, len(self.receive_qr_data))
else:
print("Warning: redraw_static_receive_code_cb() was called while self.wallet.static_receive_code is None...")
def error_cb(self, error):
if self.wallet and self.wallet.is_running():
self.payments_label.set_text(str(error))
def should_show_setting(self, setting):
wallet_type = self.prefs.get_string("wallet_type")
if wallet_type != "lnbits" and setting["key"].startswith("lnbits_"):
return False
if wallet_type != "nwc" and setting["key"].startswith("nwc_"):
return False
return True
def settings_button_tap(self, event):
intent = Intent(activity_class=SettingsActivity)
intent.putExtra("prefs", self.prefs)
intent.putExtra("settings", [
{"title": "Wallet Type", "key": "wallet_type", "ui": "radiobuttons", "ui_options": [("LNBits", "lnbits"), ("Nostr Wallet Connect", "nwc")]},
{"title": "LNBits URL", "key": "lnbits_url", "placeholder": "https://demo.lnpiggy.com", "should_show": self.should_show_setting},
{"title": "LNBits Read Key", "key": "lnbits_readkey", "placeholder": "fd92e3f8168ba314dc22e54182784045", "should_show": self.should_show_setting},
{"title": "Optional LN Address", "key": "lnbits_static_receive_code", "placeholder": "Will be fetched if empty.", "should_show": self.should_show_setting},
{"title": "Nost Wallet Connect", "key": "nwc_url", "placeholder": "nostr+walletconnect://69effe7b...", "should_show": self.should_show_setting},
{"title": "Optional LN Address", "key": "nwc_static_receive_code", "placeholder": "Optional if present in NWC URL.", "should_show": self.should_show_setting},
])
self.startActivity(intent)
def main_ui_set_defaults(self):
self.balance_label.set_text("Welcome!")
self.payments_label.set_text(lv.SYMBOL.REFRESH)
def balance_label_clicked_cb(self, event):
print("Balance clicked")
self.balance_mode = (self.balance_mode + 1) % 5
self.display_balance(self.wallet.last_known_balance)
def qr_clicked_cb(self, event):
print("QR clicked")
if not self.receive_qr_data:
return
self.destination = FullscreenQR
self.startActivity(Intent(activity_class=self.fullscreenqr).putExtra("receive_qr_data", self.receive_qr_data))
@@ -0,0 +1,281 @@
import ssl
import json
import time
from mpos.util import urldecode
from mpos import TaskManager
from nostr.relay_manager import RelayManager
from nostr.message_type import ClientMessageType
from nostr.filter import Filter, Filters
from nostr.event import EncryptedDirectMessage
from nostr.key import PrivateKey
from payment import Payment
from unique_sorted_list import UniqueSortedList
class NWCWallet():
PAYMENTS_TO_SHOW = 6
PERIODIC_FETCH_BALANCE_SECONDS = 60 # seconds
relays = []
secret = None
wallet_pubkey = None
def __init__(self, nwc_url):
super().__init__()
self.nwc_url = nwc_url
if not nwc_url:
raise ValueError('NWC URL is not set.')
self.connected = False
self.relays, self.wallet_pubkey, self.secret, self.lud16 = self.parse_nwc_url(self.nwc_url)
if not self.relays:
raise ValueError('Missing relay in NWC URL.')
if not self.wallet_pubkey:
raise ValueError('Missing public key in NWC URL.')
if not self.secret:
raise ValueError('Missing "secret" in NWC URL.')
#if not self.lud16:
# raise ValueError('Missing lud16 (= lightning address) in NWC URL.')
def getCommentFromTransaction(self, transaction):
comment = ""
try:
comment = transaction["description"]
if comment is None:
return comment
json_comment = json.loads(comment)
for field in json_comment:
if field[0] == "text/plain":
comment = field[1]
break
else:
print("text/plain field is missing from JSON description")
except Exception as e:
print(f"Info: comment {comment} is not JSON, this is fine, using as-is ({e})")
comment = super().try_parse_as_zap(comment)
return comment
async def async_wallet_manager_task(self):
if self.lud16:
self.handle_new_static_receive_code(self.lud16)
self.private_key = PrivateKey(bytes.fromhex(self.secret))
self.relay_manager = RelayManager()
for relay in self.relays:
self.relay_manager.add_relay(relay)
print(f"DEBUG: Opening relay connections")
await self.relay_manager.open_connections({"cert_reqs": ssl.CERT_NONE})
self.connected = False
nrconnected = 0
for _ in range(100):
await TaskManager.sleep(0.1)
nrconnected = self.relay_manager.connected_or_errored_relays()
#print(f"Waiting for relay connections, currently: {nrconnected}/{len(self.relays)}")
if nrconnected == len(self.relays) or not self.keep_running:
break
if nrconnected == 0:
self.handle_error("Could not connect to any Nostr Wallet Connect relays.")
return
if not self.keep_running:
print(f"async_wallet_manager_task does not have self.keep_running, returning...")
return
print(f"{nrconnected} relays connected")
# Set up subscription to receive response
self.subscription_id = "micropython_nwc_" + str(round(time.time()))
print(f"DEBUG: Setting up subscription with ID: {self.subscription_id}")
self.filters = Filters([Filter(
#event_ids=[self.subscription_id], would be nice to filter, but not like this
kinds=[23195, 23196], # NWC reponses and notifications
authors=[self.wallet_pubkey],
pubkey_refs=[self.private_key.public_key.hex()]
)])
print(f"DEBUG: Subscription filters: {self.filters.to_json_array()}")
self.relay_manager.add_subscription(self.subscription_id, self.filters)
print(f"DEBUG: Creating subscription request")
request_message = [ClientMessageType.REQUEST, self.subscription_id]
request_message.extend(self.filters.to_json_array())
print(f"DEBUG: Publishing subscription request")
self.relay_manager.publish_message(json.dumps(request_message))
print(f"DEBUG: Published subscription request")
last_fetch_balance = time.time() - self.PERIODIC_FETCH_BALANCE_SECONDS
while True: # handle incoming events and do periodic fetch_balance
#print(f"checking for incoming events...")
await TaskManager.sleep(0.1)
if not self.keep_running:
print("NWCWallet: not keep_running, closing connections...")
await self.relay_manager.close_connections()
break
if time.time() - last_fetch_balance >= self.PERIODIC_FETCH_BALANCE_SECONDS:
last_fetch_balance = time.time()
try:
await self.fetch_balance()
except Exception as e:
print(f"fetch_balance got exception {e}") # fetch_balance got exception 'NoneType' object isn't iterable?!
start_time = time.ticks_ms()
if self.relay_manager.message_pool.has_events():
print(f"DEBUG: Event received from message pool after {time.ticks_ms()-start_time}ms")
event_msg = self.relay_manager.message_pool.get_event()
event_created_at = event_msg.event.created_at
print(f"Received at {time.localtime()} a message with timestamp {event_created_at} after {time.ticks_ms()-start_time}ms")
try:
# This takes a very long time, even for short messages:
decrypted_content = self.private_key.decrypt_message(
event_msg.event.content,
event_msg.event.public_key,
)
print(f"DEBUG: Decrypted content: {decrypted_content} after {time.ticks_ms()-start_time}ms")
response = json.loads(decrypted_content)
print(f"DEBUG: Parsed response: {response}")
result = response.get("result")
if result:
if result.get("balance") is not None:
new_balance = round(int(result["balance"]) / 1000)
print(f"Got balance: {new_balance}")
self.handle_new_balance(new_balance)
elif result.get("transactions") is not None:
print("Response contains transactions!")
new_payment_list = UniqueSortedList()
for transaction in result["transactions"]:
amount = transaction["amount"]
amount = round(amount / 1000)
comment = self.getCommentFromTransaction(transaction)
epoch_time = transaction["created_at"]
paymentObj = Payment(epoch_time, amount, comment)
new_payment_list.add(paymentObj)
if len(new_payment_list) > 0:
# do them all in one shot instead of one-by-one because the lv_async() isn't always chronological,
# so when a long list of payments is added, it may be overwritten by a short list
self.handle_new_payments(new_payment_list)
else:
notification = response.get("notification")
if notification:
amount = notification["amount"]
amount = round(amount / 1000)
type = notification["type"]
if type == "outgoing":
amount = -amount
elif type == "incoming":
new_balance = self.last_known_balance + amount
self.handle_new_balance(new_balance, False) # don't trigger full fetch because payment info is in notification
epoch_time = notification["created_at"]
comment = self.getCommentFromTransaction(notification)
paymentObj = Payment(epoch_time, amount, comment)
self.handle_new_payment(paymentObj)
else:
print(f"WARNING: invalid notification type {type}, ignoring.")
else:
print("Unsupported response, ignoring.")
except Exception as e:
print(f"DEBUG: Error processing response: {e}")
import sys
sys.print_exception(e) # Full traceback on MicroPython
else:
#print(f"pool has no events after {time.ticks_ms()-start_time}ms") # completes in 0-1ms
pass
def fetch_balance(self):
try:
if not self.keep_running:
return
# Create get_balance request
balance_request = {
"method": "get_balance",
"params": {}
}
print(f"DEBUG: Created balance request: {balance_request}")
print(f"DEBUG: Creating encrypted DM to wallet pubkey: {self.wallet_pubkey}")
dm = EncryptedDirectMessage(
recipient_pubkey=self.wallet_pubkey,
cleartext_content=json.dumps(balance_request),
kind=23194
)
print(f"DEBUG: Signing DM {json.dumps(dm)} with private key")
self.private_key.sign_event(dm) # sign also does encryption if it's a encrypted dm
print(f"DEBUG: Publishing encrypted DM")
self.relay_manager.publish_event(dm)
except Exception as e:
print(f"inside fetch_balance exception: {e}")
def fetch_payments(self):
if not self.keep_running:
return
# Create get_balance request
list_transactions = {
"method": "list_transactions",
"params": {
"limit": self.PAYMENTS_TO_SHOW
}
}
dm = EncryptedDirectMessage(
recipient_pubkey=self.wallet_pubkey,
cleartext_content=json.dumps(list_transactions),
kind=23194
)
self.private_key.sign_event(dm) # sign also does encryption if it's a encrypted dm
print("\nPublishing DM to fetch payments...")
self.relay_manager.publish_event(dm)
def parse_nwc_url(self, nwc_url):
"""Parse Nostr Wallet Connect URL to extract pubkey, relays, secret, and lud16."""
print(f"DEBUG: Starting to parse NWC URL: {nwc_url}")
try:
# Remove 'nostr+walletconnect://' or 'nwc:' prefix
if nwc_url.startswith('nostr+walletconnect://'):
print(f"DEBUG: Removing 'nostr+walletconnect://' prefix")
nwc_url = nwc_url[22:]
elif nwc_url.startswith('nwc:'):
print(f"DEBUG: Removing 'nwc:' prefix")
nwc_url = nwc_url[4:]
else:
print(f"DEBUG: No recognized prefix found in URL")
raise ValueError("Invalid NWC URL: missing 'nostr+walletconnect://' or 'nwc:' prefix")
print(f"DEBUG: URL after prefix removal: {nwc_url}")
# urldecode because the relay might have %3A%2F%2F etc
nwc_url = urldecode(nwc_url)
print(f"after urldecode: {nwc_url}")
# Split into pubkey and query params
parts = nwc_url.split('?')
pubkey = parts[0]
print(f"DEBUG: Extracted pubkey: {pubkey}")
# Validate pubkey (should be 64 hex characters)
if len(pubkey) != 64 or not all(c in '0123456789abcdef' for c in pubkey):
raise ValueError("Invalid NWC URL: pubkey must be 64 hex characters")
# Extract relay, secret, and lud16 from query params
relays = []
lud16 = None
secret = None
if len(parts) > 1:
print(f"DEBUG: Query parameters found: {parts[1]}")
params = parts[1].split('&')
for param in params:
if param.startswith('relay='):
relay = param[6:]
print(f"DEBUG: Extracted relay: {relay}")
relays.append(relay)
elif param.startswith('secret='):
secret = param[7:]
print(f"DEBUG: Extracted secret: {secret}")
elif param.startswith('lud16='):
lud16 = param[6:]
print(f"DEBUG: Extracted lud16: {lud16}")
else:
print(f"DEBUG: No query parameters found")
if not pubkey or not len(relays) > 0 or not secret:
raise ValueError("Invalid NWC URL: missing required fields (pubkey, relay, or secret)")
# Validate secret (should be 64 hex characters)
if len(secret) != 64 or not all(c in '0123456789abcdef' for c in secret):
raise ValueError("Invalid NWC URL: secret must be 64 hex characters")
print(f"DEBUG: Parsed NWC data - Relay: {relays}, Pubkey: {pubkey}, Secret: {secret}, lud16: {lud16}")
return relays, pubkey, secret, lud16
except Exception as e:
raise RuntimeError(f"Exception parsing NWC URL {nwc_url}: {e}")
@@ -0,0 +1,43 @@
# Payment class remains unchanged
class Payment:
def __init__(self, epoch_time, amount_sats, comment):
self.epoch_time = epoch_time
self.amount_sats = amount_sats
self.comment = comment
def __str__(self):
sattext = "sats"
if self.amount_sats == 1:
sattext = "sat"
if not self.comment:
verb = "spent"
if self.amount_sats > 0:
verb = "received!"
return f"{self.amount_sats} {sattext} {verb}"
#return f"{self.amount_sats} {sattext} @ {self.epoch_time}: {self.comment}"
return f"{self.amount_sats} {sattext}: {self.comment}"
def __eq__(self, other):
if not isinstance(other, Payment):
return False
return self.epoch_time == other.epoch_time and self.amount_sats == other.amount_sats and self.comment == other.comment
def __lt__(self, other):
if not isinstance(other, Payment):
return NotImplemented
return (self.epoch_time, self.amount_sats, self.comment) < (other.epoch_time, other.amount_sats, other.comment)
def __le__(self, other):
if not isinstance(other, Payment):
return NotImplemented
return (self.epoch_time, self.amount_sats, self.comment) <= (other.epoch_time, other.amount_sats, other.comment)
def __gt__(self, other):
if not isinstance(other, Payment):
return NotImplemented
return (self.epoch_time, self.amount_sats, self.comment) > (other.epoch_time, other.amount_sats, other.comment)
def __ge__(self, other):
if not isinstance(other, Payment):
return NotImplemented
return (self.epoch_time, self.amount_sats, self.comment) >= (other.epoch_time, other.amount_sats, other.comment)
@@ -0,0 +1,43 @@
# keeps a list of items
# The .add() method ensures the list remains unique (via __eq__)
# and sorted (via __lt__) by inserting new items in the correct position.
class UniqueSortedList:
def __init__(self):
self._items = []
def add(self, item):
#print(f"before add: {str(self)}")
# Check if item already exists (using __eq__)
if item not in self._items:
# Insert item in sorted position for descending order (using __gt__)
for i, existing_item in enumerate(self._items):
if item > existing_item:
self._items.insert(i, item)
return
# If item is smaller than all existing items, append it
self._items.append(item)
#print(f"after add: {str(self)}")
def __iter__(self):
# Return iterator for the internal list
return iter(self._items)
def get(self, index_nr):
# Retrieve item at given index, raise IndexError if invalid
try:
return self._items[index_nr]
except IndexError:
raise IndexError("Index out of range")
def __len__(self):
# Return the number of items for len() calls
return len(self._items)
def __str__(self):
#print("UniqueSortedList tostring called")
return "\n".join(str(item) for item in self._items)
def __eq__(self, other):
if len(self._items) != len(other):
return False
return all(p1 == p2 for p1, p2 in zip(self._items, other))
Binary file not shown.

After

Width:  |  Height:  |  Size: 5.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.8 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.6 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.8 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.6 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.7 KiB

+1
View File
@@ -20,6 +20,7 @@ rm "$outputjson"
# com.micropythonos.errortest is an intentional bad app for testing (caught by tests/test_graphical_launch_all_apps.py)
# com.micropythonos.showbattery is just a test
# com.micropythonos.doom_launcher isn't ready because the firmware doesn't have doom built-in yet
# com.micropythonos.nostr isn't ready for release yet
blacklist="com.micropythonos.filemanager com.quasikili.quasidoodle com.micropythonos.errortest com.micropythonos.showbattery com.micropythonos.doom_launcher"
echo "[" | tee -a "$outputjson"