diff --git a/internal_filesystem/apps/com.lightningpiggy.displaywallet/META-INF/MANIFEST.JSON b/internal_filesystem/apps/com.lightningpiggy.displaywallet/META-INF/MANIFEST.JSON deleted file mode 100644 index 4a4a25a3..00000000 --- a/internal_filesystem/apps/com.lightningpiggy.displaywallet/META-INF/MANIFEST.JSON +++ /dev/null @@ -1,23 +0,0 @@ -{ -"name": "Lightning Piggy", -"publisher": "LightningPiggy Foundation", -"short_description": "Display wallet that shows balance, transactions, receive QR code etc.", -"long_description": "See https://www.LightningPiggy.com", -"icon_url": "https://apps.micropythonos.com/apps/com.lightningpiggy.displaywallet/icons/com.lightningpiggy.displaywallet_0.0.1_64x64.png", -"download_url": "https://apps.micropythonos.com/apps/com.lightningpiggy.displaywallet/mpks/com.lightningpiggy.displaywallet_0.0.1.mpk", -"fullname": "com.lightningpiggy.displaywallet", -"version": "0.0.1", -"category": "finance", -"activities": [ - { - "entrypoint": "assets/displaywallet.py", - "classname": "DisplayWallet", - "intent_filters": [ - { - "action": "main", - "category": "launcher" - } - ] - } - ] -} diff --git a/internal_filesystem/apps/com.lightningpiggy.displaywallet/assets/captureqr.py b/internal_filesystem/apps/com.lightningpiggy.displaywallet/assets/captureqr.py deleted file mode 100644 index 7a75ab4f..00000000 --- a/internal_filesystem/apps/com.lightningpiggy.displaywallet/assets/captureqr.py +++ /dev/null @@ -1,277 +0,0 @@ -# This code grabs images from the camera in RGB565 format (2 bytes per pixel) -# and sends that to the QR decoder if QR decoding is enabled. -# The QR decoder then converts the RGB565 to grayscale, as that's what quirc operates on. -# It would be slightly more efficient to capture the images from the camera in L8/grayscale format, -# or in YUV format and discarding the U and V planes, but then the image will be gray (not great UX) -# and the performance impact of converting RGB565 to grayscale is probably minimal anyway. - -import lvgl as lv - -try: - import webcam -except Exception as e: - print(f"Info: could not import webcam module: {e}") - -from mpos.apps import Activity - -class Camera(Activity): - - width = 240 - height = 240 - - status_label_text = "No camera found." - status_label_text_searching = "Searching QR codes...\n\nHold still and make them big!\n10cm for simple QR codes,\n20cm for complex." - status_label_text_found = "Decoding QR..." - - cam = None - current_cam_buffer = None # Holds the current memoryview to prevent garbage collection - - image = None - image_dsc = None - scanqr_mode = None - use_webcam = False - keepliveqrdecoding = False - - capture_timer = None - - # Widgets: - qr_label = None - qr_button = None - snap_button = None - status_label = None - status_label_cont = None - - def onCreate(self): - self.scanqr_mode = self.getIntent().extras.get("scanqr_mode") - main_screen = lv.obj() - main_screen.set_style_pad_all(0, 0) - main_screen.set_style_border_width(0, 0) - main_screen.set_size(lv.pct(100), lv.pct(100)) - main_screen.set_scrollbar_mode(lv.SCROLLBAR_MODE.OFF) - close_button = lv.button(main_screen) - close_button.set_size(60,60) - close_button.align(lv.ALIGN.TOP_RIGHT, 0, 0) - close_label = lv.label(close_button) - close_label.set_text(lv.SYMBOL.CLOSE) - close_label.center() - close_button.add_event_cb(lambda e: self.finish(),lv.EVENT.CLICKED,None) - self.snap_button = lv.button(main_screen) - self.snap_button.set_size(60, 60) - self.snap_button.align(lv.ALIGN.RIGHT_MID, 0, 0) - self.snap_button.add_flag(lv.obj.FLAG.HIDDEN) - self.snap_button.add_event_cb(self.snap_button_click,lv.EVENT.CLICKED,None) - snap_label = lv.label(self.snap_button) - snap_label.set_text(lv.SYMBOL.OK) - snap_label.center() - self.qr_button = lv.button(main_screen) - self.qr_button.set_size(60, 60) - self.qr_button.add_flag(lv.obj.FLAG.HIDDEN) - self.qr_button.align(lv.ALIGN.BOTTOM_RIGHT, 0, 0) - self.qr_button.add_event_cb(self.qr_button_click,lv.EVENT.CLICKED,None) - self.qr_label = lv.label(self.qr_button) - self.qr_label.set_text(lv.SYMBOL.EYE_OPEN) - self.qr_label.center() - # Initialize LVGL image widget - self.image = lv.image(main_screen) - self.image.align(lv.ALIGN.LEFT_MID, 0, 0) - # Create image descriptor once - self.image_dsc = lv.image_dsc_t({ - "header": { - "magic": lv.IMAGE_HEADER_MAGIC, - "w": self.width, - "h": self.height, - "stride": self.width * 2, - "cf": lv.COLOR_FORMAT.RGB565 - #"cf": lv.COLOR_FORMAT.L8 - }, - 'data_size': self.width * self.height * 2, - 'data': None # Will be updated per frame - }) - self.image.set_src(self.image_dsc) - self.status_label_cont = lv.obj(main_screen) - self.status_label_cont.set_size(lv.pct(66),lv.pct(60)) - self.status_label_cont.align(lv.ALIGN.LEFT_MID, lv.pct(5), 0) - self.status_label_cont.set_style_bg_color(lv.color_white(), 0) - self.status_label_cont.set_style_bg_opa(66, 0) - self.status_label_cont.set_style_border_width(0, 0) - self.status_label = lv.label(self.status_label_cont) - self.status_label.set_text("No camera found.") - self.status_label.set_long_mode(lv.label.LONG.WRAP) - self.status_label.set_style_text_color(lv.color_white(), 0) - self.status_label.set_width(lv.pct(100)) - self.status_label.center() - self.setContentView(main_screen) - - def onResume(self, screen): - self.cam = init_internal_cam() - if self.cam: - self.image.set_rotation(900) # internal camera is rotated 90 degrees - else: - print("camtest.py: no internal camera found, trying webcam on /dev/video0") - try: - self.cam = webcam.init("/dev/video0") - self.use_webcam = True - except Exception as e: - print(f"camtest.py: webcam exception: {e}") - if self.cam: - print("Camera initialized, continuing...") - self.capture_timer = lv.timer_create(self.try_capture, 100, None) - self.status_label_cont.add_flag(lv.obj.FLAG.HIDDEN) - if self.scanqr_mode: - self.start_qr_decoding() - else: - self.qr_button.remove_flag(lv.obj.FLAG.HIDDEN) - self.snap_button.remove_flag(lv.obj.FLAG.HIDDEN) - else: - print("No camera found, stopping camtest.py") - if self.scanqr_mode: - self.finish() - - - def onStop(self, screen): - print("camtest.py backgrounded, cleaning up...") - if self.capture_timer: - self.capture_timer.delete() - if self.use_webcam: - webcam.deinit(self.cam) - elif self.cam: - self.cam.deinit() - print("camtest.py cleanup done.") - - def qrdecode_one(self): - try: - import qrdecode - result = qrdecode.qrdecode_rgb565(self.current_cam_buffer, self.width, self.height) - #result = bytearray("INSERT_QR_HERE", "utf-8") - if not result: - self.status_label.set_text(self.status_label_text_searching) - else: - self.stop_qr_decoding() - result = remove_bom(result) - result = print_qr_buffer(result) - print(f"QR decoding found: {result}") - if self.scanqr_mode: - self.setResult(True, result) - self.finish() - else: - self.status_label.set_text(result) # in the future, the status_label text should be copy-paste-able - except ValueError as e: - print("QR ValueError: ", e) - self.status_label.set_text(self.status_label_text_searching) - except TypeError as e: - print("QR TypeError: ", e) - self.status_label.set_text(self.status_label_text_found) - except Exception as e: - print("QR got other error: ", e) - - def snap_button_click(self, e): - print("Picture taken!") - import os - try: - os.mkdir("data") - except OSError: - pass - try: - os.mkdir("data/com.example.camtest") - except OSError: - pass - if self.current_cam_buffer is not None: - filename="data/com.example.camtest/capture.raw" - try: - with open(filename, 'wb') as f: - f.write(self.current_cam_buffer) - print(f"Successfully wrote current_cam_buffer to {filename}") - except OSError as e: - print(f"Error writing to file: {e}") - - def start_qr_decoding(self): - print("Activating live QR decoding...") - self.keepliveqrdecoding = True - self.qr_label.set_text(lv.SYMBOL.EYE_CLOSE) - self.status_label_cont.remove_flag(lv.obj.FLAG.HIDDEN) - self.status_label.set_text(self.status_label_text_searching) - - def stop_qr_decoding(self): - print("Deactivating live QR decoding...") - self.keepliveqrdecoding = False - self.qr_label.set_text(lv.SYMBOL.EYE_OPEN) - self.status_label_text = self.status_label.get_text() - if self.status_label_text in (self.status_label_text_searching or self.status_label_text_found): # if it found a QR code, leave it - self.status_label_cont.add_flag(lv.obj.FLAG.HIDDEN) - - def qr_button_click(self, e): - if not self.keepliveqrdecoding: - self.start_qr_decoding() - else: - self.stop_qr_decoding() - - def try_capture(self, event): - #print("capturing camera frame") - try: - if self.use_webcam: - self.current_cam_buffer = webcam.capture_frame(self.cam, "rgb565") - elif self.cam.frame_available(): - self.current_cam_buffer = self.cam.capture() - if self.current_cam_buffer and len(self.current_cam_buffer): - self.image_dsc.data = self.current_cam_buffer - #image.invalidate() # does not work so do this: - self.image.set_src(self.image_dsc) - if not self.use_webcam: - self.cam.free_buffer() # Free the old buffer - if self.keepliveqrdecoding: - self.qrdecode_one() - except Exception as e: - print(f"Camera capture exception: {e}") - - - -# Non-class functions: -def init_internal_cam(): - try: - from camera import Camera, GrabMode, PixelFormat, FrameSize, GainCeiling - cam = Camera( - data_pins=[12,13,15,11,14,10,7,2], - vsync_pin=6, - href_pin=4, - sda_pin=21, - scl_pin=16, - pclk_pin=9, - xclk_pin=8, - xclk_freq=20000000, - powerdown_pin=-1, - reset_pin=-1, - pixel_format=PixelFormat.RGB565, - #pixel_format=PixelFormat.GRAYSCALE, - frame_size=FrameSize.R240X240, - grab_mode=GrabMode.LATEST - ) - #cam.init() automatically done when creating the Camera() - #cam.reconfigure(frame_size=FrameSize.HVGA) - #frame_size=FrameSize.HVGA, # 480x320 - #frame_size=FrameSize.QVGA, # 320x240 - #frame_size=FrameSize.QQVGA # 160x120 - cam.set_vflip(True) - return cam - except Exception as e: - print(f"init_cam exception: {e}") - return None - -def print_qr_buffer(buffer): - try: - # Try to decode buffer as a UTF-8 string - result = buffer.decode('utf-8') - # Check if the string is printable (ASCII printable characters) - if all(32 <= ord(c) <= 126 for c in result): - return result - except Exception as e: - pass - # If not a valid string or not printable, convert to hex - hex_str = ' '.join([f'{b:02x}' for b in buffer]) - return hex_str.lower() - -# Byte-Order-Mark is added sometimes -def remove_bom(buffer): - bom = b'\xEF\xBB\xBF' - if buffer.startswith(bom): - return buffer[3:] - return buffer diff --git a/internal_filesystem/apps/com.lightningpiggy.displaywallet/assets/displaywallet.py b/internal_filesystem/apps/com.lightningpiggy.displaywallet/assets/displaywallet.py deleted file mode 100644 index e8ef88fc..00000000 --- a/internal_filesystem/apps/com.lightningpiggy.displaywallet/assets/displaywallet.py +++ /dev/null @@ -1,363 +0,0 @@ -from mpos.apps import Activity, Intent -import mpos.config - -from wallet import LNBitsWallet, NWCWallet -from captureqr import Camera - -class DisplayWallet(Activity): - - wallet = None - receive_qr_data = None - destination = None - - # widgets - balance_label = None - receive_qr = None - payments_label = None - - def onCreate(self): - main_screen = lv.obj() - main_screen.set_style_pad_all(10, 0) - self.balance_label = lv.label(main_screen) - self.balance_label.set_text("") - self.balance_label.align(lv.ALIGN.TOP_LEFT, 0, 0) - self.balance_label.set_style_text_font(lv.font_montserrat_22, 0) - self.receive_qr = lv.qrcode(main_screen) - self.receive_qr.set_size(50) - self.receive_qr.set_dark_color(lv.color_black()) - self.receive_qr.set_light_color(lv.color_white()) - self.receive_qr.align(lv.ALIGN.TOP_RIGHT,0,0) - self.receive_qr.set_style_border_color(lv.color_white(), 0) - self.receive_qr.set_style_border_width(3, 0); - self.receive_qr.add_flag(lv.obj.FLAG.CLICKABLE) - self.receive_qr.add_event_cb(self.qr_clicked_cb,lv.EVENT.CLICKED,None) - balance_line = lv.line(main_screen) - balance_line.set_points([{'x':0,'y':35},{'x':200,'y':35}],2) - self.payments_label = lv.label(main_screen) - self.payments_label.set_text("") - self.payments_label.align_to(balance_line,lv.ALIGN.OUT_BOTTOM_LEFT,0,10) - self.payments_label.set_style_text_font(lv.font_montserrat_16, 0) - settings_button = lv.button(main_screen) - settings_button.align(lv.ALIGN.BOTTOM_RIGHT, 0, 0) - snap_label = lv.label(settings_button) - snap_label.set_text(lv.SYMBOL.SETTINGS) - snap_label.center() - settings_button.add_event_cb(self.settings_button_tap,lv.EVENT.CLICKED,None) - self.setContentView(main_screen) - - def onStart(self, main_screen): - self.main_ui_set_defaults() - - def onResume(self, main_screen): - if not self.wallet or not self.wallet.is_running(): # just started the app or just returned from settings_screen - config = mpos.config.SharedPreferences("com.lightningpiggy.displaywallet") - wallet_type = config.get_string("wallet_type") - if wallet_type == "lnbits": - try: - self.receive_qr_data = config.get_string("lnbits_static_receive_code") - self.wallet = LNBitsWallet(config.get_string("lnbits_url"), config.get_string("lnbits_readkey")) - except Exception as e: - self.payments_label.set_text(f"Couldn't initialize LNBitsWallet\nbecause: {e}") - elif wallet_type == "nwc": - try: - self.wallet = NWCWallet(config.get_string("nwc_url")) - self.receive_qr_data = wallet.lud16 - except Exception as e: - self.payments_label.set_text(f"Couldn't initialize NWCWallet\nbecause: {e}") - else: - self.payments_label.set_text(f"No or unsupported wallet\ntype configured: '{wallet_type}'") - if self.receive_qr_data: - print(f"Setting static_receive_code: {self.receive_qr_data}") - self.receive_qr.update(self.receive_qr_data, len(self.receive_qr_data)) - can_check_network = True - try: - import network - except Exception as e: - can_check_network = False - if can_check_network and not network.WLAN(network.STA_IF).isconnected(): - self.payments_label.set_text(f"WiFi is not connected, can't\ntalk to {wallet_type} backend.") - else: - if self.wallet: - self.payments_label.set_text(f"Connecting to {wallet_type} backend...") - self.wallet.start(self.redraw_balance_cb, self.redraw_payments_cb) - else: - self.payments_label.set_text(f"Could not start {wallet_type} backend.") - - def onStop(self, main_screen): - if self.wallet and self.destination != FullscreenQR: - self.wallet.stop() - self.destination = None - - def redraw_balance_cb(self): - # this gets called from another thread (the wallet) so make sure it happens in the LVGL thread using lv.async_call(): - lv.async_call(lambda l: self.balance_label.set_text(str(self.wallet.last_known_balance)), None) - - def redraw_payments_cb(self): - # this gets called from another thread (the wallet) so make sure it happens in the LVGL thread using lv.async_call(): - lv.async_call(lambda l: self.payments_label.set_text(str(self.wallet.payment_list)), None) - - def settings_button_tap(self, event): - self.startActivity(Intent(activity_class=SettingsActivity)) - - def main_ui_set_defaults(self): - self.balance_label.set_text(lv.SYMBOL.REFRESH) - self.payments_label.set_text(lv.SYMBOL.REFRESH) - self.receive_qr.update("EMPTY PLACEHOLDER", len("EMPTY PLACEHOLDER")) - - def qr_clicked_cb(self, event): - print("QR clicked") - if not self.receive_qr_data: - return - self.destination = FullscreenQR - self.startActivity(Intent(activity_class=FullscreenQR).putExtra("receive_qr_data", self.receive_qr_data)) - -# Used to list and edit all settings: -class SettingsActivity(Activity): - def __init__(self): - super().__init__() - self.prefs = mpos.config.SharedPreferences("com.lightningpiggy.displaywallet") - self.settings = [ - {"title": "Wallet Type", "key": "wallet_type", "value_label": None, "cont": None}, - {"title": "LNBits URL", "key": "lnbits_url", "value_label": None, "cont": None}, - {"title": "LNBits Read Key", "key": "lnbits_readkey", "value_label": None, "cont": None}, - {"title": "Static receive code", "key": "lnbits_static_receive_code", "value_label": None, "cont": None}, - {"title": "NWC URL", "key": "nwc_url", "value_label": None, "cont": None}, - ] - self.keyboard = None - self.textarea = None - self.radio_container = None - self.active_radio_index = 0 # Track active radio button index - - def onCreate(self): - screen = lv.obj() - print("creating SettingsActivity ui...") - screen.set_size(lv.pct(100), lv.pct(100)) - screen.set_style_pad_all(10, 0) - screen.set_flex_flow(lv.FLEX_FLOW.COLUMN) - screen.set_style_border_width(0, 0) - - # Create settings entries - for setting in self.settings: - # Container for each setting - setting_cont = lv.obj(screen) - setting_cont.set_width(lv.pct(100)) - setting_cont.set_height(lv.SIZE_CONTENT) - setting_cont.set_style_border_width(1, 0) - setting_cont.set_style_border_side(lv.BORDER_SIDE.BOTTOM, 0) - setting_cont.set_style_pad_all(8, 0) - setting_cont.add_flag(lv.obj.FLAG.CLICKABLE) - setting["cont"] = setting_cont # Store container reference for visibility control - - # Title label (bold, larger) - title = lv.label(setting_cont) - title.set_text(setting["title"]) - title.set_style_text_font(lv.font_montserrat_16, 0) - title.set_pos(0, 0) - - # Value label (smaller, below title) - value = lv.label(setting_cont) - value.set_text(self.prefs.get_string(setting["key"], "Not set")) - value.set_style_text_font(lv.font_montserrat_12, 0) - value.set_style_text_color(lv.color_hex(0x666666), 0) - value.set_pos(0, 20) - setting["value_label"] = value # Store reference for updating - setting_cont.add_event_cb( - lambda e, s=setting: self.startSettingActivity(s), lv.EVENT.CLICKED, None - ) - self.setContentView(screen) - - def onResume(self, screen): - wallet_type = self.prefs.get_string("wallet_type", "lnbits") - # update setting visibility based on wallet_type: - for setting in self.settings: - if setting["key"].startswith("lnbits_"): - if wallet_type != "lnbits": - setting["cont"].add_flag(lv.obj.FLAG.HIDDEN) - else: - setting["cont"].remove_flag(lv.obj.FLAG.HIDDEN) - elif setting["key"].startswith("nwc_"): - if wallet_type != "nwc": - setting["cont"].add_flag(lv.obj.FLAG.HIDDEN) - else: - setting["cont"].remove_flag(lv.obj.FLAG.HIDDEN) - - def startSettingActivity(self, setting): - intent = Intent(activity_class=SettingActivity) - intent.putExtra("setting", setting) - self.startActivity(intent) - -# Used to edit one setting: -class SettingActivity(Activity): - def __init__(self): - super().__init__() - self.prefs = mpos.config.SharedPreferences("com.lightningpiggy.displaywallet") - self.setting = None - - def onCreate(self): - setting = self.getIntent().extras.get("setting") - settings_screen_detail = lv.obj() - settings_screen_detail.set_style_pad_all(10, 0) - settings_screen_detail.set_flex_flow(lv.FLEX_FLOW.COLUMN) - - top_cont = lv.obj(settings_screen_detail) - top_cont.set_width(lv.pct(100)) - top_cont.set_height(lv.SIZE_CONTENT) - top_cont.set_style_pad_all(0, 0) - top_cont.set_flex_flow(lv.FLEX_FLOW.ROW) - top_cont.set_style_flex_main_place(lv.FLEX_ALIGN.SPACE_BETWEEN, 0) - - setting_label = lv.label(top_cont) - setting_label.set_text(setting["title"]) - setting_label.align(lv.ALIGN.TOP_LEFT,0,0) - setting_label.set_style_text_font(lv.font_montserrat_22, 0) - - # Camera for text - cambutton = lv.button(top_cont) - cambutton.align(lv.ALIGN.TOP_RIGHT,0,0) - cambuttonlabel = lv.label(cambutton) - cambuttonlabel.set_text("SCAN QR") - cambuttonlabel.center() - cambutton.add_event_cb(self.cambutton_cb, lv.EVENT.CLICKED, None) - - if setting["key"] == "wallet_type": - cambutton.add_flag(lv.obj.FLAG.HIDDEN) - # Create container for radio buttons - self.radio_container = lv.obj(settings_screen_detail) - self.radio_container.set_width(lv.pct(100)) - self.radio_container.set_height(lv.SIZE_CONTENT) - self.radio_container.set_flex_flow(lv.FLEX_FLOW.COLUMN) - self.radio_container.add_event_cb(self.radio_event_handler, lv.EVENT.CLICKED, None) - - # Create radio buttons - options = [("LNBits", "lnbits"), ("Nostr Wallet Connect", "nwc")] - current_wallet = self.prefs.get_string("wallet_type", "lnbits") - self.active_radio_index = 0 if current_wallet == "lnbits" else 1 - - for i, (text, _) in enumerate(options): - cb = self.create_radio_button(self.radio_container, text, i) - if i == self.active_radio_index: - cb.add_state(lv.STATE.CHECKED) - else: - # Textarea for other settings - self.textarea = lv.textarea(settings_screen_detail) - self.textarea.set_width(lv.pct(100)) - self.textarea.set_height(lv.SIZE_CONTENT) - self.textarea.set_text(self.prefs.get_string(setting["key"], "")) - self.textarea.add_event_cb(self.show_keyboard, lv.EVENT.CLICKED, None) - self.textarea.add_event_cb(self.show_keyboard, lv.EVENT.FOCUSED, None) - self.textarea.add_event_cb(self.hide_keyboard, lv.EVENT.DEFOCUSED, None) - # Initialize keyboard (hidden initially) - self.keyboard = lv.keyboard(lv.layer_sys()) - self.keyboard.set_size(lv.pct(100), lv.pct(40)) - self.keyboard.align(lv.ALIGN.BOTTOM_MID, 0, 0) - self.keyboard.add_flag(lv.obj.FLAG.HIDDEN) - self.keyboard.add_event_cb(self.keyboard_cb, lv.EVENT.READY, None) - self.keyboard.add_event_cb(self.keyboard_cb, lv.EVENT.CANCEL, None) - self.keyboard.set_textarea(self.textarea) - - # Button container - btn_cont = lv.obj(settings_screen_detail) - btn_cont.set_width(lv.pct(100)) - btn_cont.set_height(lv.SIZE_CONTENT) - btn_cont.set_style_pad_all(5, 0) - btn_cont.set_flex_flow(lv.FLEX_FLOW.ROW) - btn_cont.set_style_flex_main_place(lv.FLEX_ALIGN.SPACE_BETWEEN, 0) - # Save button - save_btn = lv.button(btn_cont) - save_btn.set_size(lv.pct(45), lv.SIZE_CONTENT) - save_label = lv.label(save_btn) - save_label.set_text("Save") - save_label.center() - save_btn.add_event_cb(lambda e, s=setting: self.save_setting(s), lv.EVENT.CLICKED, None) - # Cancel button - cancel_btn = lv.button(btn_cont) - cancel_btn.set_size(lv.pct(45), lv.SIZE_CONTENT) - cancel_label = lv.label(cancel_btn) - cancel_label.set_text("Cancel") - cancel_label.center() - cancel_btn.add_event_cb(lambda e: self.finish(), lv.EVENT.CLICKED, None) - self.setContentView(settings_screen_detail) - - def hide_keyboard(self, event=None): - print("hide_keyboard: hiding keyboard") - self.keyboard.add_flag(lv.obj.FLAG.HIDDEN) - - def show_keyboard(self, event): - print("showing keyboard") - self.keyboard.remove_flag(lv.obj.FLAG.HIDDEN) - - def keyboard_cb(self, event=None): - print("keyboard_cb: Keyboard event triggered") - code = event.get_code() - if code == lv.EVENT.READY or code == lv.EVENT.CANCEL: - print("keyboard_cb: READY or CANCEL or RETURN clicked, hiding keyboard") - self.hide_keyboard() - - def radio_event_handler(self, event): - old_cb = self.radio_container.get_child(self.active_radio_index) - old_cb.remove_state(lv.STATE.CHECKED) - self.active_radio_index = -1 - for childnr in range(self.radio_container.get_child_count()): - child = self.radio_container.get_child(childnr) - state = child.get_state() - print(f"radio_container child's state: {state}") - if state != lv.STATE.DEFAULT: # State can be something like 19 = lv.STATE.HOVERED & lv.STATE.CHECKED & lv.STATE.FOCUSED - self.active_radio_index = childnr - break - print(f"active_radio_index is now {self.active_radio_index}") - - def create_radio_button(self, parent, text, index): - cb = lv.checkbox(parent) - cb.set_text(text) - cb.add_flag(lv.obj.FLAG.EVENT_BUBBLE) - # Add circular style to indicator for radio button appearance - style_radio = lv.style_t() - style_radio.init() - style_radio.set_radius(lv.RADIUS_CIRCLE) - cb.add_style(style_radio, lv.PART.INDICATOR) - style_radio_chk = lv.style_t() - style_radio_chk.init() - style_radio_chk.set_bg_image_src(None) - cb.add_style(style_radio_chk, lv.PART.INDICATOR | lv.STATE.CHECKED) - return cb - - def gotqr_result_callback(self, result): - print(f"QR capture finished, result: {result}") - if result.get("result_code"): - data = result.get("data") - print(f"Setting textarea data: {data}") - self.textarea.set_text(data) - - def cambutton_cb(self, event): - print("cambutton clicked!") - self.startActivityForResult(Intent(activity_class=Camera).putExtra("scanqr_mode", True), self.gotqr_result_callback) - - def save_setting(self, setting): - if setting["key"] == "wallet_type" and self.radio_container: - selected_idx = self.active_radio_index - new_value = "lnbits" if selected_idx == 0 else "nwc" - elif self.textarea: - new_value = self.textarea.get_text() - else: - new_value = "" - editor = self.prefs.edit() - editor.put_string(setting["key"], new_value) - editor.commit() - setting["value_label"].set_text(new_value if new_value else "Not set") - self.finish() - -class FullscreenQR(Activity): - # No __init__() so super.__init__() will be called automatically - - def onCreate(self): - receive_qr_data = self.getIntent().extras.get("receive_qr_data") - qr_screen = lv.obj() - big_receive_qr = lv.qrcode(qr_screen) - big_receive_qr.set_size(240) # TODO: make this dynamic - big_receive_qr.set_dark_color(lv.color_black()) - big_receive_qr.set_light_color(lv.color_white()) - big_receive_qr.center() - big_receive_qr.set_style_border_color(lv.color_white(), 0) - big_receive_qr.set_style_border_width(3, 0); - big_receive_qr.update(receive_qr_data, len(receive_qr_data)) - self.setContentView(qr_screen) diff --git a/internal_filesystem/apps/com.lightningpiggy.displaywallet/assets/wallet.py b/internal_filesystem/apps/com.lightningpiggy.displaywallet/assets/wallet.py deleted file mode 100644 index a6b68ddb..00000000 --- a/internal_filesystem/apps/com.lightningpiggy.displaywallet/assets/wallet.py +++ /dev/null @@ -1,501 +0,0 @@ -import _thread -import requests -import json -import ssl -import time - -from nostr.relay_manager import RelayManager -from nostr.message_type import ClientMessageType -from nostr.filter import Filter, Filters -from nostr.event import EncryptedDirectMessage -from nostr.key import PrivateKey - -from websocket import WebSocketApp - -import mpos.apps -import mpos.time -import mpos.util - -# keeps a list of items -# The .add() method ensures the list remains unique (via __eq__) -# and sorted (via __lt__) by inserting new items in the correct position. -class UniqueSortedList: - def __init__(self): - self._items = [] - - def add(self, item): - #print(f"before add: {str(self)}") - # Check if item already exists (using __eq__) - if item not in self._items: - # Insert item in sorted position for descending order (using __gt__) - for i, existing_item in enumerate(self._items): - if item > existing_item: - self._items.insert(i, item) - return - # If item is smaller than all existing items, append it - self._items.append(item) - #print(f"after add: {str(self)}") - - def __iter__(self): - # Return iterator for the internal list - return iter(self._items) - - def get(self, index_nr): - # Retrieve item at given index, raise IndexError if invalid - try: - return self._items[index_nr] - except IndexError: - raise IndexError("Index out of range") - - def __len__(self): - # Return the number of items for len() calls - return len(self._items) - - def __str__(self): - print("UniqueSortedList tostring called") - return "\n".join(str(item) for item in self._items) - - def __eq__(self, other): - if len(self._items) != len(other): - return False - return all(p1 == p2 for p1, p2 in zip(self._items, other)) - -# Payment class remains unchanged -class Payment: - def __init__(self, epoch_time, amount_sats, comment): - self.epoch_time = epoch_time - self.amount_sats = amount_sats - self.comment = comment - - def __str__(self): - sattext = "sats" - if self.amount_sats == 1: - sattext = "sat" - #return f"{self.amount_sats} {sattext} @ {self.epoch_time}: {self.comment}" - return f"{self.amount_sats} {sattext}: {self.comment}" - - def __eq__(self, other): - if not isinstance(other, Payment): - return False - return self.epoch_time == other.epoch_time and self.amount_sats == other.amount_sats and self.comment == other.comment - - def __lt__(self, other): - if not isinstance(other, Payment): - return NotImplemented - return (self.epoch_time, self.amount_sats, self.comment) < (other.epoch_time, other.amount_sats, other.comment) - - def __le__(self, other): - if not isinstance(other, Payment): - return NotImplemented - return (self.epoch_time, self.amount_sats, self.comment) <= (other.epoch_time, other.amount_sats, other.comment) - - def __gt__(self, other): - if not isinstance(other, Payment): - return NotImplemented - return (self.epoch_time, self.amount_sats, self.comment) > (other.epoch_time, other.amount_sats, other.comment) - - def __ge__(self, other): - if not isinstance(other, Payment): - return NotImplemented - return (self.epoch_time, self.amount_sats, self.comment) >= (other.epoch_time, other.amount_sats, other.comment) - -class Wallet: - - # These values could be loading from a cache.json file at __init__ - last_known_balance = -1 - #last_known_balance_timestamp = 0 - payment_list = None - - def __init__(self): - self.keep_running = True - self.payment_list = UniqueSortedList() - - def __str__(self): - if isinstance(self, LNBitsWallet): - return "LNBitsWallet" - elif isinstance(self, NWCWallet): - return "NWCWallet" - - def handle_new_balance(self, new_balance, fetchPaymentsIfChanged=True): - if not self.keep_running: - return - if new_balance != self.last_known_balance: - print("Balance changed!") - self.last_known_balance = new_balance - print("Calling balance_updated_cb") - self.balance_updated_cb() - if fetchPaymentsIfChanged: # Fetching *all* payments isn't necessary if balance was changed by a payment notification - print("Refreshing payments...") - self.fetch_payments() # if the balance changed, then re-list transactions - - def handle_new_payment(self, new_payment): - if not self.keep_running: - return - print("handle_new_payment") - self.payment_list.add(new_payment) - self.payments_updated_cb() - - def handle_new_payments(self, new_payments): - if not self.keep_running: - return - print("handle_new_payments") - if self.payment_list != new_payments: - print("new list of payments") - self.payment_list = new_payments - self.payments_updated_cb() - - # Need callbacks for: - # - started (so the user can show the UI) - # - stopped (so the user can delete/free it) - # - error (so the user can show the error) - # - balance - # - transactions - def start(self, balance_updated_cb, payments_updated_cb): - self.keep_running = True - self.balance_updated_cb = balance_updated_cb - self.payments_updated_cb = payments_updated_cb - _thread.stack_size(mpos.apps.good_stack_size()) - _thread.start_new_thread(self.wallet_manager_thread, ()) - - def stop(self): - self.keep_running = False - - def is_running(self): - return self.keep_running - -class LNBitsWallet(Wallet): - - def __init__(self, lnbits_url, lnbits_readkey): - super().__init__() - self.lnbits_url = lnbits_url - self.lnbits_readkey = lnbits_readkey - self.ws = None - - - def parseLNBitsPayment(self, transaction): - amount = transaction["amount"] - amount = round(amount / 1000) - comment = transaction["memo"] - epoch_time = transaction["time"] - try: - extra = transaction.get("extra") - if extra: - comment = extra.get("comment") - first_from_list = comment.get(0) # some LNBits 0.x versions return a list instead of a string here... - comment = first_from_list - except Exception as e: - pass - return Payment(epoch_time, amount, comment) - - # Example data: {"wallet_balance": 4936, "payment": {"checking_id": "037c14...56b3", "pending": false, "amount": 1000000, "fee": 0, "memo": "zap2oink", "time": 1711226003, "bolt11": "lnbc10u1pjl70y....qq9renr", "preimage": "0000...000", "payment_hash": "037c1438b20ef4729b1d3dc252c2809dc2a2a2e641c7fb99fe4324e182f356b3", "expiry": 1711226603.0, "extra": {"tag": "lnurlp", "link": "TkjgaB", "extra": "1000000", "comment": ["yes"], "lnaddress": "oink@demo.lnpiggy.com"}, "wallet_id": "c9168...8de4", "webhook": null, "webhook_status": null}} - def on_message(self, class_obj, message: str): - print(f"relay.py _on_message received: {message}") - try: - payment_notification = json.loads(message) - new_balance = payment_notification.get("wallet_balance") - if new_balance: - self.handle_new_balance(new_balance, False) # handle new balance BUT don't trigger a full fetch_payments - transaction = payment_notification.get("payment") - print(f"Got transaction: {transaction}") - paymentObj = self.parseLNBitsPayment(transaction) - self.handle_new_payment(paymentObj) - except Exception as e: - print(f"websocket on_message got exception: {e}") - - def websocket_thread(self): - if not self.keep_running: - return - print("Opening websocket for payment notifications...") - wsurl = self.lnbits_url + "/api/v1/ws/" + self.lnbits_readkey - wsurl = wsurl.replace("https://", "wss://") - wsurl = wsurl.replace("http://", "ws://") - self.ws = WebSocketApp( - wsurl, - on_message=self.on_message, - ) # maybe add other callbacks to reconnect when disconnected etc. - self.ws.run_forever() - - - def wallet_manager_thread(self): - print("wallet_manager_thread") - websocket_running = False - while self.keep_running: - try: - new_balance = self.fetch_balance() # TODO: only do this every 60 seconds, but loop the main thread more frequently - except Exception as e: - print(f"WARNING: wallet_manager_thread got exception {e}, ignorning.") - if not websocket_running and self.keep_running: # after - websocket_running = True - _thread.stack_size(mpos.apps.good_stack_size()) - _thread.start_new_thread(self.websocket_thread, ()) - print("Sleeping a while before re-fetching balance...") - for _ in range(120): - time.sleep(0.5) - if not self.keep_running: - break - print("wallet_manager_thread stopping") - if self.ws: - self.ws.close() - - def fetch_balance(self): - walleturl = self.lnbits_url + "/api/v1/wallet" - headers = { - "X-Api-Key": self.lnbits_readkey, - } - try: - print(f"Fetching balance with GET to {walleturl}") - response = requests.get(walleturl, timeout=10, headers=headers) - except Exception as e: - print("fetch_balance: get request failed:", e) - if response and response.status_code == 200 and self.keep_running: - response_text = response.text - print(f"Got response text: {response_text}") - response.close() - try: - balance_reply = json.loads(response_text) - print(f"Got balance: {balance_reply['balance']}") - balance_msat = balance_reply['balance'] - new_balance = round(int(balance_msat) / 1000) - self.handle_new_balance(new_balance) - except Exception as e: - print(f"Could not parse reponse text '{response_text}' as JSON: {e}") - raise e - - def fetch_payments(self): - paymentsurl = self.lnbits_url + "/api/v1/payments?limit=6" - headers = { - "X-Api-Key": self.lnbits_readkey, - } - try: - print(f"Fetching payments with GET to {paymentsurl}") - response = requests.get(paymentsurl, timeout=10, headers=headers) - except Exception as e: - print("fetch_payments: get request failed:", e) - if response and response.status_code == 200 and self.keep_running: - response_text = response.text - print(f"Got response text: {response_text}") - response.close() - try: - payments_reply = json.loads(response_text) - print(f"Got payments: {payments_reply}") - for transaction in payments_reply: - print(f"Got transaction: {transaction}") - paymentObj = self.parseLNBitsPayment(transaction) - self.handle_new_payment(paymentObj) - except Exception as e: - print(f"Could not parse reponse text '{response_text}' as JSON: {e}") - raise e - -class NWCWallet(Wallet): - - def __init__(self, nwc_url): - super().__init__() - self.nwc_url = nwc_url - self.connected = False - self.relay, self.wallet_pubkey, self.secret, self.lud16 = self.parse_nwc_url(self.nwc_url) - - def getCommentFromTransaction(self, transaction): - comment = "" - try: - comment = transaction["description"] - json_comment = json.loads(comment) - for field in json_comment: - if field[0] == "text/plain": - comment = field[1] - break - else: - print("text/plain field is missing from JSON description") - except Exception as e: - print(f"Info: could not parse comment as JSON, using as-is: {e}") - return comment - - def wallet_manager_thread(self): - self.private_key = PrivateKey(bytes.fromhex(self.secret)) - self.relay_manager = RelayManager() - self.relay_manager.add_relay(self.relay) - - print(f"DEBUG: Opening relay connections") - self.relay_manager.open_connections({"cert_reqs": ssl.CERT_NONE}) - self.connected = False - for _ in range(20): - time.sleep(0.5) - if self.relay_manager.relays[self.relay].connected is True: - self.connected = True - break - elif not self.keep_running: - break - print("Waiting for relay connection...") - if not self.connected or not self.keep_running: - print(f"ERROR: could not connect to NWC relay {self.relay} or not self.keep_running, aborting...") - # TODO: call an error callback to notify the user - return - - # Set up subscription to receive response - self.subscription_id = "micropython_nwc_" + str(round(time.time())) - print(f"DEBUG: Setting up subscription with ID: {self.subscription_id}") - self.filters = Filters([Filter( - kinds=[23195, 23196], # NWC reponses and notifications - authors=[self.wallet_pubkey], - pubkey_refs=[self.private_key.public_key.hex()] - )]) - print(f"DEBUG: Subscription filters: {self.filters.to_json_array()}") - self.relay_manager.add_subscription(self.subscription_id, self.filters) - print(f"DEBUG: Publishing subscription request") - request_message = [ClientMessageType.REQUEST, self.subscription_id] - request_message.extend(self.filters.to_json_array()) - self.relay_manager.publish_message(json.dumps(request_message)) - for _ in range(10): - if not self.keep_running: - return - time.sleep(0.5) - - self.fetch_balance() - - print(f"DEBUG: Waiting for incoming NWC events...") - while self.keep_running: - if self.relay_manager.message_pool.has_events(): - print(f"DEBUG: Event received from message pool") - event_msg = self.relay_manager.message_pool.get_event() - event_created_at = event_msg.event.created_at - print(f"Received at {time.localtime()} a message with timestamp {event_created_at}") - try: - decrypted_content = self.private_key.decrypt_message( - event_msg.event.content, - event_msg.event.public_key - ) - print(f"DEBUG: Decrypted content: {decrypted_content}") - response = json.loads(decrypted_content) - print(f"DEBUG: Parsed response: {response}") - result = response.get("result") - if result: - if result.get("balance"): - new_balance = round(int(result["balance"]) / 1000) - print(f"Got balance: {new_balance}") - self.handle_new_balance(new_balance) - elif result.get("transactions"): - print("Response contains transactions!") - for transaction in result["transactions"]: - amount = transaction["amount"] - amount = round(amount / 1000) - comment = self.getCommentFromTransaction(transaction) - epoch_time = transaction["created_at"] - paymentObj = Payment(epoch_time, amount, comment) - self.handle_new_payment(paymentObj) - else: - notification = response.get("notification") - if notification: - amount = notification["amount"] - amount = round(amount / 1000) - type = notification["type"] - if type == "outgoing": - amount = -amount - elif type != "incoming": - print(f"WARNING: invalid notification type {type}, ignoring.") - continue - new_balance = self.last_known_balance + amount - self.handle_new_balance(new_balance, False) - epoch_time = notification["created_at"] - comment = self.getCommentFromTransaction(notification) - paymentObj = Payment(epoch_time, amount, comment) - self.handle_new_payment(paymentObj) - else: - print("Unsupported response, ignoring.") - except Exception as e: - print(f"DEBUG: Error processing response: {e}") - time.sleep(0.2) - - print("NWCWallet: manage_wallet_thread stopping, closing connections...") - self.relay_manager.close_connections() - - def fetch_balance(self): - if not self.keep_running: - return - # Create get_balance request - balance_request = { - "method": "get_balance", - "params": {} - } - print(f"DEBUG: Created balance request: {balance_request}") - print(f"DEBUG: Creating encrypted DM to wallet pubkey: {self.wallet_pubkey}") - dm = EncryptedDirectMessage( - recipient_pubkey=self.wallet_pubkey, - cleartext_content=json.dumps(balance_request) - ) - print(f"DEBUG: Signing DM {json.dumps(dm)} with private key") - self.private_key.sign_event(dm) # sign also does encryption if it's a encrypted dm - print(f"DEBUG: Publishing encrypted DM") - self.relay_manager.publish_event(dm) - - def fetch_payments(self): - if not self.keep_running: - return - # Create get_balance request - list_transactions = { - "method": "list_transactions", - "params": { - "limit": 6 - } - } - dm = EncryptedDirectMessage( - recipient_pubkey=self.wallet_pubkey, - cleartext_content=json.dumps(list_transactions) - #cleartext_content='{"params":{"limit": 4 },"method":"list_transactions"}' - ) - self.private_key.sign_event(dm) # sign also does encryption if it's a encrypted dm - print("Publishing DM to fetch payments...") - self.relay_manager.publish_event(dm) - - def parse_nwc_url(self, nwc_url): - """Parse Nostr Wallet Connect URL to extract pubkey, relay, secret, and lud16.""" - print(f"DEBUG: Starting to parse NWC URL: {nwc_url}") - try: - # Remove 'nostr+walletconnect://' or 'nwc:' prefix - if nwc_url.startswith('nostr+walletconnect://'): - print(f"DEBUG: Removing 'nostr+walletconnect://' prefix") - nwc_url = nwc_url[22:] - elif nwc_url.startswith('nwc:'): - print(f"DEBUG: Removing 'nwc:' prefix") - nwc_url = nwc_url[4:] - else: - print(f"DEBUG: No recognized prefix found in URL") - raise ValueError("Invalid NWC URL: missing 'nostr+walletconnect://' or 'nwc:' prefix") - print(f"DEBUG: URL after prefix removal: {nwc_url}") - # urldecode because the relay might have %3A%2F%2F etc - nwc_url = mpos.util.urldecode(nwc_url) - print(f"after urldecode: {nwc_url}") - # Split into pubkey and query params - parts = nwc_url.split('?') - pubkey = parts[0] - print(f"DEBUG: Extracted pubkey: {pubkey}") - # Validate pubkey (should be 64 hex characters) - if len(pubkey) != 64 or not all(c in '0123456789abcdef' for c in pubkey): - raise ValueError("Invalid NWC URL: pubkey must be 64 hex characters") - # Extract relay, secret, and lud16 from query params - relay = None - secret = None - lud16 = None - if len(parts) > 1: - print(f"DEBUG: Query parameters found: {parts[1]}") - params = parts[1].split('&') - for param in params: - if param.startswith('relay='): - relay = param[6:] - print(f"DEBUG: Extracted relay: {relay}") - elif param.startswith('secret='): - secret = param[7:] - print(f"DEBUG: Extracted secret: {secret}") - elif param.startswith('lud16='): - lud16 = param[6:] - print(f"DEBUG: Extracted lud16: {lud16}") - else: - print(f"DEBUG: No query parameters found") - if not pubkey or not relay or not secret: - raise ValueError("Invalid NWC URL: missing required fields (pubkey, relay, or secret)") - # Validate secret (should be 64 hex characters) - if len(secret) != 64 or not all(c in '0123456789abcdef' for c in secret): - raise ValueError("Invalid NWC URL: secret must be 64 hex characters") - print(f"DEBUG: Parsed NWC data - Relay: {relay}, Pubkey: {pubkey}, Secret: {secret}, lud16: {lud16}") - return relay, pubkey, secret, lud16 - except Exception as e: - print(f"DEBUG: Error parsing NWC URL: {e}") - diff --git a/internal_filesystem/apps/com.lightningpiggy.displaywallet/res/mipmap-mdpi/icon_64x64.png b/internal_filesystem/apps/com.lightningpiggy.displaywallet/res/mipmap-mdpi/icon_64x64.png deleted file mode 100644 index c0871732..00000000 Binary files a/internal_filesystem/apps/com.lightningpiggy.displaywallet/res/mipmap-mdpi/icon_64x64.png and /dev/null differ