diff --git a/draft_code/aiohttp_compiled_mip_installed_but_missing_bugfix_of_chunked_reading/__init__.mpy b/draft_code/aiohttp_compiled_mip_installed_but_missing_bugfix_of_chunked_reading/__init__.mpy new file mode 100644 index 00000000..524ce853 Binary files /dev/null and b/draft_code/aiohttp_compiled_mip_installed_but_missing_bugfix_of_chunked_reading/__init__.mpy differ diff --git a/draft_code/aiohttp_compiled_mip_installed_but_missing_bugfix_of_chunked_reading/aiohttp_ws.mpy b/draft_code/aiohttp_compiled_mip_installed_but_missing_bugfix_of_chunked_reading/aiohttp_ws.mpy new file mode 100644 index 00000000..37f3b61d Binary files /dev/null and b/draft_code/aiohttp_compiled_mip_installed_but_missing_bugfix_of_chunked_reading/aiohttp_ws.mpy differ diff --git a/draft_code/apps.py_dont_compile_mpy_dontwork b/draft_code/apps.py_dont_compile_mpy_dontwork new file mode 100644 index 00000000..c9a55a05 --- /dev/null +++ b/draft_code/apps.py_dont_compile_mpy_dontwork @@ -0,0 +1,449 @@ +import lvgl as lv + +import uio +import ujson +import uos + +import _thread +import traceback + +import mpos.info +import mpos.ui + +def good_stack_size(): + stacksize = 24*1024 + import sys + if sys.platform == "esp32": + stacksize = 16*1024 + return stacksize + +# Run the script in the current thread: +def execute_script(script_source, is_file, cwd=None, classname=None): + thread_id = _thread.get_ident() + compile_name = 'script' if not is_file else script_source + print(f"Thread {thread_id}: executing script with cwd: {cwd}") + + script_globals = {'lv': lv, '__name__': "__main__"} + import sys + import uos + import utime + + path_before = sys.path + if cwd: + sys.path.append(cwd) + + try: + if is_file: + mpy_file = script_source.rsplit('.py', 1)[0] + '.mpy' if '.py' in script_source else script_source + '.mpy' + try: + uos.stat(mpy_file) + source_file = mpy_file + except OSError: + source_file = script_source + mode = 'rb' if source_file.endswith('.mpy') else 'r' + print(f"Thread {thread_id}: reading {'bytecode' if mode == 'rb' else 'script'} from {source_file}") + + start_time = utime.ticks_ms() + f = open(source_file, mode) + try: + script_source = f.read() + finally: + f.close() + read_time = utime.ticks_diff(utime.ticks_ms(), start_time) + print(f"Thread {thread_id}: file read took {read_time} ms") + + try: + start_time = utime.ticks_ms() + compiled_script = script_source if isinstance(script_source, bytes) else compile(script_source, compile_name, 'exec') + compile_time = utime.ticks_diff(utime.ticks_ms(), start_time) + if not isinstance(script_source, bytes): + print(f"Thread {thread_id}: compilation took {compile_time} ms") + + exec(compiled_script, script_globals) + if classname: + main_activity = script_globals.get(classname) + if main_activity: + Activity.startActivity(None, Intent(activity_class=main_activity)) + else: + print("Warning: could not find main_activity") + except Exception as e: + print(f"Thread {thread_id}: exception during execution:") + traceback.print_exception(type(e), e, getattr(e, '__traceback__', None)) + print(f"Thread {thread_id}: script {compile_name} finished") + except Exception as e: + print(f"Thread {thread_id}: error:") + traceback.print_exception(type(e), e, getattr(e, '__traceback__', None)) + sys.path = path_before + +# Run the script in a new thread: +# TODO: check if the script exists here instead of launching a new thread? +def execute_script_new_thread(scriptname, is_file): + print(f"main.py: execute_script_new_thread({scriptname},{is_file})") + try: + # 168KB maximum at startup but 136KB after loading display, drivers, LVGL gui etc so let's go for 128KB for now, still a lot... + # But then no additional threads can be created. A stacksize of 32KB allows for 4 threads, so 3 in the app itself, which might be tight. + # 16KB allows for 10 threads in the apps, but seems too tight for urequests on unix (desktop) targets + # 32KB seems better for the camera, but it forced me to lower other app threads from 16 to 12KB + #_thread.stack_size(24576) # causes camera issue... + # NOTE: This doesn't do anything if apps are started in the same thread! + if "camtest" in scriptname: + print("Starting camtest with extra stack size!") + stack=32*1024 + elif "appstore"in scriptname: + print("Starting appstore with extra stack size!") + stack=24*1024 # this doesn't do anything because it's all started in the same thread + else: + stack=16*1024 # 16KB doesn't seem to be enough for the AppStore app on desktop + stack = mpos.apps.good_stack_size() + print(f"app.py: setting stack size for script to {stack}") + _thread.stack_size(stack) + _thread.start_new_thread(execute_script, (scriptname, is_file)) + except Exception as e: + print("main.py: execute_script_new_thread(): error starting new thread thread: ", e) + +def start_app_by_name(app_name, is_launcher=False): + mpos.ui.set_foreground_app(app_name) + custom_app_dir=f"apps/{app_name}" + builtin_app_dir=f"builtin/apps/{app_name}" + try: + stat = uos.stat(custom_app_dir) + start_app(custom_app_dir, is_launcher) + except OSError: + start_app(builtin_app_dir, is_launcher) + +def start_app(app_dir, is_launcher=False): + print(f"main.py start_app({app_dir},{is_launcher})") + mpos.ui.set_foreground_app(app_dir) # would be better to store only the app name... + manifest_path = f"{app_dir}/META-INF/MANIFEST.JSON" + app = mpos.apps.parse_manifest(manifest_path) + print(f"start_app parsed manifest and got: {str(app)}") + main_launcher_activity = find_main_launcher_activity(app) + if not main_launcher_activity: + print(f"WARNING: can't start {app_dir} because no main_launcher_activity was found.") + return + start_script_fullpath = f"{app_dir}/{main_launcher_activity.get('entrypoint')}" + execute_script(start_script_fullpath, True, app_dir + "/assets/", main_launcher_activity.get("classname")) + # Launchers have the bar, other apps don't have it + if is_launcher: + mpos.ui.open_bar() + else: + mpos.ui.close_bar() + +def restart_launcher(): + mpos.ui.empty_screen_stack() + # No need to stop the other launcher first, because it exits after building the screen + start_app_by_name("com.micropythonos.launcher", True) # Would be better to query the PackageManager for Activities that are launchers + +def find_main_launcher_activity(app): + result = None + for activity in app.activities: + if not activity.get("entrypoint") or not activity.get("classname"): + print(f"Warning: activity {activity} has no entrypoint and classname, skipping...") + continue + print("checking activity's intent_filters...") + for intent_filter in activity.get("intent_filters"): + print("checking intent_filter...") + if intent_filter.get("action") == "main" and intent_filter.get("category") == "launcher": + print("found main_launcher!") + result = activity + break + return result + +def is_launcher(app_name): + print(f"checking is_launcher for {app_name}") + # Simple check, could be more elaborate by checking the MANIFEST.JSON for the app... + return "launcher" in app_name + + +class App: + def __init__(self, name, publisher, short_description, long_description, icon_url, download_url, fullname, version, category, activities): + self.name = name + self.publisher = publisher + self.short_description = short_description + self.long_description = long_description + self.icon_url = icon_url + self.download_url = download_url + self.fullname = fullname + self.version = version + self.category = category + self.image = None + self.image_dsc = None + self.activities = activities + + def __str__(self): + return (f"App(name='{self.name}', " + f"publisher='{self.publisher}', " + f"short_description='{self.short_description}', " + f"version='{self.version}', " + f"category='{self.category}', " + f"activities={self.activities})") + +def parse_manifest(manifest_path): + # Default values for App object + default_app = App( + name="Unknown", + publisher="Unknown", + short_description="", + long_description="", + icon_url="", + download_url="", + fullname="Unknown", + version="0.0.0", + category="", + activities=[] + ) + try: + with open(manifest_path, 'r') as f: + app_info = ujson.load(f) + #print(f"parsed app: {app_info}") + # Create App object with values from manifest, falling back to defaults + return App( + name=app_info.get("name", default_app.name), + publisher=app_info.get("publisher", default_app.publisher), + short_description=app_info.get("short_description", default_app.short_description), + long_description=app_info.get("long_description", default_app.long_description), + icon_url=app_info.get("icon_url", default_app.icon_url), + download_url=app_info.get("download_url", default_app.download_url), + fullname=app_info.get("fullname", default_app.fullname), + version=app_info.get("version", default_app.version), + category=app_info.get("category", default_app.category), + activities=app_info.get("activities", default_app.activities) + ) + except OSError: + print(f"parse_manifest: error loading manifest_path: {manifest_path}") + return default_app + + + +def auto_connect(): + builtin_auto_connect = "builtin/system/WifiService.py" + try: + print(f"Starting {builtin_auto_connect}...") + stat = uos.stat(builtin_auto_connect) + execute_script_new_thread(builtin_auto_connect, True) + except Exception as e: + print("Couldn't execute {builtin_auto_connect} because exception {e}, continuing...") + + +class Activity: + + def __init__(self): + self.intent = None # Store the intent that launched this activity + self.result = None + self._result_callback = None + + def onCreate(self): + pass + def onStart(self, screen): + pass + def onResume(self, screen): + pass + def onPause(self, screen): + pass + def onStop(self, screen): + pass + def onDestroy(self, screen): + pass + + def setContentView(self, screen): + mpos.ui.setContentView(self, screen) + + def startActivity(self, intent): + ActivityNavigator.startActivity(intent) + + def startActivityForResult(self, intent, result_callback): + ActivityNavigator.startActivityForResult(intent, result_callback) + + def initError(self, e): + print(f"WARNING: You might have inherited from Activity with a custom __init__() without calling super().__init__(). Got AttributeError: {e}") + + def getIntent(self): + try: + return self.intent + except AttributeError as e: + self.initError(e) + + def setResult(self, result_code, data=None): + """Set the result to be returned when the activity finishes.""" + try: + self.result = {"result_code": result_code, "data": data or {}} + except AttributeError as e: + self.initError(e) + + def finish(self): + mpos.ui.back_screen() + try: + if self._result_callback and self.result: + self._result_callback(self.result) + self._result_callback = None # Clean up + except AttributeError as e: + self.initError(e) + +class Intent: + def __init__(self, activity_class=None, action=None, data=None, extras=None): + self.activity_class = activity_class # Explicit target (e.g., SettingsActivity) + self.action = action # Action string (e.g., "view", "share") + self.data = data # Single data item (e.g., URL) + self.extras = extras or {} # Dictionary for additional data + self.flags = {} # Simplified flags: {"clear_top": bool, "no_history": bool, "no_animation": bool} + + def addFlag(self, flag, value=True): + self.flags[flag] = value + return self + + def putExtra(self, key, value): + self.extras[key] = value + return self + + +class ActivityNavigator: + @staticmethod + def startActivity(intent): + if not isinstance(intent, Intent): + raise ValueError("Must provide an Intent") + if intent.action: # Implicit intent: resolve handlers + handlers = APP_REGISTRY.get(intent.action, []) + if len(handlers) == 1: + intent.activity_class = handlers[0] + ActivityNavigator._launch_activity(intent) + elif handlers: + ActivityNavigator._show_chooser(intent, handlers) + else: + raise ValueError(f"No handlers for action: {intent.action}") + else: + ActivityNavigator._launch_activity(intent) + + @staticmethod + def startActivityForResult(intent, result_callback): + """Launch an activity and pass a callback for the result.""" + if not isinstance(intent, Intent): + raise ValueError("Must provide an Intent") + if intent.action: # Implicit intent: resolve handlers + handlers = APP_REGISTRY.get(intent.action, []) + if len(handlers) == 1: + intent.activity_class = handlers[0] + return ActivityNavigator._launch_activity(intent, result_callback) + elif handlers: + ActivityNavigator._show_chooser(intent, handlers) + return None # Chooser handles result forwarding + else: + raise ValueError(f"No handlers for action: {intent.action}") + else: + return ActivityNavigator._launch_activity(intent, result_callback) + + @staticmethod + def _launch_activity(intent, result_callback=None): + """Launch an activity and set up result callback.""" + activity = intent.activity_class() + activity.intent = intent + activity._result_callback = result_callback # Pass callback to activity + activity.onCreate() + return activity + + @staticmethod + def _show_chooser(intent, handlers): + chooser_intent = Intent(ChooserActivity, extras={"original_intent": intent, "handlers": [h.__name__ for h in handlers]}) + ActivityNavigator._launch_activity(chooser_intent) + + +class ChooserActivity(Activity): + def __init__(self): + super().__init__() + + def onCreate(self): + screen = lv.obj() + # Get handlers from intent extras + original_intent = self.getIntent().extras.get("original_intent") + handlers = self.getIntent().extras.get("handlers", []) + label = lv.label(screen) + label.set_text("Choose an app") + label.set_pos(10, 10) + + for i, handler_name in enumerate(handlers): + btn = lv.btn(screen) + btn.set_user_data(f"handler_{i}") + btn_label = lv.label(btn) + btn_label.set_text(handler_name) + btn.set_pos(10, 50 * (i + 1) + 10) + btn.add_event_cb(lambda e, h=handler_name, oi=original_intent: self._select_handler(h, oi), lv.EVENT.CLICKED) + self.setContentView(screen) + + def _select_handler(self, handler_name, original_intent): + for handler in APP_REGISTRY.get(original_intent.action, []): + if handler.__name__ == handler_name: + original_intent.activity_class = handler + navigator.startActivity(original_intent) + break + navigator.finish() # Close chooser + + def onStop(self, screen): + if self.getIntent() and self.getIntent().getStringExtra("destination") == "ChooserActivity": + print("Stopped for Chooser") + else: + print("Stopped for other screen") + + +class ViewActivity(Activity): + def __init__(self): + super().__init__() + + def onCreate(self): + screen = lv.obj() + # Get content from intent (prefer extras.url, fallback to data) + content = self.getIntent().extras.get("url", self.getIntent().data or "No content") + label = lv.label(screen) + label.set_user_data("content_label") + label.set_text(f"Viewing: {content}") + label.center() + self.setContentView(screen) + + def onStart(self, screen): + content = self.getIntent().extras.get("url", self.getIntent().data or "No content") + for i in range(screen.get_child_cnt()): + if screen.get_child(i).get_user_data() == "content_label": + screen.get_child(i).set_text(f"Viewing: {content}") + + def onStop(self, screen): + if self.getIntent() and self.getIntent().getStringExtra("destination") == "ViewActivity": + print("Stopped for View") + else: + print("Stopped for other screen") + +class ShareActivity(Activity): + def __init__(self): + super().__init__() + + def onCreate(self): + screen = lv.obj() + # Get text from intent (prefer extras.text, fallback to data) + text = self.getIntent().extras.get("text", self.getIntent().data or "No text") + label = lv.label(screen) + label.set_user_data("share_label") + label.set_text(f"Share: {text}") + label.set_pos(10, 10) + + btn = lv.btn(screen) + btn.set_user_data("share_btn") + btn_label = lv.label(btn) + btn_label.set_text("Share") + btn.set_pos(10, 50) + btn.add_event_cb(lambda e: self._share_content(text), lv.EVENT.CLICKED) + self.setContentView(screen) + + def _share_content(self, text): + # Dispatch to another app (e.g., MessagingActivity) or simulate sharing + print(f"Sharing: {text}") # Placeholder for actual sharing + # Example: Launch another share handler + navigator.startActivity(Intent(action="share", data=text)) + navigator.finish() # Close ShareActivity + + def onStop(self, screen): + if self.getIntent() and self.getIntent().getStringExtra("destination") == "ShareActivity": + print("Stopped for Share") + else: + print("Stopped for other screen") + +APP_REGISTRY = { # This should be handled by a new class PackageManager: + "view": [ViewActivity], # Hypothetical activities + "share": [ShareActivity] +} diff --git a/draft_code/asyncwebsocketclient.py b/draft_code/asyncwebsocketclient.py new file mode 100644 index 00000000..01967d5c --- /dev/null +++ b/draft_code/asyncwebsocketclient.py @@ -0,0 +1,284 @@ +import socket +import asyncio as a +import binascii as b +import random as r +from collections import namedtuple +import re +import struct +import ssl + +# Opcodes +OP_CONT = const(0x0) +OP_TEXT = const(0x1) +OP_BYTES = const(0x2) +OP_CLOSE = const(0x8) +OP_PING = const(0x9) +OP_PONG = const(0xa) + +# Close codes +CLOSE_OK = const(1000) +CLOSE_GOING_AWAY = const(1001) +CLOSE_PROTOCOL_ERROR = const(1002) +CLOSE_DATA_NOT_SUPPORTED = const(1003) +CLOSE_BAD_DATA = const(1007) +CLOSE_POLICY_VIOLATION = const(1008) +CLOSE_TOO_BIG = const(1009) +CLOSE_MISSING_EXTN = const(1010) +CLOSE_BAD_CONDITION = const(1011) + +URL_RE = re.compile(r'(wss|ws)://([A-Za-z0-9-\.]+)(?:\:([0-9]+))?(/.+)?') +URI = namedtuple('URI', ('protocol', 'hostname', 'port', 'path')) + +class AsyncWebsocketClient: + def __init__(self, ms_delay_for_read: int = 5): + self._open = False + self.delay_read = ms_delay_for_read + self._lock_for_open = a.Lock() + self.sock = None + + async def open(self, new_val: bool = None): + await self._lock_for_open.acquire() + if new_val is not None: + if not new_val and self.sock: + self.sock.close() + self.sock = None + self._open = new_val + to_return = self._open + self._lock_for_open.release() + return to_return + + async def close(self): + return await self.open(False) + + def urlparse(self, uri): + """Parse ws or wss:// URLs""" + match = URL_RE.match(uri) + if match: + protocol, host, port, path = match.group(1), match.group(2), match.group(3), match.group(4) + + if protocol not in ['ws', 'wss']: + raise ValueError('Scheme {} is invalid'.format(protocol)) + + if port is None: + port = (80, 443)[protocol == 'wss'] + + return URI(protocol, host, int(port), path) + + async def a_readline(self): + line = None + while line is None: + line = self.sock.readline() + await a.sleep_ms(self.delay_read) + + return line + + async def a_read(self, size: int = None): + if size == 0: + return b'' + chunks = [] + + while True: + b = self.sock.read(size) + await a.sleep_ms(self.delay_read) + + # Continue reading if the socket returns None + if b is None: continue + + # In some cases, the socket will return an empty bytes + # after PING or PONG frames, we need to ignore them. + if len(b) == 0: break + + chunks.append(b) + size -= len(b) + + # After reading the first chunk, we can break if size is None or 0 + if size is None or size == 0: break + + # Join all the chunks and return them + return b''.join(chunks) + + async def handshake(self, uri, headers=[], keyfile=None, certfile=None, cafile=None, cert_reqs=0): + if self.sock: + self.close() + + self.sock = socket.socket() + self.uri = self.urlparse(uri) + ai = socket.getaddrinfo(self.uri.hostname, self.uri.port) + addr = ai[0][4] + + self.sock.connect(addr) + self.sock.setblocking(False) + + if self.uri.protocol == 'wss': + cadata = None + if not cafile is None: + with open(cafile, 'rb') as f: + cadata = f.read() + self.sock = ssl.wrap_socket( + self.sock, server_side=False, + key=keyfile, cert=certfile, + cert_reqs=cert_reqs, # 0 - NONE, 1 - OPTIONAL, 2 - REQUIED + cadata=cadata, + server_hostname=self.uri.hostname + ) + + def send_header(header, *args): + self.sock.write(header % args + '\r\n') + + # Sec-WebSocket-Key is 16 bytes of random base64 encoded + key = b.b2a_base64(bytes(r.getrandbits(8) + for _ in range(16)))[:-1] + + send_header(b'GET %s HTTP/1.1', self.uri.path or '/') + send_header(b'Host: %s:%s', self.uri.hostname, self.uri.port) + send_header(b'Connection: Upgrade') + send_header(b'Upgrade: websocket') + send_header(b'Sec-WebSocket-Key: %s', key) + send_header(b'Sec-WebSocket-Version: 13') + send_header(b'Origin: http://{hostname}:{port}'.format( + hostname=self.uri.hostname, + port=self.uri.port) + ) + + for key, value in headers: + send_header(b'%s: %s', key, value) + + send_header(b'') + + line = await self.a_readline() + header = (line)[:-2] + if not header.startswith(b'HTTP/1.1 101 '): + raise Exception(header) + + # We don't (currently) need these headers + # FIXME: should we check the return key? + while header: + line = await self.a_readline() + header = (line)[:-2] + + return await self.open(True) + + async def read_frame(self, max_size=None): + # Frame header + byte1, byte2 = struct.unpack('!BB', await self.a_read(2)) + + # Byte 1: FIN(1) _(1) _(1) _(1) OPCODE(4) + fin = bool(byte1 & 0x80) + opcode = byte1 & 0x0f + + # Byte 2: MASK(1) LENGTH(7) + mask = bool(byte2 & (1 << 7)) + length = byte2 & 0x7f + + if length == 126: # Magic number, length header is 2 bytes + length, = struct.unpack('!H', await self.a_read(2)) + elif length == 127: # Magic number, length header is 8 bytes + length, = struct.unpack('!Q', await self.a_read(8)) + + if mask: # Mask is 4 bytes + mask_bits = await self.a_read(4) + + try: + data = await self.a_read(length) + except MemoryError: + # We can't receive this many bytes, close the socket + self.close(code=CLOSE_TOO_BIG) + # await self._stream.drain() + return True, OP_CLOSE, None + + if mask: + data = bytes(b ^ mask_bits[i % 4] + for i, b in enumerate(data)) + + return fin, opcode, data + + def write_frame(self, opcode, data=b''): + fin = True + mask = True # messages sent by client are masked + + length = len(data) + + # Frame header + # Byte 1: FIN(1) _(1) _(1) _(1) OPCODE(4) + byte1 = 0x80 if fin else 0 + byte1 |= opcode + + # Byte 2: MASK(1) LENGTH(7) + byte2 = 0x80 if mask else 0 + + if length < 126: # 126 is magic value to use 2-byte length header + byte2 |= length + self.sock.write(struct.pack('!BB', byte1, byte2)) + + elif length < (1 << 16): # Length fits in 2-bytes + byte2 |= 126 # Magic code + self.sock.write(struct.pack('!BBH', byte1, byte2, length)) + + elif length < (1 << 64): + byte2 |= 127 # Magic code + self.sock.write(struct.pack('!BBQ', byte1, byte2, length)) + + else: + raise ValueError() + + if mask: # Mask is 4 bytes + mask_bits = struct.pack('!I', r.getrandbits(32)) + self.sock.write(mask_bits) + data = bytes(b ^ mask_bits[i % 4] + for i, b in enumerate(data)) + + self.sock.write(data) + + async def recv(self): + while await self.open(): + try: + fin, opcode, data = await self.read_frame() + # except (ValueError, EOFError) as ex: + except Exception as ex: + print('Exception in recv while reading frame:', ex) + await self.open(False) + return + + if not fin: + raise NotImplementedError() + + if opcode == OP_TEXT: + return data.decode('utf-8') + elif opcode == OP_BYTES: + return data + elif opcode == OP_CLOSE: + await self.open(False) + return + elif opcode == OP_PONG: + # Ignore this frame, keep waiting for a data frame + continue + elif opcode == OP_PING: + try: + # We need to send a pong frame + self.write_frame(OP_PONG, data) + + # And then continue to wait for a data frame + continue + except Exception as ex: + print('Error sending pong frame:', ex) + # If sending the pong frame fails, close the connection + await self.open(False) + return + elif opcode == OP_CONT: + # This is a continuation of a previous frame + raise NotImplementedError(opcode) + else: + raise ValueError(opcode) + + async def send(self, buf): + if not await self.open(): + return + if isinstance(buf, str): + opcode = OP_TEXT + buf = buf.encode('utf-8') + elif isinstance(buf, bytes): + opcode = OP_BYTES + else: + raise TypeError() + self.write_frame(opcode, buf) + diff --git a/draft_code/dropdown_switch.py b/draft_code/dropdown_switch.py new file mode 100644 index 00000000..d4bfd9b1 --- /dev/null +++ b/draft_code/dropdown_switch.py @@ -0,0 +1,9 @@ +# Create a dropdown +dropdown = lv.dropdown(lv.screen_active()) +dropdown.set_options("Option 1\nOption 2\nOption 3") +dropdown.align(lv.ALIGN.CENTER, 0, 0) + + +switch = lv.switch(lv.screen_active()) +switch.center() + diff --git a/draft_code/image_decoder.py b/draft_code/image_decoder.py new file mode 100644 index 00000000..f8f37584 --- /dev/null +++ b/draft_code/image_decoder.py @@ -0,0 +1,39 @@ + + +import lvgl as lv + +def list_image_decoders(): + # Initialize LVGL + lv.init() + + # Start with the first decoder + first = lv.image_decoder_t() + #decoder = lv.image.decoder_get_next(first) + decoder = lv.image_decoder_t.get_next(first) + index = 0 + + # Iterate through all decoders + while decoder is not None: + print(f"Image Decoder {index}: {decoder}") + index += 1 + #decoder = lv.image.decoder_get_next(decoder) + decoder = lv.image_decoder_t.get_next(decoder) + + if index == 0: + print("No image decoders found.") + else: + print(f"Total image decoders: {index}") + +# Run the function +list_image_decoders() + + +i = lv.image(lv.screen_active()); +#i.set_src("P:/home/user/sources/MicroPythonOS/artwork/image.jpg"); +i.set_src("P:/home/user/sources/MicroPythonOS/artwork/icon_64x64.jpg"); +i.center() +h = lv.image_header_t() +i.decoder_get_info(i.image_dsc, h) +print("image info:") +print(h) +print(f"widthxheight: {h.w}x{h.h}") diff --git a/draft_code/keyboard_test.py b/draft_code/keyboard_test.py new file mode 100644 index 00000000..6a344028 --- /dev/null +++ b/draft_code/keyboard_test.py @@ -0,0 +1,110 @@ +# Hardware initialization for Unix and MacOS systems + +import lcd_bus +import lvgl as lv +import sdl_display +import task_handler + + +# Add lib/ to the path for modules, otherwise it will only search in ~/.micropython/lib and /usr/lib/micropython +import sys +sys.path.append('lib/') + + +import mpos.ui + + +#TFT_HOR_RES=640 +#TFT_VER_RES=480 +TFT_HOR_RES=320 +TFT_VER_RES=240 + +def window_cb(args): # doesn't get called + print(f"Window callback: {args}") + +bus = lcd_bus.SDLBus(flags=0) +bus.register_window_callback(window_cb) + +# bus.set_window_size(320,240,-1,False) # -1 might be 25 but it always becomes black, except for format 0 + +buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0) + +display = sdl_display.SDLDisplay(data_bus=bus,display_width=TFT_HOR_RES,display_height=TFT_VER_RES,frame_buffer1=buf1,color_space=lv.COLOR_FORMAT.RGB565) +display.init() + +import sdl_pointer +mouse = sdl_pointer.SDLPointer() + +import sdl_keyboard +sdlkeyboard = sdl_keyboard.SDLKeyboard() + +#indev.set_read_cb(keypad_cb) + +# seems indev isn't properly initialized +def keypad_cb(indev, indev_data): + global sdlkeyboard + #print(f"keypad_cb {indev} {indev_data}") + #key = indev.get_key() # always 0 + #print(f"key {key}") + #key = indev_data.get("key") + #print(f"key {key}") + pressed, code = sdlkeyboard._get_key() + print(f"periodic pressed: {pressed}, code: {code}") + sdlkeyboard._read(indev, indev_data) + # I mean we could read the key and put it in the textarea but I want some kind of keypress :-/ + +sdlkeyboard._indev_drv.set_read_cb(keypad_cb) # check for escape + +def keyboard_cb(event): + event_code=event.get_code() + print(f"keyboard_test YES: code={event_code}") # target={event.get_target()}, user_data={event.get_user_data()}, param={event.get_param()} + + +def button_cb(event): + event_code=event.get_code() + name = mpos.ui.get_event_name(event_code) + print(f"button_cb YES: code={event_code} and name {name}") + +# for some reason, this text areas is receiving mouse events, and draw events, but not key events... +def ta_callback_again(event): + event_code=event.get_code() + if event_code in [19,23,25,26,27,28,29,30,49]: + return + name = mpos.ui.get_event_name(event_code) + print(f"ta_callback_again {event_code} and {name}") + #print(f"ta_callback_again: code={event_code}") # target={event.get_target()}, user_data={event.get_user_data()}, param={event.get_param()} + +sdlkeyboard.add_event_cb(keyboard_cb, lv.EVENT.ALL, None) + + +#group = lv.group_create() +#group = keyboard.get_group() + +th = task_handler.TaskHandler(duration=5) # 5ms is recommended for MicroPython+LVGL on desktop + +screen = lv.screen_active() + +b = lv.button(screen) +b.center() +b.add_event_cb(button_cb, lv.EVENT.ALL, None) +#group.add_obj(b) + +ta = lv.textarea(screen) +ta.set_one_line(True) +ta.align(lv.ALIGN.TOP_LEFT,0,0) +ta.add_event_cb(ta_callback_again, lv.EVENT.ALL, None) + +#group.add_obj(ta) + +takeyboard = lv.keyboard(screen) +takeyboard.set_textarea(ta) + + +# this does something, but just gives indev 0, being error... +#indev = lv.indev_create() +#indev.set_type(lv.INDEV_TYPE.KEYPAD) +#indev.set_read_cb(keypad_cb) # check for escape + + + +#keyboard.set_group(group) diff --git a/draft_code/keyboard_test_ai.py b/draft_code/keyboard_test_ai.py new file mode 100644 index 00000000..a01f9d62 --- /dev/null +++ b/draft_code/keyboard_test_ai.py @@ -0,0 +1,69 @@ +import lcd_bus +import lvgl as lv +import sdl_display +import task_handler +import sys +sys.path.append('lib/') +import mpos.ui +import sdl_pointer +import sdl_keyboard + +# Display resolution +TFT_HOR_RES = 320 +TFT_VER_RES = 240 + +# Initialize display +bus = lcd_bus.SDLBus(flags=0) +buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0) +display = sdl_display.SDLDisplay( + data_bus=bus, + display_width=TFT_HOR_RES, + display_height=TFT_VER_RES, + frame_buffer1=buf1, + color_space=lv.COLOR_FORMAT.RGB565 +) +display.init() + +# Initialize mouse +mouse = sdl_pointer.SDLPointer() + +# Initialize keyboard +keyboard = sdl_keyboard.SDLKeyboard() + +# Create group for input devices +group = lv.group_create() +keyboard.set_group(group) + + +# Create textarea +screen = lv.screen_active() +ta = lv.textarea(screen) +ta.set_one_line(True) +ta.align(lv.ALIGN.TOP_LEFT, 0, 0) +ta.set_placeholder_text("Type here") +group.add_obj(ta) + +# Optional: Debug event callback for textarea +def ta_event_cb(event): + event_code = event.get_code() + name = mpos.ui.get_event_name(event_code) + print(f"Textarea event: code={event_code}, name={name}") + +ta.add_event_cb(ta_event_cb, lv.EVENT.ALL, None) + +# Optional: Create an on-screen keyboard +keyboard_widget = lv.keyboard(screen) +keyboard_widget.set_textarea(ta) +keyboard_widget.add_flag(lv.obj.FLAG.HIDDEN) + +def ta_focus_cb(event): + event_code = event.get_code() + if event_code == lv.EVENT.FOCUSED: + keyboard_widget.clear_flag(lv.obj.FLAG.HIDDEN) + elif event_code == lv.EVENT.DEFOCUSED: + keyboard_widget.add_flag(lv.obj.FLAG.HIDDEN) + +ta.add_event_cb(ta_focus_cb, lv.EVENT.FOCUSED | lv.EVENT.DEFOCUSED, None) + +# Task handler +th = task_handler.TaskHandler(duration=5) # 5ms for desktop diff --git a/draft_code/keyboard_test_diy.py b/draft_code/keyboard_test_diy.py new file mode 100644 index 00000000..55b25b36 --- /dev/null +++ b/draft_code/keyboard_test_diy.py @@ -0,0 +1,113 @@ +import lcd_bus +import lvgl as lv +import sdl_display +import task_handler +import sys +sys.path.append('lib/') +import sdl_pointer +import sdl_keyboard + +# Initialize display +TFT_HOR_RES = 320 +TFT_VER_RES = 240 +bus = lcd_bus.SDLBus(flags=0) +buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0) +display = sdl_display.SDLDisplay( + data_bus=bus, + display_width=TFT_HOR_RES, + display_height=TFT_VER_RES, + frame_buffer1=buf1, + color_space=lv.COLOR_FORMAT.RGB565 +) +display.init() + +# Initialize mouse +mouse = sdl_pointer.SDLPointer() + +# Initialize keyboard +#keyboard = sdl_keyboard.SDLKeyboard() + +pressed = False +def get_key(indev,data): + print("simulating get_key") + global pressed + if not pressed: + # input your keypad code + data.state = 1 #1 for press 0 for released + data.key=100 + #pressed = True + else: + data.state = 0 #1 for press 0 for released + data.key=100 + pressed = False + + +# Create group +group = lv.group_create() +group.set_default() + +keyboard=lv.indev_create() +keyboard.set_type(lv.INDEV_TYPE.KEYPAD) +keyboard.set_read_cb(get_key) +keyboard.set_group(group) + +#keyboard.set_group(group) + +# Create widgets +screen = lv.screen_active() + +# Textarea +ta = lv.textarea(screen) +ta.set_one_line(True) +ta.set_placeholder_text("Type here") +ta.align(lv.ALIGN.TOP_LEFT, 10, 10) +group.add_obj(ta) + +# Switch +sw = lv.switch(screen) +sw.align(lv.ALIGN.TOP_LEFT, 10, 50) +group.add_obj(sw) + +# Test Button +btn = lv.button(screen) +btn.align(lv.ALIGN.TOP_LEFT, 10, 90) +lbl = lv.label(btn) +lbl.set_text("Test Button") +group.add_obj(btn) + +# Simulate NEXT key button +btn_next = lv.button(screen) +btn_next.align(lv.ALIGN.BOTTOM_LEFT, 10, -10) +lbl_next = lv.label(btn_next) +lbl_next.set_text("NEXT") +def btn_next_cb(event): + if event.get_code() == lv.EVENT.CLICKED: + keyboard._keypad_cb(None, 1, 9, 0) # Simulate KEY_TAB (lv.KEY.NEXT) press + keyboard._keypad_cb(None, 0, 9, 0) # Simulate release +btn_next.add_event_cb(btn_next_cb, lv.EVENT.CLICKED, None) + +# Simulate ENTER key button +btn_enter = lv.button(screen) +btn_enter.align(lv.ALIGN.BOTTOM_LEFT, 100, -10) +lbl_enter = lv.label(btn_enter) +lbl_enter.set_text("ENTER") +def btn_enter_cb(event): + if event.get_code() == lv.EVENT.CLICKED: + keyboard._keypad_cb(None, 1, 13, 0) # Simulate KEY_RETURN (lv.KEY.ENTER) press + keyboard._keypad_cb(None, 0, 13, 0) # Simulate release +btn_enter.add_event_cb(btn_enter_cb, lv.EVENT.CLICKED, None) + +# Debug focus +def check_focus(): + focused = lv.group_get_focused(group) + print(f"Focused widget: {focused}") +th = task_handler.TaskHandler(duration=5) +th.register_task(check_focus, 1000) + +# Debug events +def event_cb(event, name): + event_code = event.get_code() + print(f"{name} event: code={event_code}, name={getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN')}") +ta.add_event_cb(lambda e: event_cb(e, "Textarea"), lv.EVENT.ALL, None) +sw.add_event_cb(lambda e: event_cb(e, "Switch"), lv.EVENT.ALL, None) +btn.add_event_cb(lambda e: event_cb(e, "Button"), lv.EVENT.ALL, None) diff --git a/draft_code/keyboard_test_diy_again.py b/draft_code/keyboard_test_diy_again.py new file mode 100644 index 00000000..de50f651 --- /dev/null +++ b/draft_code/keyboard_test_diy_again.py @@ -0,0 +1,121 @@ +import lcd_bus +import lvgl as lv +import sdl_display +import task_handler +import sys +sys.path.append('lib/') +import sdl_pointer +import sdl_keyboard + +# Initialize display +TFT_HOR_RES = 320 +TFT_VER_RES = 240 +bus = lcd_bus.SDLBus(flags=0) +buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0) +display = sdl_display.SDLDisplay( + data_bus=bus, + display_width=TFT_HOR_RES, + display_height=TFT_VER_RES, + frame_buffer1=buf1, + color_space=lv.COLOR_FORMAT.RGB565 +) +display.init() + +# Initialize mouse +mouse = sdl_pointer.SDLPointer() + +# Initialize keyboard +keyboard = sdl_keyboard.SDLKeyboard() + +# Create group +group = lv.group_create() +group.set_default() +keyboard.set_group(group) + +# Simulated key input for buttons +simulated_key = None +simulated_state = None +def get_key(indev, data): + global simulated_key, simulated_state + if simulated_key is not None: + print(f"Simulating key: state={simulated_state}, key={simulated_key}") + data.state = simulated_state + data.key = simulated_key + simulated_key = None # Clear after processing + else: + data.state = 0 # No key event by default + data.key = 0 + +# Create custom input device for simulated keys +sim_indev = lv.indev_create() +sim_indev.set_type(lv.INDEV_TYPE.KEYPAD) +sim_indev.set_read_cb(get_key) +sim_indev.set_group(group) + +# Create widgets +screen = lv.screen_active() + +# Textarea +ta = lv.textarea(screen) +ta.set_one_line(True) +ta.set_placeholder_text("Type here") +ta.align(lv.ALIGN.TOP_LEFT, 10, 10) +group.add_obj(ta) + +# Switch +sw = lv.switch(screen) +sw.align(lv.ALIGN.TOP_LEFT, 10, 50) +group.add_obj(sw) + +# Test Button +btn = lv.button(screen) +btn.align(lv.ALIGN.TOP_LEFT, 10, 90) +lbl = lv.label(btn) +lbl.set_text("Test Button") +group.add_obj(btn) + +# Simulate NEXT key button +btn_next = lv.button(screen) +btn_next.align(lv.ALIGN.BOTTOM_LEFT, 10, -10) +lbl_next = lv.label(btn_next) +lbl_next.set_text("PREV") +def btn_next_cb(event): + global simulated_key, simulated_state + if event.get_code() == lv.EVENT.CLICKED: + simulated_key = lv.KEY.PREV + simulated_state = 1 + sim_indev.read() # Trigger read immediately + simulated_state = 0 + sim_indev.read() +btn_next.add_event_cb(btn_next_cb, lv.EVENT.CLICKED, None) + +# Simulate ENTER key button +btn_enter = lv.button(screen) +btn_enter.align(lv.ALIGN.BOTTOM_LEFT, 100, -10) +lbl_enter = lv.label(btn_enter) +lbl_enter.set_text("ENTER") +def btn_enter_cb(event): + global simulated_key, simulated_state + if event.get_code() == lv.EVENT.CLICKED: + simulated_key = lv.KEY.ENTER + simulated_state = 1 + sim_indev.read() + simulated_state = 0 + sim_indev.read() +btn_enter.add_event_cb(btn_enter_cb, lv.EVENT.CLICKED, None) + +# Debug focus +def check_focus(): + focused = lv.group_get_focused(group) + print(f"Focused widget: {focused}") +th = task_handler.TaskHandler(duration=5) +th.register_task(check_focus, 1000) + +# Debug events +def event_cb(event, name): + event_code = event.get_code() + key = event.get_key() if event_code == lv.EVENT.KEY else None + print(f"{name} event: code={event_code}, name={getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN')}, key={key}") +ta.add_event_cb(lambda e: event_cb(e, "Textarea"), lv.EVENT.ALL, None) +sw.add_event_cb(lambda e: event_cb(e, "Switch"), lv.EVENT.ALL, None) +btn.add_event_cb(lambda e: event_cb(e, "Button"), lv.EVENT.ALL, None) diff --git a/draft_code/keyboard_test_diy_closer.py b/draft_code/keyboard_test_diy_closer.py new file mode 100644 index 00000000..67eaff70 --- /dev/null +++ b/draft_code/keyboard_test_diy_closer.py @@ -0,0 +1,114 @@ +import lcd_bus +import lvgl as lv +import sdl_display +import task_handler +import sys +sys.path.append('lib/') +import sdl_pointer +import sdl_keyboard + +# Initialize display +TFT_HOR_RES = 320 +TFT_VER_RES = 240 +bus = lcd_bus.SDLBus(flags=0) +buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0) +display = sdl_display.SDLDisplay( + data_bus=bus, + display_width=TFT_HOR_RES, + display_height=TFT_VER_RES, + frame_buffer1=buf1, + color_space=lv.COLOR_FORMAT.RGB565 +) +display.init() + +# Initialize mouse +mouse = sdl_pointer.SDLPointer() + +# Initialize keyboard +keyboard = sdl_keyboard.SDLKeyboard() + +pressed = False +def get_key(indev,data): + print("simulating get_key") + global pressed + if not pressed: + # input your keypad code + data.state = 1 #1 for press 0 for released + data.key=100 + #pressed = True + else: + data.state = 0 #1 for press 0 for released + data.key=100 + pressed = False + + +# Create group +#group = lv.group_create() +#group.set_default() +group = keyboard.get_group() + +#keyboard=lv.indev_create() +#keyboard.set_type(lv.INDEV_TYPE.KEYPAD) +#keyboard.set_read_cb(get_key) +#keyboard.set_group(group) + +#keyboard.set_group(group) + +# Create widgets +screen = lv.screen_active() + +# Textarea +ta = lv.textarea(screen) +ta.set_one_line(True) +ta.set_placeholder_text("Type here") +ta.align(lv.ALIGN.TOP_LEFT, 10, 10) +group.add_obj(ta) + +# Switch +sw = lv.switch(screen) +sw.align(lv.ALIGN.TOP_LEFT, 10, 50) +group.add_obj(sw) + +# Test Button +btn = lv.button(screen) +btn.align(lv.ALIGN.TOP_LEFT, 10, 90) +lbl = lv.label(btn) +lbl.set_text("Test Button") +group.add_obj(btn) + +# Simulate NEXT key button +btn_next = lv.button(screen) +btn_next.align(lv.ALIGN.BOTTOM_LEFT, 10, -10) +lbl_next = lv.label(btn_next) +lbl_next.set_text("NEXT") +def btn_next_cb(event): + if event.get_code() == lv.EVENT.CLICKED: + keyboard._keypad_cb(None, 1, 9, 0) # Simulate KEY_TAB (lv.KEY.NEXT) press + keyboard._keypad_cb(None, 0, 9, 0) # Simulate release +btn_next.add_event_cb(btn_next_cb, lv.EVENT.CLICKED, None) + +# Simulate ENTER key button +btn_enter = lv.button(screen) +btn_enter.align(lv.ALIGN.BOTTOM_LEFT, 100, -10) +lbl_enter = lv.label(btn_enter) +lbl_enter.set_text("ENTER") +def btn_enter_cb(event): + if event.get_code() == lv.EVENT.CLICKED: + keyboard._keypad_cb(None, 1, 13, 0) # Simulate KEY_RETURN (lv.KEY.ENTER) press + keyboard._keypad_cb(None, 0, 13, 0) # Simulate release +btn_enter.add_event_cb(btn_enter_cb, lv.EVENT.CLICKED, None) + +# Debug focus +def check_focus(): + focused = lv.group_get_focused(group) + print(f"Focused widget: {focused}") +th = task_handler.TaskHandler(duration=5) +th.register_task(check_focus, 1000) + +# Debug events +def event_cb(event, name): + event_code = event.get_code() + print(f"{name} event: code={event_code}, name={getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN')}") +ta.add_event_cb(lambda e: event_cb(e, "Textarea"), lv.EVENT.ALL, None) +sw.add_event_cb(lambda e: event_cb(e, "Switch"), lv.EVENT.ALL, None) +btn.add_event_cb(lambda e: event_cb(e, "Button"), lv.EVENT.ALL, None) diff --git a/draft_code/keyboard_test_diy_closer_works.py b/draft_code/keyboard_test_diy_closer_works.py new file mode 100644 index 00000000..67eaff70 --- /dev/null +++ b/draft_code/keyboard_test_diy_closer_works.py @@ -0,0 +1,114 @@ +import lcd_bus +import lvgl as lv +import sdl_display +import task_handler +import sys +sys.path.append('lib/') +import sdl_pointer +import sdl_keyboard + +# Initialize display +TFT_HOR_RES = 320 +TFT_VER_RES = 240 +bus = lcd_bus.SDLBus(flags=0) +buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0) +display = sdl_display.SDLDisplay( + data_bus=bus, + display_width=TFT_HOR_RES, + display_height=TFT_VER_RES, + frame_buffer1=buf1, + color_space=lv.COLOR_FORMAT.RGB565 +) +display.init() + +# Initialize mouse +mouse = sdl_pointer.SDLPointer() + +# Initialize keyboard +keyboard = sdl_keyboard.SDLKeyboard() + +pressed = False +def get_key(indev,data): + print("simulating get_key") + global pressed + if not pressed: + # input your keypad code + data.state = 1 #1 for press 0 for released + data.key=100 + #pressed = True + else: + data.state = 0 #1 for press 0 for released + data.key=100 + pressed = False + + +# Create group +#group = lv.group_create() +#group.set_default() +group = keyboard.get_group() + +#keyboard=lv.indev_create() +#keyboard.set_type(lv.INDEV_TYPE.KEYPAD) +#keyboard.set_read_cb(get_key) +#keyboard.set_group(group) + +#keyboard.set_group(group) + +# Create widgets +screen = lv.screen_active() + +# Textarea +ta = lv.textarea(screen) +ta.set_one_line(True) +ta.set_placeholder_text("Type here") +ta.align(lv.ALIGN.TOP_LEFT, 10, 10) +group.add_obj(ta) + +# Switch +sw = lv.switch(screen) +sw.align(lv.ALIGN.TOP_LEFT, 10, 50) +group.add_obj(sw) + +# Test Button +btn = lv.button(screen) +btn.align(lv.ALIGN.TOP_LEFT, 10, 90) +lbl = lv.label(btn) +lbl.set_text("Test Button") +group.add_obj(btn) + +# Simulate NEXT key button +btn_next = lv.button(screen) +btn_next.align(lv.ALIGN.BOTTOM_LEFT, 10, -10) +lbl_next = lv.label(btn_next) +lbl_next.set_text("NEXT") +def btn_next_cb(event): + if event.get_code() == lv.EVENT.CLICKED: + keyboard._keypad_cb(None, 1, 9, 0) # Simulate KEY_TAB (lv.KEY.NEXT) press + keyboard._keypad_cb(None, 0, 9, 0) # Simulate release +btn_next.add_event_cb(btn_next_cb, lv.EVENT.CLICKED, None) + +# Simulate ENTER key button +btn_enter = lv.button(screen) +btn_enter.align(lv.ALIGN.BOTTOM_LEFT, 100, -10) +lbl_enter = lv.label(btn_enter) +lbl_enter.set_text("ENTER") +def btn_enter_cb(event): + if event.get_code() == lv.EVENT.CLICKED: + keyboard._keypad_cb(None, 1, 13, 0) # Simulate KEY_RETURN (lv.KEY.ENTER) press + keyboard._keypad_cb(None, 0, 13, 0) # Simulate release +btn_enter.add_event_cb(btn_enter_cb, lv.EVENT.CLICKED, None) + +# Debug focus +def check_focus(): + focused = lv.group_get_focused(group) + print(f"Focused widget: {focused}") +th = task_handler.TaskHandler(duration=5) +th.register_task(check_focus, 1000) + +# Debug events +def event_cb(event, name): + event_code = event.get_code() + print(f"{name} event: code={event_code}, name={getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN')}") +ta.add_event_cb(lambda e: event_cb(e, "Textarea"), lv.EVENT.ALL, None) +sw.add_event_cb(lambda e: event_cb(e, "Switch"), lv.EVENT.ALL, None) +btn.add_event_cb(lambda e: event_cb(e, "Button"), lv.EVENT.ALL, None) diff --git a/draft_code/keyboard_test_onscreen.py b/draft_code/keyboard_test_onscreen.py new file mode 100644 index 00000000..cc436ec0 --- /dev/null +++ b/draft_code/keyboard_test_onscreen.py @@ -0,0 +1,103 @@ +import lcd_bus +import lvgl as lv +import sdl_display +import task_handler +import sys +sys.path.append('lib/') +import sdl_pointer +import sdl_keyboard + +# Initialize display +TFT_HOR_RES = 320 +TFT_VER_RES = 240 +bus = lcd_bus.SDLBus(flags=0) +buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0) +display = sdl_display.SDLDisplay( + data_bus=bus, + display_width=TFT_HOR_RES, + display_height=TFT_VER_RES, + frame_buffer1=buf1, + color_space=lv.COLOR_FORMAT.RGB565 +) +display.init() + +# Initialize mouse +mouse = sdl_pointer.SDLPointer() + +# Initialize keyboard +keyboard = sdl_keyboard.SDLKeyboard() + +# Create group +group = lv.group_create() +group.set_default() # Set as default group +keyboard.set_group(group) + +# Create widgets +screen = lv.screen_active() + +# Textarea +ta = lv.textarea(screen) +ta.set_one_line(True) +ta.set_placeholder_text("Type here") +ta.align(lv.ALIGN.TOP_LEFT, 10, 10) +group.add_obj(ta) + +# Switch +sw = lv.switch(screen) +sw.align(lv.ALIGN.TOP_LEFT, 10, 50) +group.add_obj(sw) + +# Button +btn = lv.button(screen) +btn.align(lv.ALIGN.TOP_LEFT, 10, 90) +lbl = lv.label(btn) +lbl.set_text("Test Button") +group.add_obj(btn) + +# Simulate NEXT key button +btn_next = lv.button(screen) +btn_next.align(lv.ALIGN.BOTTOM_LEFT, 10, -10) +lbl_next = lv.label(btn_next) +lbl_next.set_text("NEXT") +def btn_next_cb(event): + if event.get_code() == lv.EVENT.CLICKED: + keyboard.send_event(lv.EVENT.KEY, lv.KEY.NEXT) +btn_next.add_event_cb(btn_next_cb, lv.EVENT.CLICKED, None) + +# Simulate ENTER key button +btn_enter = lv.button(screen) +btn_enter.align(lv.ALIGN.BOTTOM_LEFT, 100, -10) +lbl_enter = lv.label(btn_enter) +lbl_enter.set_text("ENTER") +def btn_enter_cb(event): + if event.get_code() == lv.EVENT.CLICKED: + keyboard.send_event(lv.EVENT.KEY, lv.KEY.ENTER) +btn_enter.add_event_cb(btn_enter_cb, lv.EVENT.CLICKED, None) + +# Debug focus +def check_focus(): + focused = lv.group_get_focused(group) + print(f"Focused widget: {focused}") +th = task_handler.TaskHandler(duration=5) +th.register_task(check_focus, 1000) # Check focus every 1s + +# Debug textarea events +def ta_event_cb(event): + event_code = event.get_code() + name = getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN') + print(f"Textarea event: code={event_code}, name={name}") +ta.add_event_cb(ta_event_cb, lv.EVENT.ALL, None) + +# Debug switch events +def sw_event_cb(event): + event_code = event.get_code() + name = getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN') + print(f"Switch event: code={event_code}, name={name}") +sw.add_event_cb(sw_event_cb, lv.EVENT.ALL, None) + +# Debug button events +def btn_event_cb(event): + event_code = event.get_code() + name = getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN') + print(f"Button event: code={event_code}, name={name}") +btn.add_event_cb(btn_event_cb, lv.EVENT.ALL, None) diff --git a/draft_code/keyboard_test_onscreen_again.py b/draft_code/keyboard_test_onscreen_again.py new file mode 100644 index 00000000..65e294ed --- /dev/null +++ b/draft_code/keyboard_test_onscreen_again.py @@ -0,0 +1,92 @@ +import lcd_bus +import lvgl as lv +import sdl_display +import task_handler +import sys +sys.path.append('lib/') +import sdl_pointer +import sdl_keyboard + +# Initialize display +TFT_HOR_RES = 320 +TFT_VER_RES = 240 +bus = lcd_bus.SDLBus(flags=0) +buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0) +display = sdl_display.SDLDisplay( + data_bus=bus, + display_width=TFT_HOR_RES, + display_height=TFT_VER_RES, + frame_buffer1=buf1, + color_space=lv.COLOR_FORMAT.RGB565 +) +display.init() + +# Initialize mouse +mouse = sdl_pointer.SDLPointer() + +# Initialize keyboard +keyboard = sdl_keyboard.SDLKeyboard() + +# Create group +group = lv.group_create() +group.set_default() +keyboard.set_group(group) + +# Create widgets +screen = lv.screen_active() + +# Textarea +ta = lv.textarea(screen) +ta.set_one_line(True) +ta.set_placeholder_text("Type here") +ta.align(lv.ALIGN.TOP_LEFT, 10, 10) +group.add_obj(ta) + +# Switch +sw = lv.switch(screen) +sw.align(lv.ALIGN.TOP_LEFT, 10, 50) +group.add_obj(sw) + +# Test Button +btn = lv.button(screen) +btn.align(lv.ALIGN.TOP_LEFT, 10, 90) +lbl = lv.label(btn) +lbl.set_text("Test Button") +group.add_obj(btn) + +# Simulate NEXT key button +btn_next = lv.button(screen) +btn_next.align(lv.ALIGN.BOTTOM_LEFT, 10, -10) +lbl_next = lv.label(btn_next) +lbl_next.set_text("NEXT") +def btn_next_cb(event): + if event.get_code() == lv.EVENT.CLICKED: + keyboard._keypad_cb(None, 1, 9, 0) # Simulate KEY_TAB (lv.KEY.NEXT) press + keyboard._keypad_cb(None, 0, 9, 0) # Simulate release +btn_next.add_event_cb(btn_next_cb, lv.EVENT.CLICKED, None) + +# Simulate ENTER key button +btn_enter = lv.button(screen) +btn_enter.align(lv.ALIGN.BOTTOM_LEFT, 100, -10) +lbl_enter = lv.label(btn_enter) +lbl_enter.set_text("ENTER") +def btn_enter_cb(event): + if event.get_code() == lv.EVENT.CLICKED: + keyboard._keypad_cb(None, 1, 13, 0) # Simulate KEY_RETURN (lv.KEY.ENTER) press + keyboard._keypad_cb(None, 0, 13, 0) # Simulate release +btn_enter.add_event_cb(btn_enter_cb, lv.EVENT.CLICKED, None) + +# Debug focus +def check_focus(): + focused = lv.group_get_focused(group) + print(f"Focused widget: {focused}") +th = task_handler.TaskHandler(duration=5) +th.register_task(check_focus, 1000) + +# Debug events +def event_cb(event, name): + event_code = event.get_code() + print(f"{name} event: code={event_code}, name={getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN')}") +ta.add_event_cb(lambda e: event_cb(e, "Textarea"), lv.EVENT.ALL, None) +sw.add_event_cb(lambda e: event_cb(e, "Switch"), lv.EVENT.ALL, None) +btn.add_event_cb(lambda e: event_cb(e, "Button"), lv.EVENT.ALL, None) diff --git a/draft_code/keyboard_test_works_simplify.py b/draft_code/keyboard_test_works_simplify.py new file mode 100644 index 00000000..90f48a38 --- /dev/null +++ b/draft_code/keyboard_test_works_simplify.py @@ -0,0 +1,103 @@ +# Hardware initialization for Unix and MacOS systems + +import lcd_bus +import lvgl as lv +import sdl_display +import task_handler + + +# Add lib/ to the path for modules, otherwise it will only search in ~/.micropython/lib and /usr/lib/micropython +import sys +sys.path.append('lib/') + + +import mpos.ui + + +#TFT_HOR_RES=640 +#TFT_VER_RES=480 +TFT_HOR_RES=320 +TFT_VER_RES=240 + +def window_cb(args): # doesn't get called + print(f"Window callback: {args}") + +bus = lcd_bus.SDLBus(flags=0) +bus.register_window_callback(window_cb) + +# bus.set_window_size(320,240,-1,False) # -1 might be 25 but it always becomes black, except for format 0 + +buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0) + +display = sdl_display.SDLDisplay(data_bus=bus,display_width=TFT_HOR_RES,display_height=TFT_VER_RES,frame_buffer1=buf1,color_space=lv.COLOR_FORMAT.RGB565) +display.init() + +import sdl_pointer +mouse = sdl_pointer.SDLPointer() + +import sdl_keyboard +keyboard = sdl_keyboard.SDLKeyboard() + +# seems indev isn't properly initialized +def keypad_cb(indev, indev_data): + #print(f"keypad_cb {indev} {indev_data}") + #key = indev.get_key() # always 0 + #print(f"key {key}") + #key = indev_data.get("key") + #print(f"key {key}") + pressed, code = keyboard._get_key() + print(f"periodic pressed: {pressed}, code: {code}") + # I mean we could read the key and put it in the textarea but I want some kind of keypress :-/ + +def keyboard_cb(event): + event_code=event.get_code() + print(f"keyboard_test YES: code={event_code}") # target={event.get_target()}, user_data={event.get_user_data()}, param={event.get_param()} + + +def button_cb(event): + event_code=event.get_code() + name = mpos.ui.get_event_name(event_code) + print(f"button_cb YES: code={event_code} and name {name}") + +# for some reason, this text areas is receiving mouse events, and draw events, but not key events... +def ta_callback_again(event): + event_code=event.get_code() + if event_code in [19,23,25,26,27,28,29,30,49]: + return + name = mpos.ui.get_event_name(event_code) + print(f"ta_callback_again {event_code} and {name}") + #print(f"ta_callback_again: code={event_code}") # target={event.get_target()}, user_data={event.get_user_data()}, param={event.get_param()} + +keyboard.add_event_cb(keyboard_cb, lv.EVENT.ALL, None) + + +#group = lv.group_create() +#group = keyboard.get_group() + +th = task_handler.TaskHandler(duration=5) # 5ms is recommended for MicroPython+LVGL on desktop + +screen = lv.screen_active() + +b = lv.button(screen) +b.center() +b.add_event_cb(button_cb, lv.EVENT.ALL, None) +#group.add_obj(b) + +ta = lv.textarea(screen) +ta.set_one_line(True) +ta.align(lv.ALIGN.TOP_LEFT,0,0) +#group.add_obj(ta) + +keyboard = lv.keyboard(screen) +keyboard.set_textarea(ta) + + +# this does something, but just gives indev 0, being error... +#indev = lv.indev_create() +#indev.set_type(lv.INDEV_TYPE.KEYPAD) +#indev.set_read_cb(keypad_cb) + +ta.add_event_cb(ta_callback_again, lv.EVENT.ALL, None) + + +#keyboard.set_group(group) diff --git a/draft_code/main.c_with_chroot b/draft_code/main.c_with_chroot new file mode 100644 index 00000000..426b7078 --- /dev/null +++ b/draft_code/main.c_with_chroot @@ -0,0 +1,81 @@ +diff --git a/ports/unix/main.c b/ports/unix/main.c +index 58fa3ff..c022c20 100644 +--- a/ports/unix/main.c ++++ b/ports/unix/main.c +@@ -53,6 +53,8 @@ + #include "extmod/vfs_posix.h" + #include "genhdr/mpversion.h" + #include "input.h" ++#include "machine_sdl.h" ++#include "machine_timer.h" + + // Command line options, with their defaults + static bool compile_only = false; +@@ -61,7 +63,7 @@ static uint emit_opt = MP_EMIT_OPT_NONE; + #if MICROPY_ENABLE_GC + // Heap size of GC heap (if enabled) + // Make it larger on a 64 bit machine, because pointers are larger. +-long heap_size = 1024 * 1024 * (sizeof(mp_uint_t) / 4); ++long heap_size = 8388608; + #endif + + // Number of heaps to assign by default if MICROPY_GC_SPLIT_HEAP=1 +@@ -192,6 +194,12 @@ static char *strjoin(const char *s1, int sep_char, const char *s2) { + } + #endif + ++char *mp_repl_get_ps3(void) ++{ ++ return ""; ++} ++ ++ + static int do_repl(void) { + mp_hal_stdout_tx_str(MICROPY_BANNER_NAME_AND_VERSION); + mp_hal_stdout_tx_str("; " MICROPY_BANNER_MACHINE); +@@ -282,8 +290,16 @@ static int do_repl(void) { + for (;;) { + char *line = prompt((char *)mp_repl_get_ps1()); + if (line == NULL) { +- // EOF +- return 0; ++ if (errno != EWOULDBLOCK) { ++ return 0; ++ } else { ++ while (line == NULL && errno == EWOULDBLOCK) { ++ mp_handle_pending(true); ++ usleep(1000); ++ line = prompt(mp_repl_get_ps3()); ++ } ++ if (line == NULL) return 0; ++ } + } + while (mp_repl_continue_with_input(line)) { + char *line2 = prompt((char *)mp_repl_get_ps2()); +@@ -470,6 +486,17 @@ static void sys_set_excecutable(char *argv0) { + MP_NOINLINE int main_(int argc, char **argv); + + int main(int argc, char **argv) { ++ // Parse command-line arguments for a custom root ++ const char *fs_root = "/home/user/sources/PiggyOS/internal_filesystem"; // Hardcode or parse from argv ++ if (chroot(fs_root) != 0) { ++ perror("chroot failed"); ++ return 1; ++ } ++ if (chdir("/") != 0) { ++ perror("chdir failed"); ++ return 1; ++ } ++ + #if MICROPY_PY_THREAD + mp_thread_init(); + #endif +@@ -752,6 +779,8 @@ MP_NOINLINE int main_(int argc, char **argv) { + MP_STATE_THREAD(prof_trace_callback) = MP_OBJ_NULL; + #endif + ++ machine_timer_deinit_all(); ++ deinit_sdl(); + #if MICROPY_PY_SYS_ATEXIT + // Beware, the sys.settrace callback should be disabled before running sys.atexit. + if (mp_obj_is_callable(MP_STATE_VM(sys_exitfunc))) { diff --git a/draft_code/my_websocket_test.py b/draft_code/my_websocket_test.py new file mode 100644 index 00000000..73e2b4df --- /dev/null +++ b/draft_code/my_websocket_test.py @@ -0,0 +1,49 @@ +# it's not super fast but it works! + +import websocket +import _thread +import time + +def on_message(wsapp, message): + print(f"got message: {message}") + +def on_ping(wsapp, message): + print("Got a ping! A pong reply has already been automatically sent.") + +def on_pong(wsapp, message): + print("Got a pong! No need to respond") + + +def on_error(wsapp, message): + print(f"Got error: {message}") + + +#wsapp = websocket.WebSocketApp("wss://testnet.binance.vision/ws/btcusdt@trade", on_message=on_message, on_ping=on_ping, on_pong=on_pong, on_error=on_error) + +wsapp = websocket.WebSocketApp("wss://echo.websocket.events", on_message=on_message, on_ping=on_ping, on_pong=on_pong, on_error=on_error) + +def stress_test_thread(): + print("before run_forever") + wsapp.run_forever(ping_interval=15, ping_timeout=10, ping_payload="This is an optional ping payload") + print("after run_forever") + +_thread.stack_size(16*1024) +_thread.start_new_thread(stress_test_thread, ()) + +time.sleep(5) +print("sending ok") +wsapp.send_text('ok') + + +time.sleep(15) +print("sending again") +wsapp.send_text('again') + + +time.sleep(25) +print("sending more") +wsapp.send_text('more') + +wsapp.close() + + diff --git a/draft_code/nostr_receive.py b/draft_code/nostr_receive.py new file mode 100644 index 00000000..16de48cc --- /dev/null +++ b/draft_code/nostr_receive.py @@ -0,0 +1,110 @@ +import json +import ssl +import time +import _thread + +from nostr.filter import Filter, Filters +from nostr.event import Event, EventKind +from nostr.relay_manager import RelayManager +from nostr.message_type import ClientMessageType + +#filters = Filters([Filter(authors=[], kinds=[EventKind.TEXT_NOTE])]) +#filters = Filters([Filter(authors=["181137054fe60df5168976311f0bf44dbe4bd4d2e0af69325dfee9fa81a8cbda"], kinds=[EventKind.TEXT_NOTE])]) +#timestamp = round(time.time()-50) +#timestamp = round(time.time()) # going for zero events to check memory use + +timetogoback = 1000 +timetogoback = 2419200 # 28 days +timetogoback = 7776000 # 3 months + +# event_msg: pubkey: 181137054fe60df5168976311f0bf44dbe4bd4d2e0af69325dfee9fa81a8cbda created_at 1745510390 with content "kind":1, Happy news! LightningPiggy is heading to +# contacts: event_msg: pubkey: 181137054fe60df5168976311f0bf44dbe4bd4d2e0af69325dfee9fa81a8cbda created_at 1746307620 with content and kind 3 and tags [['p', '181137054fe60df5168976311f0bf44dbe4bd4d2e0af69325dfee9fa81a8cbda'], ['p', 'ffb3f96661bda0295389cfc8c8fe65332e98cc24b12bce5ca40e09af3bb0d7bf'], +# reaction: event_msg: pubkey: 181137054fe60df5168976311f0bf44dbe4bd4d2e0af69325dfee9fa81a8cbda created_at 1746307202 with content + and kind 7 and tags [['e', 'cee2aadc637f56e6f922b95898eecb2bd9b0f0f0ab2e2df6bb6d1e7830c697b0'], ['p', '3e6e0735b8a2e96f8cf663f64d04bb9cea931afcc57f5427c35bb6859e95c8a2']] +# event_msg: pubkey: 181137054fe60df5168976311f0bf44dbe4bd4d2e0af69325dfee9fa81a8cbda created_at 1742146581 with content The kWh (kilowatt-hour) was pro... nd kind 1 and tags [['p', '92cbe5861cfc5213dd89f0a6f6084486f85e6f03cfeb70a13f455938116433b8', 'wss://nostrelites.org', 'mention']] + +# somehow, adding kinds breaks it?! +# ["REQ", "test1747750250", {"since": 1744011312, "kinds": [1], "authors": ["181137054fe60df5168976311f0bf44dbe4bd4d2e0af69325dfee9fa81a8cbda"]}] +# => this gives me EOSE quickly +# ["REQ", "test1747750060", {"since": 1744011312, "authors": ["181137054fe60df5168976311f0bf44dbe4bd4d2e0af69325dfee9fa81a8cbda"]}] +# this worked before, I think: +# ["REQ","index",{"kinds":[9735], "#p": ["181137054fe60df5168976311f0bf44dbe4bd4d2e0af69325dfee9fa81a8cbda"]}] + +import sys +if sys.platform == "esp32": + # on esp32, it needs this correction: + timestamp = time.time() + 946684800 - timetogoback +else: + timestamp = round(time.time()-timetogoback) + #timestamp = round(time.time()-1000) + #timestamp = round(time.time()-5000) + +timestamp = 1744011312 + +filters = Filters([Filter(authors=["181137054fe60df5168976311f0bf44dbe4bd4d2e0af69325dfee9fa81a8cbda"], since=timestamp)]) +#filters = Filters([Filter(authors=["181137054fe60df5168976311f0bf44dbe4bd4d2e0af69325dfee9fa81a8cbda"], since=timestamp, kinds=[EventKind.TEXT_NOTE] )]) +#filters = Filters([Filter(authors=["04c915daefee38317fa734444acee390a8269fe5810b2241e5e6dd343dfbecc9"], kinds=[9735], since=timestamp)]) +#filters = Filters([Filter(kinds=[9735], since=timestamp)]) + +subscription_id = "test" + str(round(time.time())) +request = [ClientMessageType.REQUEST, subscription_id] +json.dumps(request) +request.extend(filters.to_json_array()) +message = json.dumps(request) +# ["REQ", "ihopethisworks3", {"kinds": [1], "authors": "04c915daefee38317fa734444acee390a8269fe5810b2241e5e6dd343dfbecc9"}] +print(f"sending this: {message}") + +def printevents(): + import micropython + print(f"at the start, thread stack used: {micropython.stack_use()}") + print("relaymanager") + relay_manager = RelayManager() + #relay_manager.add_relay("wss://nostr-pub.wellorder.net") + print("relaymanager adding") + relay_manager.add_relay("wss://relay.primal.net") + #relay_manager.add_relay("wss://relay.damus.io") + print("relaymanager subscribing") + relay_manager.add_subscription(subscription_id, filters) + print("opening connections") # after this, CPU usage goes high and stays there + relay_manager.open_connections({"cert_reqs": ssl.CERT_NONE}) # NOTE: This disables ssl certificate verification + time.sleep(2) # allow the connections to open + print("publishing:") + relay_manager.publish_message(message) + time.sleep(2) # allow the messages to send + print("printing events:") + #while relay_manager.message_pool.has_events(): + # allowing 30 seconds for stuff to come in... + for _ in range(600): + time.sleep(1) + print("checking pool....") + try: + event_msg = relay_manager.message_pool.get_event() + print(f"event_msg: pubkey: {event_msg.event.public_key} created_at {event_msg.event.created_at} with content '{event_msg.event.content}' and kind {event_msg.event.kind} and tags {event_msg.event.tags}") + except Exception as e: + #print(f"pool.get_event() got error: {e}") + pass + print("30 seconds passed, closing:") + relay_manager.close_connections() + +# new thread so REPL stays available +# 12KB crashes here: +# opening connections +# [DEBUG 408724546] Starting run_forever +# [DEBUG 408724546] Starting _async_main +# [DEBUG 408724546] Reconnect interval set to 0s +# [DEBUG 408724546] Started callback processing task +# [DEBUG 408724546] Main loop iteration: self.running=True +# [DEBUG 408724546] Connecting to wss://relay.damus.io +# [DEBUG 408724547] Using SSL with no certificate verification +# 24KB is fine +# somehow, if I run this in a thread, I get: can't create thread" at File "/lib/nostr/relay_manager.py", line 48, in open_connections +# tried stack sizes from 18KB up to 32KB +#_thread.stack_size(16*1024) +#_thread.start_new_thread(printevents, ()) +printevents() + + +#import gc +#for _ in range(50): +# collect = gc.collect() +# print(f"MEMFREE: {gc.mem_free()}") +# time.sleep(1) diff --git a/draft_code/nwc-demo.py b/draft_code/nwc-demo.py new file mode 100644 index 00000000..4b27326f --- /dev/null +++ b/draft_code/nwc-demo.py @@ -0,0 +1,214 @@ +import json +import ssl +import time +import sys +from nostr.relay_manager import RelayManager +from nostr.message_type import ClientMessageType +from nostr.filter import Filter, Filters +from nostr.event import EncryptedDirectMessage +from nostr.key import PrivateKey + +def parse_nwc_url(nwc_url): + """Parse Nostr Wallet Connect URL to extract pubkey, relay, secret, and lud16.""" + print(f"DEBUG: Starting to parse NWC URL: {nwc_url}") + try: + # Remove 'nostr+walletconnect://' or 'nwc:' prefix + if nwc_url.startswith('nostr+walletconnect://'): + print(f"DEBUG: Removing 'nostr+walletconnect://' prefix") + nwc_url = nwc_url[22:] + elif nwc_url.startswith('nwc:'): + print(f"DEBUG: Removing 'nwc:' prefix") + nwc_url = nwc_url[4:] + else: + print(f"DEBUG: No recognized prefix found in URL") + raise ValueError("Invalid NWC URL: missing 'nostr+walletconnect://' or 'nwc:' prefix") + + print(f"DEBUG: URL after prefix removal: {nwc_url}") + + # Split into pubkey and query params + parts = nwc_url.split('?') + pubkey = parts[0] + print(f"DEBUG: Extracted pubkey: {pubkey}") + + # Validate pubkey (should be 64 hex characters) + if len(pubkey) != 64 or not all(c in '0123456789abcdef' for c in pubkey): + raise ValueError("Invalid NWC URL: pubkey must be 64 hex characters") + + # Extract relay, secret, and lud16 from query params + relay = None + secret = None + lud16 = None + if len(parts) > 1: + print(f"DEBUG: Query parameters found: {parts[1]}") + params = parts[1].split('&') + for param in params: + if param.startswith('relay='): + relay = param[6:] + print(f"DEBUG: Extracted relay: {relay}") + elif param.startswith('secret='): + secret = param[7:] + print(f"DEBUG: Extracted secret: {secret}") + elif param.startswith('lud16='): + lud16 = param[6:] + print(f"DEBUG: Extracted lud16: {lud16}") + else: + print(f"DEBUG: No query parameters found") + + if not pubkey or not relay or not secret: + raise ValueError("Invalid NWC URL: missing required fields (pubkey, relay, or secret)") + + # Validate secret (should be 64 hex characters) + if len(secret) != 64 or not all(c in '0123456789abcdef' for c in secret): + raise ValueError("Invalid NWC URL: secret must be 64 hex characters") + + return { + 'relay': relay, + 'pubkey': pubkey, + 'secret': secret, + 'lud16': lud16 + } + except Exception as e: + print(f"DEBUG: Error parsing NWC URL: {e}") + sys.exit(1) + +def get_balance(nwc_url): + """Get balance using Nostr Wallet Connect.""" + print(f"DEBUG: Starting get_balance with NWC URL: {nwc_url}") + # Parse NWC URL + nwc_data = parse_nwc_url(nwc_url) + relay = nwc_data['relay'] + wallet_pubkey = nwc_data['pubkey'] + secret = nwc_data['secret'] + lud16 = nwc_data['lud16'] + print(f"DEBUG: Parsed NWC data - Relay: {relay}, Pubkey: {wallet_pubkey}, Secret: {secret}, lud16: {lud16}") + + # Initialize private key from secret (assuming it's a hex key) + try: + #print(f"DEBUG: Initializing private key from secret") + private_key = PrivateKey(bytes.fromhex(secret)) + print(f"DEBUG: Private key initialized, public key: {private_key.public_key.hex()}") + except Exception as e: + print(f"DEBUG: Error initializing private key: {e}") + sys.exit(1) + + # Create get_balance request + balance_request = { + "method": "get_balance", + "params": {} + } + print(f"DEBUG: Created balance request: {balance_request}") + + #balance_request_string = json.dumps(balance_request) + #encrypted = private_key.encrypt_message(balance_request_string, wallet_pubkey) + #print(f"\n\n\nencryption returned: {encrypted}") + #decrypted = private_key.decrypt_message(encrypted, wallet_pubkey) + #print(f"\n\n\ndecryption returned: {decrypted}\n\n\n") + + #decrypted = private_key.decrypt_message(encrypted, wallet_pubkey) + #print(f"\n\n\ndecryption from nak returned: {decrypted}\n\n\n") + # padding error + + + # Create encrypted DM with the balance request + print(f"DEBUG: Creating encrypted DM to wallet pubkey: {wallet_pubkey}") + dm = EncryptedDirectMessage( + recipient_pubkey=wallet_pubkey, + cleartext_content=json.dumps(balance_request) + ) + + + print(f"DEBUG: Signing DM {json.dumps(dm)} with private key") + private_key.sign_event(dm) # sign also does encryption if it's a encrypted dm + print(f"DEBUG: DM created with ID: {dm.id}") + + # Set up relay manager + print(f"DEBUG: Setting up relay manager with relay: {relay}") + relay_manager = RelayManager() + relay_manager.add_relay(relay) + print(f"DEBUG: Opening relay connections") + relay_manager.open_connections({"cert_reqs": ssl.CERT_NONE}) + time.sleep(2) # Allow connections to open + + # Check for relay connection notices + print(f"DEBUG: Checking for relay notices") + while relay_manager.message_pool.has_notices(): + notice = relay_manager.message_pool.get_notice() + print(f"DEBUG: Relay notice: {notice.content}") + + # Set up subscription to receive response + subscription_id = "nwc_balance_" + str(round(time.time())) + #print(f"DEBUG: Setting up subscription with ID: {subscription_id}") + filters = Filters([Filter( + kinds=[23195], # NWC replies + authors=[wallet_pubkey], + pubkey_refs=[private_key.public_key.hex()] + )]) + #print(f"DEBUG: Subscription filters: {filters.to_json_array()}") + relay_manager.add_subscription(subscription_id, filters) + + # Publish request + print(f"DEBUG: Publishing subscription request") + request_message = [ClientMessageType.REQUEST, subscription_id] + request_message.extend(filters.to_json_array()) + relay_manager.publish_message(json.dumps(request_message)) + print(f"DEBUG: Publishing encrypted DM") + relay_manager.publish_event(dm) + # only accept events after the time it was published + after_time = time.time() + if sys.platform == "esp32": + # on esp32, it needs this correction: + after_time += 946684800 + after_time -= 60 # go back a bit because server clocks might be drifting + print(f"will only consider events after {after_time}") + + + # Wait for response + print(f"DEBUG: Waiting for response...") + print(f"starting at {time.localtime()}") + start_time = time.time() + balance = None + while time.time() - start_time < 60 * 2: + while relay_manager.message_pool.has_events(): + print(f"DEBUG: Event received from message pool") + event_msg = relay_manager.message_pool.get_event() + event_created_at = event_msg.event.created_at + print(f"Received at {time.localtime()} a message with timestamp {event_created_at}") + if event_created_at < after_time: + print("Skipping event because it's too old!") + continue + #print(f"event_msg content {event_msg.event.content}") + try: + #print(f"DEBUG: Decrypting event from public_key: {event_msg.event.public_key}") + decrypted_content = private_key.decrypt_message( + event_msg.event.content, + event_msg.event.public_key + ) + print(f"DEBUG: Decrypted content: {decrypted_content}") + response = json.loads(decrypted_content) + print(f"DEBUG: Parsed response: {response}") + if response.get("method") == "get_balance": + balance = response.get("result", {}).get("balance") + print(f"DEBUG: Balance found: {balance} satoshis") + break + except Exception as e: + print(f"DEBUG: Error processing response: {e}") + if balance is not None: + break + time.sleep(1) + + print(f"finished at {time.localtime()}") + + # Close connections + print(f"DEBUG: Closing relay connections") + relay_manager.close_connections() + + if balance is not None: + print(f"Balance: {balance} satoshis") + else: + print("No balance response received or request timed out") + +# Example usage +#nwc_url = "nostr+walletconnect://b889ff5b1513b641e2a139f661a661364979c5beee91842f8f0ef42ab558e9d4?relay=wss%3A%2F%2Frelay.getalby.com/v1&secret=71a8c14c1407c113601079c4302dab36460f0ccd0ad506f1f2dc73b5100e4f3c&lud16=moritz@getalby.com" +#nwc_url = "nostr+walletconnect://b889ff5b1513b641e2a139f661a661364979c5beee91842f8f0ef42ab558e9d4?relay=wss://relay.getalby.com/v1&secret=71a8c14c1407c113601079c4302dab36460f0ccd0ad506f1f2dc73b5100e4f3c&lud16=moritz@getalby.com" +print(f"Processing NWC URL: {nwc_url}") +get_balance(nwc_url) diff --git a/draft_code/video_player/player.py b/draft_code/video_player/player.py new file mode 100644 index 00000000..e920fded --- /dev/null +++ b/draft_code/video_player/player.py @@ -0,0 +1,5 @@ +p = lv.ffmpeg_player(lv.screen_active()) +p.player_set_src("../artwork/Big_Buck_Bunny_extract.ogv") +p.player_set_auto_restart(True) +p.player_set_cmd(p.PLAYER_CMD.START) +p.center()