You've already forked MicroPythonOS
mirror of
https://github.com/m5stack/MicroPythonOS.git
synced 2026-05-20 11:51:27 -07:00
Add draft_code
This commit is contained in:
BIN
Binary file not shown.
BIN
Binary file not shown.
@@ -0,0 +1,449 @@
|
||||
import lvgl as lv
|
||||
|
||||
import uio
|
||||
import ujson
|
||||
import uos
|
||||
|
||||
import _thread
|
||||
import traceback
|
||||
|
||||
import mpos.info
|
||||
import mpos.ui
|
||||
|
||||
def good_stack_size():
|
||||
stacksize = 24*1024
|
||||
import sys
|
||||
if sys.platform == "esp32":
|
||||
stacksize = 16*1024
|
||||
return stacksize
|
||||
|
||||
# Run the script in the current thread:
|
||||
def execute_script(script_source, is_file, cwd=None, classname=None):
|
||||
thread_id = _thread.get_ident()
|
||||
compile_name = 'script' if not is_file else script_source
|
||||
print(f"Thread {thread_id}: executing script with cwd: {cwd}")
|
||||
|
||||
script_globals = {'lv': lv, '__name__': "__main__"}
|
||||
import sys
|
||||
import uos
|
||||
import utime
|
||||
|
||||
path_before = sys.path
|
||||
if cwd:
|
||||
sys.path.append(cwd)
|
||||
|
||||
try:
|
||||
if is_file:
|
||||
mpy_file = script_source.rsplit('.py', 1)[0] + '.mpy' if '.py' in script_source else script_source + '.mpy'
|
||||
try:
|
||||
uos.stat(mpy_file)
|
||||
source_file = mpy_file
|
||||
except OSError:
|
||||
source_file = script_source
|
||||
mode = 'rb' if source_file.endswith('.mpy') else 'r'
|
||||
print(f"Thread {thread_id}: reading {'bytecode' if mode == 'rb' else 'script'} from {source_file}")
|
||||
|
||||
start_time = utime.ticks_ms()
|
||||
f = open(source_file, mode)
|
||||
try:
|
||||
script_source = f.read()
|
||||
finally:
|
||||
f.close()
|
||||
read_time = utime.ticks_diff(utime.ticks_ms(), start_time)
|
||||
print(f"Thread {thread_id}: file read took {read_time} ms")
|
||||
|
||||
try:
|
||||
start_time = utime.ticks_ms()
|
||||
compiled_script = script_source if isinstance(script_source, bytes) else compile(script_source, compile_name, 'exec')
|
||||
compile_time = utime.ticks_diff(utime.ticks_ms(), start_time)
|
||||
if not isinstance(script_source, bytes):
|
||||
print(f"Thread {thread_id}: compilation took {compile_time} ms")
|
||||
|
||||
exec(compiled_script, script_globals)
|
||||
if classname:
|
||||
main_activity = script_globals.get(classname)
|
||||
if main_activity:
|
||||
Activity.startActivity(None, Intent(activity_class=main_activity))
|
||||
else:
|
||||
print("Warning: could not find main_activity")
|
||||
except Exception as e:
|
||||
print(f"Thread {thread_id}: exception during execution:")
|
||||
traceback.print_exception(type(e), e, getattr(e, '__traceback__', None))
|
||||
print(f"Thread {thread_id}: script {compile_name} finished")
|
||||
except Exception as e:
|
||||
print(f"Thread {thread_id}: error:")
|
||||
traceback.print_exception(type(e), e, getattr(e, '__traceback__', None))
|
||||
sys.path = path_before
|
||||
|
||||
# Run the script in a new thread:
|
||||
# TODO: check if the script exists here instead of launching a new thread?
|
||||
def execute_script_new_thread(scriptname, is_file):
|
||||
print(f"main.py: execute_script_new_thread({scriptname},{is_file})")
|
||||
try:
|
||||
# 168KB maximum at startup but 136KB after loading display, drivers, LVGL gui etc so let's go for 128KB for now, still a lot...
|
||||
# But then no additional threads can be created. A stacksize of 32KB allows for 4 threads, so 3 in the app itself, which might be tight.
|
||||
# 16KB allows for 10 threads in the apps, but seems too tight for urequests on unix (desktop) targets
|
||||
# 32KB seems better for the camera, but it forced me to lower other app threads from 16 to 12KB
|
||||
#_thread.stack_size(24576) # causes camera issue...
|
||||
# NOTE: This doesn't do anything if apps are started in the same thread!
|
||||
if "camtest" in scriptname:
|
||||
print("Starting camtest with extra stack size!")
|
||||
stack=32*1024
|
||||
elif "appstore"in scriptname:
|
||||
print("Starting appstore with extra stack size!")
|
||||
stack=24*1024 # this doesn't do anything because it's all started in the same thread
|
||||
else:
|
||||
stack=16*1024 # 16KB doesn't seem to be enough for the AppStore app on desktop
|
||||
stack = mpos.apps.good_stack_size()
|
||||
print(f"app.py: setting stack size for script to {stack}")
|
||||
_thread.stack_size(stack)
|
||||
_thread.start_new_thread(execute_script, (scriptname, is_file))
|
||||
except Exception as e:
|
||||
print("main.py: execute_script_new_thread(): error starting new thread thread: ", e)
|
||||
|
||||
def start_app_by_name(app_name, is_launcher=False):
|
||||
mpos.ui.set_foreground_app(app_name)
|
||||
custom_app_dir=f"apps/{app_name}"
|
||||
builtin_app_dir=f"builtin/apps/{app_name}"
|
||||
try:
|
||||
stat = uos.stat(custom_app_dir)
|
||||
start_app(custom_app_dir, is_launcher)
|
||||
except OSError:
|
||||
start_app(builtin_app_dir, is_launcher)
|
||||
|
||||
def start_app(app_dir, is_launcher=False):
|
||||
print(f"main.py start_app({app_dir},{is_launcher})")
|
||||
mpos.ui.set_foreground_app(app_dir) # would be better to store only the app name...
|
||||
manifest_path = f"{app_dir}/META-INF/MANIFEST.JSON"
|
||||
app = mpos.apps.parse_manifest(manifest_path)
|
||||
print(f"start_app parsed manifest and got: {str(app)}")
|
||||
main_launcher_activity = find_main_launcher_activity(app)
|
||||
if not main_launcher_activity:
|
||||
print(f"WARNING: can't start {app_dir} because no main_launcher_activity was found.")
|
||||
return
|
||||
start_script_fullpath = f"{app_dir}/{main_launcher_activity.get('entrypoint')}"
|
||||
execute_script(start_script_fullpath, True, app_dir + "/assets/", main_launcher_activity.get("classname"))
|
||||
# Launchers have the bar, other apps don't have it
|
||||
if is_launcher:
|
||||
mpos.ui.open_bar()
|
||||
else:
|
||||
mpos.ui.close_bar()
|
||||
|
||||
def restart_launcher():
|
||||
mpos.ui.empty_screen_stack()
|
||||
# No need to stop the other launcher first, because it exits after building the screen
|
||||
start_app_by_name("com.micropythonos.launcher", True) # Would be better to query the PackageManager for Activities that are launchers
|
||||
|
||||
def find_main_launcher_activity(app):
|
||||
result = None
|
||||
for activity in app.activities:
|
||||
if not activity.get("entrypoint") or not activity.get("classname"):
|
||||
print(f"Warning: activity {activity} has no entrypoint and classname, skipping...")
|
||||
continue
|
||||
print("checking activity's intent_filters...")
|
||||
for intent_filter in activity.get("intent_filters"):
|
||||
print("checking intent_filter...")
|
||||
if intent_filter.get("action") == "main" and intent_filter.get("category") == "launcher":
|
||||
print("found main_launcher!")
|
||||
result = activity
|
||||
break
|
||||
return result
|
||||
|
||||
def is_launcher(app_name):
|
||||
print(f"checking is_launcher for {app_name}")
|
||||
# Simple check, could be more elaborate by checking the MANIFEST.JSON for the app...
|
||||
return "launcher" in app_name
|
||||
|
||||
|
||||
class App:
|
||||
def __init__(self, name, publisher, short_description, long_description, icon_url, download_url, fullname, version, category, activities):
|
||||
self.name = name
|
||||
self.publisher = publisher
|
||||
self.short_description = short_description
|
||||
self.long_description = long_description
|
||||
self.icon_url = icon_url
|
||||
self.download_url = download_url
|
||||
self.fullname = fullname
|
||||
self.version = version
|
||||
self.category = category
|
||||
self.image = None
|
||||
self.image_dsc = None
|
||||
self.activities = activities
|
||||
|
||||
def __str__(self):
|
||||
return (f"App(name='{self.name}', "
|
||||
f"publisher='{self.publisher}', "
|
||||
f"short_description='{self.short_description}', "
|
||||
f"version='{self.version}', "
|
||||
f"category='{self.category}', "
|
||||
f"activities={self.activities})")
|
||||
|
||||
def parse_manifest(manifest_path):
|
||||
# Default values for App object
|
||||
default_app = App(
|
||||
name="Unknown",
|
||||
publisher="Unknown",
|
||||
short_description="",
|
||||
long_description="",
|
||||
icon_url="",
|
||||
download_url="",
|
||||
fullname="Unknown",
|
||||
version="0.0.0",
|
||||
category="",
|
||||
activities=[]
|
||||
)
|
||||
try:
|
||||
with open(manifest_path, 'r') as f:
|
||||
app_info = ujson.load(f)
|
||||
#print(f"parsed app: {app_info}")
|
||||
# Create App object with values from manifest, falling back to defaults
|
||||
return App(
|
||||
name=app_info.get("name", default_app.name),
|
||||
publisher=app_info.get("publisher", default_app.publisher),
|
||||
short_description=app_info.get("short_description", default_app.short_description),
|
||||
long_description=app_info.get("long_description", default_app.long_description),
|
||||
icon_url=app_info.get("icon_url", default_app.icon_url),
|
||||
download_url=app_info.get("download_url", default_app.download_url),
|
||||
fullname=app_info.get("fullname", default_app.fullname),
|
||||
version=app_info.get("version", default_app.version),
|
||||
category=app_info.get("category", default_app.category),
|
||||
activities=app_info.get("activities", default_app.activities)
|
||||
)
|
||||
except OSError:
|
||||
print(f"parse_manifest: error loading manifest_path: {manifest_path}")
|
||||
return default_app
|
||||
|
||||
|
||||
|
||||
def auto_connect():
|
||||
builtin_auto_connect = "builtin/system/WifiService.py"
|
||||
try:
|
||||
print(f"Starting {builtin_auto_connect}...")
|
||||
stat = uos.stat(builtin_auto_connect)
|
||||
execute_script_new_thread(builtin_auto_connect, True)
|
||||
except Exception as e:
|
||||
print("Couldn't execute {builtin_auto_connect} because exception {e}, continuing...")
|
||||
|
||||
|
||||
class Activity:
|
||||
|
||||
def __init__(self):
|
||||
self.intent = None # Store the intent that launched this activity
|
||||
self.result = None
|
||||
self._result_callback = None
|
||||
|
||||
def onCreate(self):
|
||||
pass
|
||||
def onStart(self, screen):
|
||||
pass
|
||||
def onResume(self, screen):
|
||||
pass
|
||||
def onPause(self, screen):
|
||||
pass
|
||||
def onStop(self, screen):
|
||||
pass
|
||||
def onDestroy(self, screen):
|
||||
pass
|
||||
|
||||
def setContentView(self, screen):
|
||||
mpos.ui.setContentView(self, screen)
|
||||
|
||||
def startActivity(self, intent):
|
||||
ActivityNavigator.startActivity(intent)
|
||||
|
||||
def startActivityForResult(self, intent, result_callback):
|
||||
ActivityNavigator.startActivityForResult(intent, result_callback)
|
||||
|
||||
def initError(self, e):
|
||||
print(f"WARNING: You might have inherited from Activity with a custom __init__() without calling super().__init__(). Got AttributeError: {e}")
|
||||
|
||||
def getIntent(self):
|
||||
try:
|
||||
return self.intent
|
||||
except AttributeError as e:
|
||||
self.initError(e)
|
||||
|
||||
def setResult(self, result_code, data=None):
|
||||
"""Set the result to be returned when the activity finishes."""
|
||||
try:
|
||||
self.result = {"result_code": result_code, "data": data or {}}
|
||||
except AttributeError as e:
|
||||
self.initError(e)
|
||||
|
||||
def finish(self):
|
||||
mpos.ui.back_screen()
|
||||
try:
|
||||
if self._result_callback and self.result:
|
||||
self._result_callback(self.result)
|
||||
self._result_callback = None # Clean up
|
||||
except AttributeError as e:
|
||||
self.initError(e)
|
||||
|
||||
class Intent:
|
||||
def __init__(self, activity_class=None, action=None, data=None, extras=None):
|
||||
self.activity_class = activity_class # Explicit target (e.g., SettingsActivity)
|
||||
self.action = action # Action string (e.g., "view", "share")
|
||||
self.data = data # Single data item (e.g., URL)
|
||||
self.extras = extras or {} # Dictionary for additional data
|
||||
self.flags = {} # Simplified flags: {"clear_top": bool, "no_history": bool, "no_animation": bool}
|
||||
|
||||
def addFlag(self, flag, value=True):
|
||||
self.flags[flag] = value
|
||||
return self
|
||||
|
||||
def putExtra(self, key, value):
|
||||
self.extras[key] = value
|
||||
return self
|
||||
|
||||
|
||||
class ActivityNavigator:
|
||||
@staticmethod
|
||||
def startActivity(intent):
|
||||
if not isinstance(intent, Intent):
|
||||
raise ValueError("Must provide an Intent")
|
||||
if intent.action: # Implicit intent: resolve handlers
|
||||
handlers = APP_REGISTRY.get(intent.action, [])
|
||||
if len(handlers) == 1:
|
||||
intent.activity_class = handlers[0]
|
||||
ActivityNavigator._launch_activity(intent)
|
||||
elif handlers:
|
||||
ActivityNavigator._show_chooser(intent, handlers)
|
||||
else:
|
||||
raise ValueError(f"No handlers for action: {intent.action}")
|
||||
else:
|
||||
ActivityNavigator._launch_activity(intent)
|
||||
|
||||
@staticmethod
|
||||
def startActivityForResult(intent, result_callback):
|
||||
"""Launch an activity and pass a callback for the result."""
|
||||
if not isinstance(intent, Intent):
|
||||
raise ValueError("Must provide an Intent")
|
||||
if intent.action: # Implicit intent: resolve handlers
|
||||
handlers = APP_REGISTRY.get(intent.action, [])
|
||||
if len(handlers) == 1:
|
||||
intent.activity_class = handlers[0]
|
||||
return ActivityNavigator._launch_activity(intent, result_callback)
|
||||
elif handlers:
|
||||
ActivityNavigator._show_chooser(intent, handlers)
|
||||
return None # Chooser handles result forwarding
|
||||
else:
|
||||
raise ValueError(f"No handlers for action: {intent.action}")
|
||||
else:
|
||||
return ActivityNavigator._launch_activity(intent, result_callback)
|
||||
|
||||
@staticmethod
|
||||
def _launch_activity(intent, result_callback=None):
|
||||
"""Launch an activity and set up result callback."""
|
||||
activity = intent.activity_class()
|
||||
activity.intent = intent
|
||||
activity._result_callback = result_callback # Pass callback to activity
|
||||
activity.onCreate()
|
||||
return activity
|
||||
|
||||
@staticmethod
|
||||
def _show_chooser(intent, handlers):
|
||||
chooser_intent = Intent(ChooserActivity, extras={"original_intent": intent, "handlers": [h.__name__ for h in handlers]})
|
||||
ActivityNavigator._launch_activity(chooser_intent)
|
||||
|
||||
|
||||
class ChooserActivity(Activity):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def onCreate(self):
|
||||
screen = lv.obj()
|
||||
# Get handlers from intent extras
|
||||
original_intent = self.getIntent().extras.get("original_intent")
|
||||
handlers = self.getIntent().extras.get("handlers", [])
|
||||
label = lv.label(screen)
|
||||
label.set_text("Choose an app")
|
||||
label.set_pos(10, 10)
|
||||
|
||||
for i, handler_name in enumerate(handlers):
|
||||
btn = lv.btn(screen)
|
||||
btn.set_user_data(f"handler_{i}")
|
||||
btn_label = lv.label(btn)
|
||||
btn_label.set_text(handler_name)
|
||||
btn.set_pos(10, 50 * (i + 1) + 10)
|
||||
btn.add_event_cb(lambda e, h=handler_name, oi=original_intent: self._select_handler(h, oi), lv.EVENT.CLICKED)
|
||||
self.setContentView(screen)
|
||||
|
||||
def _select_handler(self, handler_name, original_intent):
|
||||
for handler in APP_REGISTRY.get(original_intent.action, []):
|
||||
if handler.__name__ == handler_name:
|
||||
original_intent.activity_class = handler
|
||||
navigator.startActivity(original_intent)
|
||||
break
|
||||
navigator.finish() # Close chooser
|
||||
|
||||
def onStop(self, screen):
|
||||
if self.getIntent() and self.getIntent().getStringExtra("destination") == "ChooserActivity":
|
||||
print("Stopped for Chooser")
|
||||
else:
|
||||
print("Stopped for other screen")
|
||||
|
||||
|
||||
class ViewActivity(Activity):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def onCreate(self):
|
||||
screen = lv.obj()
|
||||
# Get content from intent (prefer extras.url, fallback to data)
|
||||
content = self.getIntent().extras.get("url", self.getIntent().data or "No content")
|
||||
label = lv.label(screen)
|
||||
label.set_user_data("content_label")
|
||||
label.set_text(f"Viewing: {content}")
|
||||
label.center()
|
||||
self.setContentView(screen)
|
||||
|
||||
def onStart(self, screen):
|
||||
content = self.getIntent().extras.get("url", self.getIntent().data or "No content")
|
||||
for i in range(screen.get_child_cnt()):
|
||||
if screen.get_child(i).get_user_data() == "content_label":
|
||||
screen.get_child(i).set_text(f"Viewing: {content}")
|
||||
|
||||
def onStop(self, screen):
|
||||
if self.getIntent() and self.getIntent().getStringExtra("destination") == "ViewActivity":
|
||||
print("Stopped for View")
|
||||
else:
|
||||
print("Stopped for other screen")
|
||||
|
||||
class ShareActivity(Activity):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def onCreate(self):
|
||||
screen = lv.obj()
|
||||
# Get text from intent (prefer extras.text, fallback to data)
|
||||
text = self.getIntent().extras.get("text", self.getIntent().data or "No text")
|
||||
label = lv.label(screen)
|
||||
label.set_user_data("share_label")
|
||||
label.set_text(f"Share: {text}")
|
||||
label.set_pos(10, 10)
|
||||
|
||||
btn = lv.btn(screen)
|
||||
btn.set_user_data("share_btn")
|
||||
btn_label = lv.label(btn)
|
||||
btn_label.set_text("Share")
|
||||
btn.set_pos(10, 50)
|
||||
btn.add_event_cb(lambda e: self._share_content(text), lv.EVENT.CLICKED)
|
||||
self.setContentView(screen)
|
||||
|
||||
def _share_content(self, text):
|
||||
# Dispatch to another app (e.g., MessagingActivity) or simulate sharing
|
||||
print(f"Sharing: {text}") # Placeholder for actual sharing
|
||||
# Example: Launch another share handler
|
||||
navigator.startActivity(Intent(action="share", data=text))
|
||||
navigator.finish() # Close ShareActivity
|
||||
|
||||
def onStop(self, screen):
|
||||
if self.getIntent() and self.getIntent().getStringExtra("destination") == "ShareActivity":
|
||||
print("Stopped for Share")
|
||||
else:
|
||||
print("Stopped for other screen")
|
||||
|
||||
APP_REGISTRY = { # This should be handled by a new class PackageManager:
|
||||
"view": [ViewActivity], # Hypothetical activities
|
||||
"share": [ShareActivity]
|
||||
}
|
||||
@@ -0,0 +1,284 @@
|
||||
import socket
|
||||
import asyncio as a
|
||||
import binascii as b
|
||||
import random as r
|
||||
from collections import namedtuple
|
||||
import re
|
||||
import struct
|
||||
import ssl
|
||||
|
||||
# Opcodes
|
||||
OP_CONT = const(0x0)
|
||||
OP_TEXT = const(0x1)
|
||||
OP_BYTES = const(0x2)
|
||||
OP_CLOSE = const(0x8)
|
||||
OP_PING = const(0x9)
|
||||
OP_PONG = const(0xa)
|
||||
|
||||
# Close codes
|
||||
CLOSE_OK = const(1000)
|
||||
CLOSE_GOING_AWAY = const(1001)
|
||||
CLOSE_PROTOCOL_ERROR = const(1002)
|
||||
CLOSE_DATA_NOT_SUPPORTED = const(1003)
|
||||
CLOSE_BAD_DATA = const(1007)
|
||||
CLOSE_POLICY_VIOLATION = const(1008)
|
||||
CLOSE_TOO_BIG = const(1009)
|
||||
CLOSE_MISSING_EXTN = const(1010)
|
||||
CLOSE_BAD_CONDITION = const(1011)
|
||||
|
||||
URL_RE = re.compile(r'(wss|ws)://([A-Za-z0-9-\.]+)(?:\:([0-9]+))?(/.+)?')
|
||||
URI = namedtuple('URI', ('protocol', 'hostname', 'port', 'path'))
|
||||
|
||||
class AsyncWebsocketClient:
|
||||
def __init__(self, ms_delay_for_read: int = 5):
|
||||
self._open = False
|
||||
self.delay_read = ms_delay_for_read
|
||||
self._lock_for_open = a.Lock()
|
||||
self.sock = None
|
||||
|
||||
async def open(self, new_val: bool = None):
|
||||
await self._lock_for_open.acquire()
|
||||
if new_val is not None:
|
||||
if not new_val and self.sock:
|
||||
self.sock.close()
|
||||
self.sock = None
|
||||
self._open = new_val
|
||||
to_return = self._open
|
||||
self._lock_for_open.release()
|
||||
return to_return
|
||||
|
||||
async def close(self):
|
||||
return await self.open(False)
|
||||
|
||||
def urlparse(self, uri):
|
||||
"""Parse ws or wss:// URLs"""
|
||||
match = URL_RE.match(uri)
|
||||
if match:
|
||||
protocol, host, port, path = match.group(1), match.group(2), match.group(3), match.group(4)
|
||||
|
||||
if protocol not in ['ws', 'wss']:
|
||||
raise ValueError('Scheme {} is invalid'.format(protocol))
|
||||
|
||||
if port is None:
|
||||
port = (80, 443)[protocol == 'wss']
|
||||
|
||||
return URI(protocol, host, int(port), path)
|
||||
|
||||
async def a_readline(self):
|
||||
line = None
|
||||
while line is None:
|
||||
line = self.sock.readline()
|
||||
await a.sleep_ms(self.delay_read)
|
||||
|
||||
return line
|
||||
|
||||
async def a_read(self, size: int = None):
|
||||
if size == 0:
|
||||
return b''
|
||||
chunks = []
|
||||
|
||||
while True:
|
||||
b = self.sock.read(size)
|
||||
await a.sleep_ms(self.delay_read)
|
||||
|
||||
# Continue reading if the socket returns None
|
||||
if b is None: continue
|
||||
|
||||
# In some cases, the socket will return an empty bytes
|
||||
# after PING or PONG frames, we need to ignore them.
|
||||
if len(b) == 0: break
|
||||
|
||||
chunks.append(b)
|
||||
size -= len(b)
|
||||
|
||||
# After reading the first chunk, we can break if size is None or 0
|
||||
if size is None or size == 0: break
|
||||
|
||||
# Join all the chunks and return them
|
||||
return b''.join(chunks)
|
||||
|
||||
async def handshake(self, uri, headers=[], keyfile=None, certfile=None, cafile=None, cert_reqs=0):
|
||||
if self.sock:
|
||||
self.close()
|
||||
|
||||
self.sock = socket.socket()
|
||||
self.uri = self.urlparse(uri)
|
||||
ai = socket.getaddrinfo(self.uri.hostname, self.uri.port)
|
||||
addr = ai[0][4]
|
||||
|
||||
self.sock.connect(addr)
|
||||
self.sock.setblocking(False)
|
||||
|
||||
if self.uri.protocol == 'wss':
|
||||
cadata = None
|
||||
if not cafile is None:
|
||||
with open(cafile, 'rb') as f:
|
||||
cadata = f.read()
|
||||
self.sock = ssl.wrap_socket(
|
||||
self.sock, server_side=False,
|
||||
key=keyfile, cert=certfile,
|
||||
cert_reqs=cert_reqs, # 0 - NONE, 1 - OPTIONAL, 2 - REQUIED
|
||||
cadata=cadata,
|
||||
server_hostname=self.uri.hostname
|
||||
)
|
||||
|
||||
def send_header(header, *args):
|
||||
self.sock.write(header % args + '\r\n')
|
||||
|
||||
# Sec-WebSocket-Key is 16 bytes of random base64 encoded
|
||||
key = b.b2a_base64(bytes(r.getrandbits(8)
|
||||
for _ in range(16)))[:-1]
|
||||
|
||||
send_header(b'GET %s HTTP/1.1', self.uri.path or '/')
|
||||
send_header(b'Host: %s:%s', self.uri.hostname, self.uri.port)
|
||||
send_header(b'Connection: Upgrade')
|
||||
send_header(b'Upgrade: websocket')
|
||||
send_header(b'Sec-WebSocket-Key: %s', key)
|
||||
send_header(b'Sec-WebSocket-Version: 13')
|
||||
send_header(b'Origin: http://{hostname}:{port}'.format(
|
||||
hostname=self.uri.hostname,
|
||||
port=self.uri.port)
|
||||
)
|
||||
|
||||
for key, value in headers:
|
||||
send_header(b'%s: %s', key, value)
|
||||
|
||||
send_header(b'')
|
||||
|
||||
line = await self.a_readline()
|
||||
header = (line)[:-2]
|
||||
if not header.startswith(b'HTTP/1.1 101 '):
|
||||
raise Exception(header)
|
||||
|
||||
# We don't (currently) need these headers
|
||||
# FIXME: should we check the return key?
|
||||
while header:
|
||||
line = await self.a_readline()
|
||||
header = (line)[:-2]
|
||||
|
||||
return await self.open(True)
|
||||
|
||||
async def read_frame(self, max_size=None):
|
||||
# Frame header
|
||||
byte1, byte2 = struct.unpack('!BB', await self.a_read(2))
|
||||
|
||||
# Byte 1: FIN(1) _(1) _(1) _(1) OPCODE(4)
|
||||
fin = bool(byte1 & 0x80)
|
||||
opcode = byte1 & 0x0f
|
||||
|
||||
# Byte 2: MASK(1) LENGTH(7)
|
||||
mask = bool(byte2 & (1 << 7))
|
||||
length = byte2 & 0x7f
|
||||
|
||||
if length == 126: # Magic number, length header is 2 bytes
|
||||
length, = struct.unpack('!H', await self.a_read(2))
|
||||
elif length == 127: # Magic number, length header is 8 bytes
|
||||
length, = struct.unpack('!Q', await self.a_read(8))
|
||||
|
||||
if mask: # Mask is 4 bytes
|
||||
mask_bits = await self.a_read(4)
|
||||
|
||||
try:
|
||||
data = await self.a_read(length)
|
||||
except MemoryError:
|
||||
# We can't receive this many bytes, close the socket
|
||||
self.close(code=CLOSE_TOO_BIG)
|
||||
# await self._stream.drain()
|
||||
return True, OP_CLOSE, None
|
||||
|
||||
if mask:
|
||||
data = bytes(b ^ mask_bits[i % 4]
|
||||
for i, b in enumerate(data))
|
||||
|
||||
return fin, opcode, data
|
||||
|
||||
def write_frame(self, opcode, data=b''):
|
||||
fin = True
|
||||
mask = True # messages sent by client are masked
|
||||
|
||||
length = len(data)
|
||||
|
||||
# Frame header
|
||||
# Byte 1: FIN(1) _(1) _(1) _(1) OPCODE(4)
|
||||
byte1 = 0x80 if fin else 0
|
||||
byte1 |= opcode
|
||||
|
||||
# Byte 2: MASK(1) LENGTH(7)
|
||||
byte2 = 0x80 if mask else 0
|
||||
|
||||
if length < 126: # 126 is magic value to use 2-byte length header
|
||||
byte2 |= length
|
||||
self.sock.write(struct.pack('!BB', byte1, byte2))
|
||||
|
||||
elif length < (1 << 16): # Length fits in 2-bytes
|
||||
byte2 |= 126 # Magic code
|
||||
self.sock.write(struct.pack('!BBH', byte1, byte2, length))
|
||||
|
||||
elif length < (1 << 64):
|
||||
byte2 |= 127 # Magic code
|
||||
self.sock.write(struct.pack('!BBQ', byte1, byte2, length))
|
||||
|
||||
else:
|
||||
raise ValueError()
|
||||
|
||||
if mask: # Mask is 4 bytes
|
||||
mask_bits = struct.pack('!I', r.getrandbits(32))
|
||||
self.sock.write(mask_bits)
|
||||
data = bytes(b ^ mask_bits[i % 4]
|
||||
for i, b in enumerate(data))
|
||||
|
||||
self.sock.write(data)
|
||||
|
||||
async def recv(self):
|
||||
while await self.open():
|
||||
try:
|
||||
fin, opcode, data = await self.read_frame()
|
||||
# except (ValueError, EOFError) as ex:
|
||||
except Exception as ex:
|
||||
print('Exception in recv while reading frame:', ex)
|
||||
await self.open(False)
|
||||
return
|
||||
|
||||
if not fin:
|
||||
raise NotImplementedError()
|
||||
|
||||
if opcode == OP_TEXT:
|
||||
return data.decode('utf-8')
|
||||
elif opcode == OP_BYTES:
|
||||
return data
|
||||
elif opcode == OP_CLOSE:
|
||||
await self.open(False)
|
||||
return
|
||||
elif opcode == OP_PONG:
|
||||
# Ignore this frame, keep waiting for a data frame
|
||||
continue
|
||||
elif opcode == OP_PING:
|
||||
try:
|
||||
# We need to send a pong frame
|
||||
self.write_frame(OP_PONG, data)
|
||||
|
||||
# And then continue to wait for a data frame
|
||||
continue
|
||||
except Exception as ex:
|
||||
print('Error sending pong frame:', ex)
|
||||
# If sending the pong frame fails, close the connection
|
||||
await self.open(False)
|
||||
return
|
||||
elif opcode == OP_CONT:
|
||||
# This is a continuation of a previous frame
|
||||
raise NotImplementedError(opcode)
|
||||
else:
|
||||
raise ValueError(opcode)
|
||||
|
||||
async def send(self, buf):
|
||||
if not await self.open():
|
||||
return
|
||||
if isinstance(buf, str):
|
||||
opcode = OP_TEXT
|
||||
buf = buf.encode('utf-8')
|
||||
elif isinstance(buf, bytes):
|
||||
opcode = OP_BYTES
|
||||
else:
|
||||
raise TypeError()
|
||||
self.write_frame(opcode, buf)
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
# Create a dropdown
|
||||
dropdown = lv.dropdown(lv.screen_active())
|
||||
dropdown.set_options("Option 1\nOption 2\nOption 3")
|
||||
dropdown.align(lv.ALIGN.CENTER, 0, 0)
|
||||
|
||||
|
||||
switch = lv.switch(lv.screen_active())
|
||||
switch.center()
|
||||
|
||||
@@ -0,0 +1,39 @@
|
||||
|
||||
|
||||
import lvgl as lv
|
||||
|
||||
def list_image_decoders():
|
||||
# Initialize LVGL
|
||||
lv.init()
|
||||
|
||||
# Start with the first decoder
|
||||
first = lv.image_decoder_t()
|
||||
#decoder = lv.image.decoder_get_next(first)
|
||||
decoder = lv.image_decoder_t.get_next(first)
|
||||
index = 0
|
||||
|
||||
# Iterate through all decoders
|
||||
while decoder is not None:
|
||||
print(f"Image Decoder {index}: {decoder}")
|
||||
index += 1
|
||||
#decoder = lv.image.decoder_get_next(decoder)
|
||||
decoder = lv.image_decoder_t.get_next(decoder)
|
||||
|
||||
if index == 0:
|
||||
print("No image decoders found.")
|
||||
else:
|
||||
print(f"Total image decoders: {index}")
|
||||
|
||||
# Run the function
|
||||
list_image_decoders()
|
||||
|
||||
|
||||
i = lv.image(lv.screen_active());
|
||||
#i.set_src("P:/home/user/sources/MicroPythonOS/artwork/image.jpg");
|
||||
i.set_src("P:/home/user/sources/MicroPythonOS/artwork/icon_64x64.jpg");
|
||||
i.center()
|
||||
h = lv.image_header_t()
|
||||
i.decoder_get_info(i.image_dsc, h)
|
||||
print("image info:")
|
||||
print(h)
|
||||
print(f"widthxheight: {h.w}x{h.h}")
|
||||
@@ -0,0 +1,110 @@
|
||||
# Hardware initialization for Unix and MacOS systems
|
||||
|
||||
import lcd_bus
|
||||
import lvgl as lv
|
||||
import sdl_display
|
||||
import task_handler
|
||||
|
||||
|
||||
# Add lib/ to the path for modules, otherwise it will only search in ~/.micropython/lib and /usr/lib/micropython
|
||||
import sys
|
||||
sys.path.append('lib/')
|
||||
|
||||
|
||||
import mpos.ui
|
||||
|
||||
|
||||
#TFT_HOR_RES=640
|
||||
#TFT_VER_RES=480
|
||||
TFT_HOR_RES=320
|
||||
TFT_VER_RES=240
|
||||
|
||||
def window_cb(args): # doesn't get called
|
||||
print(f"Window callback: {args}")
|
||||
|
||||
bus = lcd_bus.SDLBus(flags=0)
|
||||
bus.register_window_callback(window_cb)
|
||||
|
||||
# bus.set_window_size(320,240,-1,False) # -1 might be 25 but it always becomes black, except for format 0
|
||||
|
||||
buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0)
|
||||
|
||||
display = sdl_display.SDLDisplay(data_bus=bus,display_width=TFT_HOR_RES,display_height=TFT_VER_RES,frame_buffer1=buf1,color_space=lv.COLOR_FORMAT.RGB565)
|
||||
display.init()
|
||||
|
||||
import sdl_pointer
|
||||
mouse = sdl_pointer.SDLPointer()
|
||||
|
||||
import sdl_keyboard
|
||||
sdlkeyboard = sdl_keyboard.SDLKeyboard()
|
||||
|
||||
#indev.set_read_cb(keypad_cb)
|
||||
|
||||
# seems indev isn't properly initialized
|
||||
def keypad_cb(indev, indev_data):
|
||||
global sdlkeyboard
|
||||
#print(f"keypad_cb {indev} {indev_data}")
|
||||
#key = indev.get_key() # always 0
|
||||
#print(f"key {key}")
|
||||
#key = indev_data.get("key")
|
||||
#print(f"key {key}")
|
||||
pressed, code = sdlkeyboard._get_key()
|
||||
print(f"periodic pressed: {pressed}, code: {code}")
|
||||
sdlkeyboard._read(indev, indev_data)
|
||||
# I mean we could read the key and put it in the textarea but I want some kind of keypress :-/
|
||||
|
||||
sdlkeyboard._indev_drv.set_read_cb(keypad_cb) # check for escape
|
||||
|
||||
def keyboard_cb(event):
|
||||
event_code=event.get_code()
|
||||
print(f"keyboard_test YES: code={event_code}") # target={event.get_target()}, user_data={event.get_user_data()}, param={event.get_param()}
|
||||
|
||||
|
||||
def button_cb(event):
|
||||
event_code=event.get_code()
|
||||
name = mpos.ui.get_event_name(event_code)
|
||||
print(f"button_cb YES: code={event_code} and name {name}")
|
||||
|
||||
# for some reason, this text areas is receiving mouse events, and draw events, but not key events...
|
||||
def ta_callback_again(event):
|
||||
event_code=event.get_code()
|
||||
if event_code in [19,23,25,26,27,28,29,30,49]:
|
||||
return
|
||||
name = mpos.ui.get_event_name(event_code)
|
||||
print(f"ta_callback_again {event_code} and {name}")
|
||||
#print(f"ta_callback_again: code={event_code}") # target={event.get_target()}, user_data={event.get_user_data()}, param={event.get_param()}
|
||||
|
||||
sdlkeyboard.add_event_cb(keyboard_cb, lv.EVENT.ALL, None)
|
||||
|
||||
|
||||
#group = lv.group_create()
|
||||
#group = keyboard.get_group()
|
||||
|
||||
th = task_handler.TaskHandler(duration=5) # 5ms is recommended for MicroPython+LVGL on desktop
|
||||
|
||||
screen = lv.screen_active()
|
||||
|
||||
b = lv.button(screen)
|
||||
b.center()
|
||||
b.add_event_cb(button_cb, lv.EVENT.ALL, None)
|
||||
#group.add_obj(b)
|
||||
|
||||
ta = lv.textarea(screen)
|
||||
ta.set_one_line(True)
|
||||
ta.align(lv.ALIGN.TOP_LEFT,0,0)
|
||||
ta.add_event_cb(ta_callback_again, lv.EVENT.ALL, None)
|
||||
|
||||
#group.add_obj(ta)
|
||||
|
||||
takeyboard = lv.keyboard(screen)
|
||||
takeyboard.set_textarea(ta)
|
||||
|
||||
|
||||
# this does something, but just gives indev 0, being error...
|
||||
#indev = lv.indev_create()
|
||||
#indev.set_type(lv.INDEV_TYPE.KEYPAD)
|
||||
#indev.set_read_cb(keypad_cb) # check for escape
|
||||
|
||||
|
||||
|
||||
#keyboard.set_group(group)
|
||||
@@ -0,0 +1,69 @@
|
||||
import lcd_bus
|
||||
import lvgl as lv
|
||||
import sdl_display
|
||||
import task_handler
|
||||
import sys
|
||||
sys.path.append('lib/')
|
||||
import mpos.ui
|
||||
import sdl_pointer
|
||||
import sdl_keyboard
|
||||
|
||||
# Display resolution
|
||||
TFT_HOR_RES = 320
|
||||
TFT_VER_RES = 240
|
||||
|
||||
# Initialize display
|
||||
bus = lcd_bus.SDLBus(flags=0)
|
||||
buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0)
|
||||
display = sdl_display.SDLDisplay(
|
||||
data_bus=bus,
|
||||
display_width=TFT_HOR_RES,
|
||||
display_height=TFT_VER_RES,
|
||||
frame_buffer1=buf1,
|
||||
color_space=lv.COLOR_FORMAT.RGB565
|
||||
)
|
||||
display.init()
|
||||
|
||||
# Initialize mouse
|
||||
mouse = sdl_pointer.SDLPointer()
|
||||
|
||||
# Initialize keyboard
|
||||
keyboard = sdl_keyboard.SDLKeyboard()
|
||||
|
||||
# Create group for input devices
|
||||
group = lv.group_create()
|
||||
keyboard.set_group(group)
|
||||
|
||||
|
||||
# Create textarea
|
||||
screen = lv.screen_active()
|
||||
ta = lv.textarea(screen)
|
||||
ta.set_one_line(True)
|
||||
ta.align(lv.ALIGN.TOP_LEFT, 0, 0)
|
||||
ta.set_placeholder_text("Type here")
|
||||
group.add_obj(ta)
|
||||
|
||||
# Optional: Debug event callback for textarea
|
||||
def ta_event_cb(event):
|
||||
event_code = event.get_code()
|
||||
name = mpos.ui.get_event_name(event_code)
|
||||
print(f"Textarea event: code={event_code}, name={name}")
|
||||
|
||||
ta.add_event_cb(ta_event_cb, lv.EVENT.ALL, None)
|
||||
|
||||
# Optional: Create an on-screen keyboard
|
||||
keyboard_widget = lv.keyboard(screen)
|
||||
keyboard_widget.set_textarea(ta)
|
||||
keyboard_widget.add_flag(lv.obj.FLAG.HIDDEN)
|
||||
|
||||
def ta_focus_cb(event):
|
||||
event_code = event.get_code()
|
||||
if event_code == lv.EVENT.FOCUSED:
|
||||
keyboard_widget.clear_flag(lv.obj.FLAG.HIDDEN)
|
||||
elif event_code == lv.EVENT.DEFOCUSED:
|
||||
keyboard_widget.add_flag(lv.obj.FLAG.HIDDEN)
|
||||
|
||||
ta.add_event_cb(ta_focus_cb, lv.EVENT.FOCUSED | lv.EVENT.DEFOCUSED, None)
|
||||
|
||||
# Task handler
|
||||
th = task_handler.TaskHandler(duration=5) # 5ms for desktop
|
||||
@@ -0,0 +1,113 @@
|
||||
import lcd_bus
|
||||
import lvgl as lv
|
||||
import sdl_display
|
||||
import task_handler
|
||||
import sys
|
||||
sys.path.append('lib/')
|
||||
import sdl_pointer
|
||||
import sdl_keyboard
|
||||
|
||||
# Initialize display
|
||||
TFT_HOR_RES = 320
|
||||
TFT_VER_RES = 240
|
||||
bus = lcd_bus.SDLBus(flags=0)
|
||||
buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0)
|
||||
display = sdl_display.SDLDisplay(
|
||||
data_bus=bus,
|
||||
display_width=TFT_HOR_RES,
|
||||
display_height=TFT_VER_RES,
|
||||
frame_buffer1=buf1,
|
||||
color_space=lv.COLOR_FORMAT.RGB565
|
||||
)
|
||||
display.init()
|
||||
|
||||
# Initialize mouse
|
||||
mouse = sdl_pointer.SDLPointer()
|
||||
|
||||
# Initialize keyboard
|
||||
#keyboard = sdl_keyboard.SDLKeyboard()
|
||||
|
||||
pressed = False
|
||||
def get_key(indev,data):
|
||||
print("simulating get_key")
|
||||
global pressed
|
||||
if not pressed:
|
||||
# input your keypad code
|
||||
data.state = 1 #1 for press 0 for released
|
||||
data.key=100
|
||||
#pressed = True
|
||||
else:
|
||||
data.state = 0 #1 for press 0 for released
|
||||
data.key=100
|
||||
pressed = False
|
||||
|
||||
|
||||
# Create group
|
||||
group = lv.group_create()
|
||||
group.set_default()
|
||||
|
||||
keyboard=lv.indev_create()
|
||||
keyboard.set_type(lv.INDEV_TYPE.KEYPAD)
|
||||
keyboard.set_read_cb(get_key)
|
||||
keyboard.set_group(group)
|
||||
|
||||
#keyboard.set_group(group)
|
||||
|
||||
# Create widgets
|
||||
screen = lv.screen_active()
|
||||
|
||||
# Textarea
|
||||
ta = lv.textarea(screen)
|
||||
ta.set_one_line(True)
|
||||
ta.set_placeholder_text("Type here")
|
||||
ta.align(lv.ALIGN.TOP_LEFT, 10, 10)
|
||||
group.add_obj(ta)
|
||||
|
||||
# Switch
|
||||
sw = lv.switch(screen)
|
||||
sw.align(lv.ALIGN.TOP_LEFT, 10, 50)
|
||||
group.add_obj(sw)
|
||||
|
||||
# Test Button
|
||||
btn = lv.button(screen)
|
||||
btn.align(lv.ALIGN.TOP_LEFT, 10, 90)
|
||||
lbl = lv.label(btn)
|
||||
lbl.set_text("Test Button")
|
||||
group.add_obj(btn)
|
||||
|
||||
# Simulate NEXT key button
|
||||
btn_next = lv.button(screen)
|
||||
btn_next.align(lv.ALIGN.BOTTOM_LEFT, 10, -10)
|
||||
lbl_next = lv.label(btn_next)
|
||||
lbl_next.set_text("NEXT")
|
||||
def btn_next_cb(event):
|
||||
if event.get_code() == lv.EVENT.CLICKED:
|
||||
keyboard._keypad_cb(None, 1, 9, 0) # Simulate KEY_TAB (lv.KEY.NEXT) press
|
||||
keyboard._keypad_cb(None, 0, 9, 0) # Simulate release
|
||||
btn_next.add_event_cb(btn_next_cb, lv.EVENT.CLICKED, None)
|
||||
|
||||
# Simulate ENTER key button
|
||||
btn_enter = lv.button(screen)
|
||||
btn_enter.align(lv.ALIGN.BOTTOM_LEFT, 100, -10)
|
||||
lbl_enter = lv.label(btn_enter)
|
||||
lbl_enter.set_text("ENTER")
|
||||
def btn_enter_cb(event):
|
||||
if event.get_code() == lv.EVENT.CLICKED:
|
||||
keyboard._keypad_cb(None, 1, 13, 0) # Simulate KEY_RETURN (lv.KEY.ENTER) press
|
||||
keyboard._keypad_cb(None, 0, 13, 0) # Simulate release
|
||||
btn_enter.add_event_cb(btn_enter_cb, lv.EVENT.CLICKED, None)
|
||||
|
||||
# Debug focus
|
||||
def check_focus():
|
||||
focused = lv.group_get_focused(group)
|
||||
print(f"Focused widget: {focused}")
|
||||
th = task_handler.TaskHandler(duration=5)
|
||||
th.register_task(check_focus, 1000)
|
||||
|
||||
# Debug events
|
||||
def event_cb(event, name):
|
||||
event_code = event.get_code()
|
||||
print(f"{name} event: code={event_code}, name={getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN')}")
|
||||
ta.add_event_cb(lambda e: event_cb(e, "Textarea"), lv.EVENT.ALL, None)
|
||||
sw.add_event_cb(lambda e: event_cb(e, "Switch"), lv.EVENT.ALL, None)
|
||||
btn.add_event_cb(lambda e: event_cb(e, "Button"), lv.EVENT.ALL, None)
|
||||
@@ -0,0 +1,121 @@
|
||||
import lcd_bus
|
||||
import lvgl as lv
|
||||
import sdl_display
|
||||
import task_handler
|
||||
import sys
|
||||
sys.path.append('lib/')
|
||||
import sdl_pointer
|
||||
import sdl_keyboard
|
||||
|
||||
# Initialize display
|
||||
TFT_HOR_RES = 320
|
||||
TFT_VER_RES = 240
|
||||
bus = lcd_bus.SDLBus(flags=0)
|
||||
buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0)
|
||||
display = sdl_display.SDLDisplay(
|
||||
data_bus=bus,
|
||||
display_width=TFT_HOR_RES,
|
||||
display_height=TFT_VER_RES,
|
||||
frame_buffer1=buf1,
|
||||
color_space=lv.COLOR_FORMAT.RGB565
|
||||
)
|
||||
display.init()
|
||||
|
||||
# Initialize mouse
|
||||
mouse = sdl_pointer.SDLPointer()
|
||||
|
||||
# Initialize keyboard
|
||||
keyboard = sdl_keyboard.SDLKeyboard()
|
||||
|
||||
# Create group
|
||||
group = lv.group_create()
|
||||
group.set_default()
|
||||
keyboard.set_group(group)
|
||||
|
||||
# Simulated key input for buttons
|
||||
simulated_key = None
|
||||
simulated_state = None
|
||||
def get_key(indev, data):
|
||||
global simulated_key, simulated_state
|
||||
if simulated_key is not None:
|
||||
print(f"Simulating key: state={simulated_state}, key={simulated_key}")
|
||||
data.state = simulated_state
|
||||
data.key = simulated_key
|
||||
simulated_key = None # Clear after processing
|
||||
else:
|
||||
data.state = 0 # No key event by default
|
||||
data.key = 0
|
||||
|
||||
# Create custom input device for simulated keys
|
||||
sim_indev = lv.indev_create()
|
||||
sim_indev.set_type(lv.INDEV_TYPE.KEYPAD)
|
||||
sim_indev.set_read_cb(get_key)
|
||||
sim_indev.set_group(group)
|
||||
|
||||
# Create widgets
|
||||
screen = lv.screen_active()
|
||||
|
||||
# Textarea
|
||||
ta = lv.textarea(screen)
|
||||
ta.set_one_line(True)
|
||||
ta.set_placeholder_text("Type here")
|
||||
ta.align(lv.ALIGN.TOP_LEFT, 10, 10)
|
||||
group.add_obj(ta)
|
||||
|
||||
# Switch
|
||||
sw = lv.switch(screen)
|
||||
sw.align(lv.ALIGN.TOP_LEFT, 10, 50)
|
||||
group.add_obj(sw)
|
||||
|
||||
# Test Button
|
||||
btn = lv.button(screen)
|
||||
btn.align(lv.ALIGN.TOP_LEFT, 10, 90)
|
||||
lbl = lv.label(btn)
|
||||
lbl.set_text("Test Button")
|
||||
group.add_obj(btn)
|
||||
|
||||
# Simulate NEXT key button
|
||||
btn_next = lv.button(screen)
|
||||
btn_next.align(lv.ALIGN.BOTTOM_LEFT, 10, -10)
|
||||
lbl_next = lv.label(btn_next)
|
||||
lbl_next.set_text("PREV")
|
||||
def btn_next_cb(event):
|
||||
global simulated_key, simulated_state
|
||||
if event.get_code() == lv.EVENT.CLICKED:
|
||||
simulated_key = lv.KEY.PREV
|
||||
simulated_state = 1
|
||||
sim_indev.read() # Trigger read immediately
|
||||
simulated_state = 0
|
||||
sim_indev.read()
|
||||
btn_next.add_event_cb(btn_next_cb, lv.EVENT.CLICKED, None)
|
||||
|
||||
# Simulate ENTER key button
|
||||
btn_enter = lv.button(screen)
|
||||
btn_enter.align(lv.ALIGN.BOTTOM_LEFT, 100, -10)
|
||||
lbl_enter = lv.label(btn_enter)
|
||||
lbl_enter.set_text("ENTER")
|
||||
def btn_enter_cb(event):
|
||||
global simulated_key, simulated_state
|
||||
if event.get_code() == lv.EVENT.CLICKED:
|
||||
simulated_key = lv.KEY.ENTER
|
||||
simulated_state = 1
|
||||
sim_indev.read()
|
||||
simulated_state = 0
|
||||
sim_indev.read()
|
||||
btn_enter.add_event_cb(btn_enter_cb, lv.EVENT.CLICKED, None)
|
||||
|
||||
# Debug focus
|
||||
def check_focus():
|
||||
focused = lv.group_get_focused(group)
|
||||
print(f"Focused widget: {focused}")
|
||||
th = task_handler.TaskHandler(duration=5)
|
||||
th.register_task(check_focus, 1000)
|
||||
|
||||
# Debug events
|
||||
def event_cb(event, name):
|
||||
event_code = event.get_code()
|
||||
key = event.get_key() if event_code == lv.EVENT.KEY else None
|
||||
print(f"{name} event: code={event_code}, name={getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN')}, key={key}")
|
||||
ta.add_event_cb(lambda e: event_cb(e, "Textarea"), lv.EVENT.ALL, None)
|
||||
sw.add_event_cb(lambda e: event_cb(e, "Switch"), lv.EVENT.ALL, None)
|
||||
btn.add_event_cb(lambda e: event_cb(e, "Button"), lv.EVENT.ALL, None)
|
||||
@@ -0,0 +1,114 @@
|
||||
import lcd_bus
|
||||
import lvgl as lv
|
||||
import sdl_display
|
||||
import task_handler
|
||||
import sys
|
||||
sys.path.append('lib/')
|
||||
import sdl_pointer
|
||||
import sdl_keyboard
|
||||
|
||||
# Initialize display
|
||||
TFT_HOR_RES = 320
|
||||
TFT_VER_RES = 240
|
||||
bus = lcd_bus.SDLBus(flags=0)
|
||||
buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0)
|
||||
display = sdl_display.SDLDisplay(
|
||||
data_bus=bus,
|
||||
display_width=TFT_HOR_RES,
|
||||
display_height=TFT_VER_RES,
|
||||
frame_buffer1=buf1,
|
||||
color_space=lv.COLOR_FORMAT.RGB565
|
||||
)
|
||||
display.init()
|
||||
|
||||
# Initialize mouse
|
||||
mouse = sdl_pointer.SDLPointer()
|
||||
|
||||
# Initialize keyboard
|
||||
keyboard = sdl_keyboard.SDLKeyboard()
|
||||
|
||||
pressed = False
|
||||
def get_key(indev,data):
|
||||
print("simulating get_key")
|
||||
global pressed
|
||||
if not pressed:
|
||||
# input your keypad code
|
||||
data.state = 1 #1 for press 0 for released
|
||||
data.key=100
|
||||
#pressed = True
|
||||
else:
|
||||
data.state = 0 #1 for press 0 for released
|
||||
data.key=100
|
||||
pressed = False
|
||||
|
||||
|
||||
# Create group
|
||||
#group = lv.group_create()
|
||||
#group.set_default()
|
||||
group = keyboard.get_group()
|
||||
|
||||
#keyboard=lv.indev_create()
|
||||
#keyboard.set_type(lv.INDEV_TYPE.KEYPAD)
|
||||
#keyboard.set_read_cb(get_key)
|
||||
#keyboard.set_group(group)
|
||||
|
||||
#keyboard.set_group(group)
|
||||
|
||||
# Create widgets
|
||||
screen = lv.screen_active()
|
||||
|
||||
# Textarea
|
||||
ta = lv.textarea(screen)
|
||||
ta.set_one_line(True)
|
||||
ta.set_placeholder_text("Type here")
|
||||
ta.align(lv.ALIGN.TOP_LEFT, 10, 10)
|
||||
group.add_obj(ta)
|
||||
|
||||
# Switch
|
||||
sw = lv.switch(screen)
|
||||
sw.align(lv.ALIGN.TOP_LEFT, 10, 50)
|
||||
group.add_obj(sw)
|
||||
|
||||
# Test Button
|
||||
btn = lv.button(screen)
|
||||
btn.align(lv.ALIGN.TOP_LEFT, 10, 90)
|
||||
lbl = lv.label(btn)
|
||||
lbl.set_text("Test Button")
|
||||
group.add_obj(btn)
|
||||
|
||||
# Simulate NEXT key button
|
||||
btn_next = lv.button(screen)
|
||||
btn_next.align(lv.ALIGN.BOTTOM_LEFT, 10, -10)
|
||||
lbl_next = lv.label(btn_next)
|
||||
lbl_next.set_text("NEXT")
|
||||
def btn_next_cb(event):
|
||||
if event.get_code() == lv.EVENT.CLICKED:
|
||||
keyboard._keypad_cb(None, 1, 9, 0) # Simulate KEY_TAB (lv.KEY.NEXT) press
|
||||
keyboard._keypad_cb(None, 0, 9, 0) # Simulate release
|
||||
btn_next.add_event_cb(btn_next_cb, lv.EVENT.CLICKED, None)
|
||||
|
||||
# Simulate ENTER key button
|
||||
btn_enter = lv.button(screen)
|
||||
btn_enter.align(lv.ALIGN.BOTTOM_LEFT, 100, -10)
|
||||
lbl_enter = lv.label(btn_enter)
|
||||
lbl_enter.set_text("ENTER")
|
||||
def btn_enter_cb(event):
|
||||
if event.get_code() == lv.EVENT.CLICKED:
|
||||
keyboard._keypad_cb(None, 1, 13, 0) # Simulate KEY_RETURN (lv.KEY.ENTER) press
|
||||
keyboard._keypad_cb(None, 0, 13, 0) # Simulate release
|
||||
btn_enter.add_event_cb(btn_enter_cb, lv.EVENT.CLICKED, None)
|
||||
|
||||
# Debug focus
|
||||
def check_focus():
|
||||
focused = lv.group_get_focused(group)
|
||||
print(f"Focused widget: {focused}")
|
||||
th = task_handler.TaskHandler(duration=5)
|
||||
th.register_task(check_focus, 1000)
|
||||
|
||||
# Debug events
|
||||
def event_cb(event, name):
|
||||
event_code = event.get_code()
|
||||
print(f"{name} event: code={event_code}, name={getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN')}")
|
||||
ta.add_event_cb(lambda e: event_cb(e, "Textarea"), lv.EVENT.ALL, None)
|
||||
sw.add_event_cb(lambda e: event_cb(e, "Switch"), lv.EVENT.ALL, None)
|
||||
btn.add_event_cb(lambda e: event_cb(e, "Button"), lv.EVENT.ALL, None)
|
||||
@@ -0,0 +1,114 @@
|
||||
import lcd_bus
|
||||
import lvgl as lv
|
||||
import sdl_display
|
||||
import task_handler
|
||||
import sys
|
||||
sys.path.append('lib/')
|
||||
import sdl_pointer
|
||||
import sdl_keyboard
|
||||
|
||||
# Initialize display
|
||||
TFT_HOR_RES = 320
|
||||
TFT_VER_RES = 240
|
||||
bus = lcd_bus.SDLBus(flags=0)
|
||||
buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0)
|
||||
display = sdl_display.SDLDisplay(
|
||||
data_bus=bus,
|
||||
display_width=TFT_HOR_RES,
|
||||
display_height=TFT_VER_RES,
|
||||
frame_buffer1=buf1,
|
||||
color_space=lv.COLOR_FORMAT.RGB565
|
||||
)
|
||||
display.init()
|
||||
|
||||
# Initialize mouse
|
||||
mouse = sdl_pointer.SDLPointer()
|
||||
|
||||
# Initialize keyboard
|
||||
keyboard = sdl_keyboard.SDLKeyboard()
|
||||
|
||||
pressed = False
|
||||
def get_key(indev,data):
|
||||
print("simulating get_key")
|
||||
global pressed
|
||||
if not pressed:
|
||||
# input your keypad code
|
||||
data.state = 1 #1 for press 0 for released
|
||||
data.key=100
|
||||
#pressed = True
|
||||
else:
|
||||
data.state = 0 #1 for press 0 for released
|
||||
data.key=100
|
||||
pressed = False
|
||||
|
||||
|
||||
# Create group
|
||||
#group = lv.group_create()
|
||||
#group.set_default()
|
||||
group = keyboard.get_group()
|
||||
|
||||
#keyboard=lv.indev_create()
|
||||
#keyboard.set_type(lv.INDEV_TYPE.KEYPAD)
|
||||
#keyboard.set_read_cb(get_key)
|
||||
#keyboard.set_group(group)
|
||||
|
||||
#keyboard.set_group(group)
|
||||
|
||||
# Create widgets
|
||||
screen = lv.screen_active()
|
||||
|
||||
# Textarea
|
||||
ta = lv.textarea(screen)
|
||||
ta.set_one_line(True)
|
||||
ta.set_placeholder_text("Type here")
|
||||
ta.align(lv.ALIGN.TOP_LEFT, 10, 10)
|
||||
group.add_obj(ta)
|
||||
|
||||
# Switch
|
||||
sw = lv.switch(screen)
|
||||
sw.align(lv.ALIGN.TOP_LEFT, 10, 50)
|
||||
group.add_obj(sw)
|
||||
|
||||
# Test Button
|
||||
btn = lv.button(screen)
|
||||
btn.align(lv.ALIGN.TOP_LEFT, 10, 90)
|
||||
lbl = lv.label(btn)
|
||||
lbl.set_text("Test Button")
|
||||
group.add_obj(btn)
|
||||
|
||||
# Simulate NEXT key button
|
||||
btn_next = lv.button(screen)
|
||||
btn_next.align(lv.ALIGN.BOTTOM_LEFT, 10, -10)
|
||||
lbl_next = lv.label(btn_next)
|
||||
lbl_next.set_text("NEXT")
|
||||
def btn_next_cb(event):
|
||||
if event.get_code() == lv.EVENT.CLICKED:
|
||||
keyboard._keypad_cb(None, 1, 9, 0) # Simulate KEY_TAB (lv.KEY.NEXT) press
|
||||
keyboard._keypad_cb(None, 0, 9, 0) # Simulate release
|
||||
btn_next.add_event_cb(btn_next_cb, lv.EVENT.CLICKED, None)
|
||||
|
||||
# Simulate ENTER key button
|
||||
btn_enter = lv.button(screen)
|
||||
btn_enter.align(lv.ALIGN.BOTTOM_LEFT, 100, -10)
|
||||
lbl_enter = lv.label(btn_enter)
|
||||
lbl_enter.set_text("ENTER")
|
||||
def btn_enter_cb(event):
|
||||
if event.get_code() == lv.EVENT.CLICKED:
|
||||
keyboard._keypad_cb(None, 1, 13, 0) # Simulate KEY_RETURN (lv.KEY.ENTER) press
|
||||
keyboard._keypad_cb(None, 0, 13, 0) # Simulate release
|
||||
btn_enter.add_event_cb(btn_enter_cb, lv.EVENT.CLICKED, None)
|
||||
|
||||
# Debug focus
|
||||
def check_focus():
|
||||
focused = lv.group_get_focused(group)
|
||||
print(f"Focused widget: {focused}")
|
||||
th = task_handler.TaskHandler(duration=5)
|
||||
th.register_task(check_focus, 1000)
|
||||
|
||||
# Debug events
|
||||
def event_cb(event, name):
|
||||
event_code = event.get_code()
|
||||
print(f"{name} event: code={event_code}, name={getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN')}")
|
||||
ta.add_event_cb(lambda e: event_cb(e, "Textarea"), lv.EVENT.ALL, None)
|
||||
sw.add_event_cb(lambda e: event_cb(e, "Switch"), lv.EVENT.ALL, None)
|
||||
btn.add_event_cb(lambda e: event_cb(e, "Button"), lv.EVENT.ALL, None)
|
||||
@@ -0,0 +1,103 @@
|
||||
import lcd_bus
|
||||
import lvgl as lv
|
||||
import sdl_display
|
||||
import task_handler
|
||||
import sys
|
||||
sys.path.append('lib/')
|
||||
import sdl_pointer
|
||||
import sdl_keyboard
|
||||
|
||||
# Initialize display
|
||||
TFT_HOR_RES = 320
|
||||
TFT_VER_RES = 240
|
||||
bus = lcd_bus.SDLBus(flags=0)
|
||||
buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0)
|
||||
display = sdl_display.SDLDisplay(
|
||||
data_bus=bus,
|
||||
display_width=TFT_HOR_RES,
|
||||
display_height=TFT_VER_RES,
|
||||
frame_buffer1=buf1,
|
||||
color_space=lv.COLOR_FORMAT.RGB565
|
||||
)
|
||||
display.init()
|
||||
|
||||
# Initialize mouse
|
||||
mouse = sdl_pointer.SDLPointer()
|
||||
|
||||
# Initialize keyboard
|
||||
keyboard = sdl_keyboard.SDLKeyboard()
|
||||
|
||||
# Create group
|
||||
group = lv.group_create()
|
||||
group.set_default() # Set as default group
|
||||
keyboard.set_group(group)
|
||||
|
||||
# Create widgets
|
||||
screen = lv.screen_active()
|
||||
|
||||
# Textarea
|
||||
ta = lv.textarea(screen)
|
||||
ta.set_one_line(True)
|
||||
ta.set_placeholder_text("Type here")
|
||||
ta.align(lv.ALIGN.TOP_LEFT, 10, 10)
|
||||
group.add_obj(ta)
|
||||
|
||||
# Switch
|
||||
sw = lv.switch(screen)
|
||||
sw.align(lv.ALIGN.TOP_LEFT, 10, 50)
|
||||
group.add_obj(sw)
|
||||
|
||||
# Button
|
||||
btn = lv.button(screen)
|
||||
btn.align(lv.ALIGN.TOP_LEFT, 10, 90)
|
||||
lbl = lv.label(btn)
|
||||
lbl.set_text("Test Button")
|
||||
group.add_obj(btn)
|
||||
|
||||
# Simulate NEXT key button
|
||||
btn_next = lv.button(screen)
|
||||
btn_next.align(lv.ALIGN.BOTTOM_LEFT, 10, -10)
|
||||
lbl_next = lv.label(btn_next)
|
||||
lbl_next.set_text("NEXT")
|
||||
def btn_next_cb(event):
|
||||
if event.get_code() == lv.EVENT.CLICKED:
|
||||
keyboard.send_event(lv.EVENT.KEY, lv.KEY.NEXT)
|
||||
btn_next.add_event_cb(btn_next_cb, lv.EVENT.CLICKED, None)
|
||||
|
||||
# Simulate ENTER key button
|
||||
btn_enter = lv.button(screen)
|
||||
btn_enter.align(lv.ALIGN.BOTTOM_LEFT, 100, -10)
|
||||
lbl_enter = lv.label(btn_enter)
|
||||
lbl_enter.set_text("ENTER")
|
||||
def btn_enter_cb(event):
|
||||
if event.get_code() == lv.EVENT.CLICKED:
|
||||
keyboard.send_event(lv.EVENT.KEY, lv.KEY.ENTER)
|
||||
btn_enter.add_event_cb(btn_enter_cb, lv.EVENT.CLICKED, None)
|
||||
|
||||
# Debug focus
|
||||
def check_focus():
|
||||
focused = lv.group_get_focused(group)
|
||||
print(f"Focused widget: {focused}")
|
||||
th = task_handler.TaskHandler(duration=5)
|
||||
th.register_task(check_focus, 1000) # Check focus every 1s
|
||||
|
||||
# Debug textarea events
|
||||
def ta_event_cb(event):
|
||||
event_code = event.get_code()
|
||||
name = getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN')
|
||||
print(f"Textarea event: code={event_code}, name={name}")
|
||||
ta.add_event_cb(ta_event_cb, lv.EVENT.ALL, None)
|
||||
|
||||
# Debug switch events
|
||||
def sw_event_cb(event):
|
||||
event_code = event.get_code()
|
||||
name = getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN')
|
||||
print(f"Switch event: code={event_code}, name={name}")
|
||||
sw.add_event_cb(sw_event_cb, lv.EVENT.ALL, None)
|
||||
|
||||
# Debug button events
|
||||
def btn_event_cb(event):
|
||||
event_code = event.get_code()
|
||||
name = getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN')
|
||||
print(f"Button event: code={event_code}, name={name}")
|
||||
btn.add_event_cb(btn_event_cb, lv.EVENT.ALL, None)
|
||||
@@ -0,0 +1,92 @@
|
||||
import lcd_bus
|
||||
import lvgl as lv
|
||||
import sdl_display
|
||||
import task_handler
|
||||
import sys
|
||||
sys.path.append('lib/')
|
||||
import sdl_pointer
|
||||
import sdl_keyboard
|
||||
|
||||
# Initialize display
|
||||
TFT_HOR_RES = 320
|
||||
TFT_VER_RES = 240
|
||||
bus = lcd_bus.SDLBus(flags=0)
|
||||
buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0)
|
||||
display = sdl_display.SDLDisplay(
|
||||
data_bus=bus,
|
||||
display_width=TFT_HOR_RES,
|
||||
display_height=TFT_VER_RES,
|
||||
frame_buffer1=buf1,
|
||||
color_space=lv.COLOR_FORMAT.RGB565
|
||||
)
|
||||
display.init()
|
||||
|
||||
# Initialize mouse
|
||||
mouse = sdl_pointer.SDLPointer()
|
||||
|
||||
# Initialize keyboard
|
||||
keyboard = sdl_keyboard.SDLKeyboard()
|
||||
|
||||
# Create group
|
||||
group = lv.group_create()
|
||||
group.set_default()
|
||||
keyboard.set_group(group)
|
||||
|
||||
# Create widgets
|
||||
screen = lv.screen_active()
|
||||
|
||||
# Textarea
|
||||
ta = lv.textarea(screen)
|
||||
ta.set_one_line(True)
|
||||
ta.set_placeholder_text("Type here")
|
||||
ta.align(lv.ALIGN.TOP_LEFT, 10, 10)
|
||||
group.add_obj(ta)
|
||||
|
||||
# Switch
|
||||
sw = lv.switch(screen)
|
||||
sw.align(lv.ALIGN.TOP_LEFT, 10, 50)
|
||||
group.add_obj(sw)
|
||||
|
||||
# Test Button
|
||||
btn = lv.button(screen)
|
||||
btn.align(lv.ALIGN.TOP_LEFT, 10, 90)
|
||||
lbl = lv.label(btn)
|
||||
lbl.set_text("Test Button")
|
||||
group.add_obj(btn)
|
||||
|
||||
# Simulate NEXT key button
|
||||
btn_next = lv.button(screen)
|
||||
btn_next.align(lv.ALIGN.BOTTOM_LEFT, 10, -10)
|
||||
lbl_next = lv.label(btn_next)
|
||||
lbl_next.set_text("NEXT")
|
||||
def btn_next_cb(event):
|
||||
if event.get_code() == lv.EVENT.CLICKED:
|
||||
keyboard._keypad_cb(None, 1, 9, 0) # Simulate KEY_TAB (lv.KEY.NEXT) press
|
||||
keyboard._keypad_cb(None, 0, 9, 0) # Simulate release
|
||||
btn_next.add_event_cb(btn_next_cb, lv.EVENT.CLICKED, None)
|
||||
|
||||
# Simulate ENTER key button
|
||||
btn_enter = lv.button(screen)
|
||||
btn_enter.align(lv.ALIGN.BOTTOM_LEFT, 100, -10)
|
||||
lbl_enter = lv.label(btn_enter)
|
||||
lbl_enter.set_text("ENTER")
|
||||
def btn_enter_cb(event):
|
||||
if event.get_code() == lv.EVENT.CLICKED:
|
||||
keyboard._keypad_cb(None, 1, 13, 0) # Simulate KEY_RETURN (lv.KEY.ENTER) press
|
||||
keyboard._keypad_cb(None, 0, 13, 0) # Simulate release
|
||||
btn_enter.add_event_cb(btn_enter_cb, lv.EVENT.CLICKED, None)
|
||||
|
||||
# Debug focus
|
||||
def check_focus():
|
||||
focused = lv.group_get_focused(group)
|
||||
print(f"Focused widget: {focused}")
|
||||
th = task_handler.TaskHandler(duration=5)
|
||||
th.register_task(check_focus, 1000)
|
||||
|
||||
# Debug events
|
||||
def event_cb(event, name):
|
||||
event_code = event.get_code()
|
||||
print(f"{name} event: code={event_code}, name={getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN')}")
|
||||
ta.add_event_cb(lambda e: event_cb(e, "Textarea"), lv.EVENT.ALL, None)
|
||||
sw.add_event_cb(lambda e: event_cb(e, "Switch"), lv.EVENT.ALL, None)
|
||||
btn.add_event_cb(lambda e: event_cb(e, "Button"), lv.EVENT.ALL, None)
|
||||
@@ -0,0 +1,103 @@
|
||||
# Hardware initialization for Unix and MacOS systems
|
||||
|
||||
import lcd_bus
|
||||
import lvgl as lv
|
||||
import sdl_display
|
||||
import task_handler
|
||||
|
||||
|
||||
# Add lib/ to the path for modules, otherwise it will only search in ~/.micropython/lib and /usr/lib/micropython
|
||||
import sys
|
||||
sys.path.append('lib/')
|
||||
|
||||
|
||||
import mpos.ui
|
||||
|
||||
|
||||
#TFT_HOR_RES=640
|
||||
#TFT_VER_RES=480
|
||||
TFT_HOR_RES=320
|
||||
TFT_VER_RES=240
|
||||
|
||||
def window_cb(args): # doesn't get called
|
||||
print(f"Window callback: {args}")
|
||||
|
||||
bus = lcd_bus.SDLBus(flags=0)
|
||||
bus.register_window_callback(window_cb)
|
||||
|
||||
# bus.set_window_size(320,240,-1,False) # -1 might be 25 but it always becomes black, except for format 0
|
||||
|
||||
buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0)
|
||||
|
||||
display = sdl_display.SDLDisplay(data_bus=bus,display_width=TFT_HOR_RES,display_height=TFT_VER_RES,frame_buffer1=buf1,color_space=lv.COLOR_FORMAT.RGB565)
|
||||
display.init()
|
||||
|
||||
import sdl_pointer
|
||||
mouse = sdl_pointer.SDLPointer()
|
||||
|
||||
import sdl_keyboard
|
||||
keyboard = sdl_keyboard.SDLKeyboard()
|
||||
|
||||
# seems indev isn't properly initialized
|
||||
def keypad_cb(indev, indev_data):
|
||||
#print(f"keypad_cb {indev} {indev_data}")
|
||||
#key = indev.get_key() # always 0
|
||||
#print(f"key {key}")
|
||||
#key = indev_data.get("key")
|
||||
#print(f"key {key}")
|
||||
pressed, code = keyboard._get_key()
|
||||
print(f"periodic pressed: {pressed}, code: {code}")
|
||||
# I mean we could read the key and put it in the textarea but I want some kind of keypress :-/
|
||||
|
||||
def keyboard_cb(event):
|
||||
event_code=event.get_code()
|
||||
print(f"keyboard_test YES: code={event_code}") # target={event.get_target()}, user_data={event.get_user_data()}, param={event.get_param()}
|
||||
|
||||
|
||||
def button_cb(event):
|
||||
event_code=event.get_code()
|
||||
name = mpos.ui.get_event_name(event_code)
|
||||
print(f"button_cb YES: code={event_code} and name {name}")
|
||||
|
||||
# for some reason, this text areas is receiving mouse events, and draw events, but not key events...
|
||||
def ta_callback_again(event):
|
||||
event_code=event.get_code()
|
||||
if event_code in [19,23,25,26,27,28,29,30,49]:
|
||||
return
|
||||
name = mpos.ui.get_event_name(event_code)
|
||||
print(f"ta_callback_again {event_code} and {name}")
|
||||
#print(f"ta_callback_again: code={event_code}") # target={event.get_target()}, user_data={event.get_user_data()}, param={event.get_param()}
|
||||
|
||||
keyboard.add_event_cb(keyboard_cb, lv.EVENT.ALL, None)
|
||||
|
||||
|
||||
#group = lv.group_create()
|
||||
#group = keyboard.get_group()
|
||||
|
||||
th = task_handler.TaskHandler(duration=5) # 5ms is recommended for MicroPython+LVGL on desktop
|
||||
|
||||
screen = lv.screen_active()
|
||||
|
||||
b = lv.button(screen)
|
||||
b.center()
|
||||
b.add_event_cb(button_cb, lv.EVENT.ALL, None)
|
||||
#group.add_obj(b)
|
||||
|
||||
ta = lv.textarea(screen)
|
||||
ta.set_one_line(True)
|
||||
ta.align(lv.ALIGN.TOP_LEFT,0,0)
|
||||
#group.add_obj(ta)
|
||||
|
||||
keyboard = lv.keyboard(screen)
|
||||
keyboard.set_textarea(ta)
|
||||
|
||||
|
||||
# this does something, but just gives indev 0, being error...
|
||||
#indev = lv.indev_create()
|
||||
#indev.set_type(lv.INDEV_TYPE.KEYPAD)
|
||||
#indev.set_read_cb(keypad_cb)
|
||||
|
||||
ta.add_event_cb(ta_callback_again, lv.EVENT.ALL, None)
|
||||
|
||||
|
||||
#keyboard.set_group(group)
|
||||
@@ -0,0 +1,81 @@
|
||||
diff --git a/ports/unix/main.c b/ports/unix/main.c
|
||||
index 58fa3ff..c022c20 100644
|
||||
--- a/ports/unix/main.c
|
||||
+++ b/ports/unix/main.c
|
||||
@@ -53,6 +53,8 @@
|
||||
#include "extmod/vfs_posix.h"
|
||||
#include "genhdr/mpversion.h"
|
||||
#include "input.h"
|
||||
+#include "machine_sdl.h"
|
||||
+#include "machine_timer.h"
|
||||
|
||||
// Command line options, with their defaults
|
||||
static bool compile_only = false;
|
||||
@@ -61,7 +63,7 @@ static uint emit_opt = MP_EMIT_OPT_NONE;
|
||||
#if MICROPY_ENABLE_GC
|
||||
// Heap size of GC heap (if enabled)
|
||||
// Make it larger on a 64 bit machine, because pointers are larger.
|
||||
-long heap_size = 1024 * 1024 * (sizeof(mp_uint_t) / 4);
|
||||
+long heap_size = 8388608;
|
||||
#endif
|
||||
|
||||
// Number of heaps to assign by default if MICROPY_GC_SPLIT_HEAP=1
|
||||
@@ -192,6 +194,12 @@ static char *strjoin(const char *s1, int sep_char, const char *s2) {
|
||||
}
|
||||
#endif
|
||||
|
||||
+char *mp_repl_get_ps3(void)
|
||||
+{
|
||||
+ return "";
|
||||
+}
|
||||
+
|
||||
+
|
||||
static int do_repl(void) {
|
||||
mp_hal_stdout_tx_str(MICROPY_BANNER_NAME_AND_VERSION);
|
||||
mp_hal_stdout_tx_str("; " MICROPY_BANNER_MACHINE);
|
||||
@@ -282,8 +290,16 @@ static int do_repl(void) {
|
||||
for (;;) {
|
||||
char *line = prompt((char *)mp_repl_get_ps1());
|
||||
if (line == NULL) {
|
||||
- // EOF
|
||||
- return 0;
|
||||
+ if (errno != EWOULDBLOCK) {
|
||||
+ return 0;
|
||||
+ } else {
|
||||
+ while (line == NULL && errno == EWOULDBLOCK) {
|
||||
+ mp_handle_pending(true);
|
||||
+ usleep(1000);
|
||||
+ line = prompt(mp_repl_get_ps3());
|
||||
+ }
|
||||
+ if (line == NULL) return 0;
|
||||
+ }
|
||||
}
|
||||
while (mp_repl_continue_with_input(line)) {
|
||||
char *line2 = prompt((char *)mp_repl_get_ps2());
|
||||
@@ -470,6 +486,17 @@ static void sys_set_excecutable(char *argv0) {
|
||||
MP_NOINLINE int main_(int argc, char **argv);
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
+ // Parse command-line arguments for a custom root
|
||||
+ const char *fs_root = "/home/user/sources/PiggyOS/internal_filesystem"; // Hardcode or parse from argv
|
||||
+ if (chroot(fs_root) != 0) {
|
||||
+ perror("chroot failed");
|
||||
+ return 1;
|
||||
+ }
|
||||
+ if (chdir("/") != 0) {
|
||||
+ perror("chdir failed");
|
||||
+ return 1;
|
||||
+ }
|
||||
+
|
||||
#if MICROPY_PY_THREAD
|
||||
mp_thread_init();
|
||||
#endif
|
||||
@@ -752,6 +779,8 @@ MP_NOINLINE int main_(int argc, char **argv) {
|
||||
MP_STATE_THREAD(prof_trace_callback) = MP_OBJ_NULL;
|
||||
#endif
|
||||
|
||||
+ machine_timer_deinit_all();
|
||||
+ deinit_sdl();
|
||||
#if MICROPY_PY_SYS_ATEXIT
|
||||
// Beware, the sys.settrace callback should be disabled before running sys.atexit.
|
||||
if (mp_obj_is_callable(MP_STATE_VM(sys_exitfunc))) {
|
||||
@@ -0,0 +1,49 @@
|
||||
# it's not super fast but it works!
|
||||
|
||||
import websocket
|
||||
import _thread
|
||||
import time
|
||||
|
||||
def on_message(wsapp, message):
|
||||
print(f"got message: {message}")
|
||||
|
||||
def on_ping(wsapp, message):
|
||||
print("Got a ping! A pong reply has already been automatically sent.")
|
||||
|
||||
def on_pong(wsapp, message):
|
||||
print("Got a pong! No need to respond")
|
||||
|
||||
|
||||
def on_error(wsapp, message):
|
||||
print(f"Got error: {message}")
|
||||
|
||||
|
||||
#wsapp = websocket.WebSocketApp("wss://testnet.binance.vision/ws/btcusdt@trade", on_message=on_message, on_ping=on_ping, on_pong=on_pong, on_error=on_error)
|
||||
|
||||
wsapp = websocket.WebSocketApp("wss://echo.websocket.events", on_message=on_message, on_ping=on_ping, on_pong=on_pong, on_error=on_error)
|
||||
|
||||
def stress_test_thread():
|
||||
print("before run_forever")
|
||||
wsapp.run_forever(ping_interval=15, ping_timeout=10, ping_payload="This is an optional ping payload")
|
||||
print("after run_forever")
|
||||
|
||||
_thread.stack_size(16*1024)
|
||||
_thread.start_new_thread(stress_test_thread, ())
|
||||
|
||||
time.sleep(5)
|
||||
print("sending ok")
|
||||
wsapp.send_text('ok')
|
||||
|
||||
|
||||
time.sleep(15)
|
||||
print("sending again")
|
||||
wsapp.send_text('again')
|
||||
|
||||
|
||||
time.sleep(25)
|
||||
print("sending more")
|
||||
wsapp.send_text('more')
|
||||
|
||||
wsapp.close()
|
||||
|
||||
|
||||
@@ -0,0 +1,110 @@
|
||||
import json
|
||||
import ssl
|
||||
import time
|
||||
import _thread
|
||||
|
||||
from nostr.filter import Filter, Filters
|
||||
from nostr.event import Event, EventKind
|
||||
from nostr.relay_manager import RelayManager
|
||||
from nostr.message_type import ClientMessageType
|
||||
|
||||
#filters = Filters([Filter(authors=[<a nostr pubkey in hex>], kinds=[EventKind.TEXT_NOTE])])
|
||||
#filters = Filters([Filter(authors=["181137054fe60df5168976311f0bf44dbe4bd4d2e0af69325dfee9fa81a8cbda"], kinds=[EventKind.TEXT_NOTE])])
|
||||
#timestamp = round(time.time()-50)
|
||||
#timestamp = round(time.time()) # going for zero events to check memory use
|
||||
|
||||
timetogoback = 1000
|
||||
timetogoback = 2419200 # 28 days
|
||||
timetogoback = 7776000 # 3 months
|
||||
|
||||
# event_msg: pubkey: 181137054fe60df5168976311f0bf44dbe4bd4d2e0af69325dfee9fa81a8cbda created_at 1745510390 with content "kind":1, Happy news! LightningPiggy is heading to
|
||||
# contacts: event_msg: pubkey: 181137054fe60df5168976311f0bf44dbe4bd4d2e0af69325dfee9fa81a8cbda created_at 1746307620 with content and kind 3 and tags [['p', '181137054fe60df5168976311f0bf44dbe4bd4d2e0af69325dfee9fa81a8cbda'], ['p', 'ffb3f96661bda0295389cfc8c8fe65332e98cc24b12bce5ca40e09af3bb0d7bf'],
|
||||
# reaction: event_msg: pubkey: 181137054fe60df5168976311f0bf44dbe4bd4d2e0af69325dfee9fa81a8cbda created_at 1746307202 with content + and kind 7 and tags [['e', 'cee2aadc637f56e6f922b95898eecb2bd9b0f0f0ab2e2df6bb6d1e7830c697b0'], ['p', '3e6e0735b8a2e96f8cf663f64d04bb9cea931afcc57f5427c35bb6859e95c8a2']]
|
||||
# event_msg: pubkey: 181137054fe60df5168976311f0bf44dbe4bd4d2e0af69325dfee9fa81a8cbda created_at 1742146581 with content The kWh (kilowatt-hour) was pro... nd kind 1 and tags [['p', '92cbe5861cfc5213dd89f0a6f6084486f85e6f03cfeb70a13f455938116433b8', 'wss://nostrelites.org', 'mention']]
|
||||
|
||||
# somehow, adding kinds breaks it?!
|
||||
# ["REQ", "test1747750250", {"since": 1744011312, "kinds": [1], "authors": ["181137054fe60df5168976311f0bf44dbe4bd4d2e0af69325dfee9fa81a8cbda"]}]
|
||||
# => this gives me EOSE quickly
|
||||
# ["REQ", "test1747750060", {"since": 1744011312, "authors": ["181137054fe60df5168976311f0bf44dbe4bd4d2e0af69325dfee9fa81a8cbda"]}]
|
||||
# this worked before, I think:
|
||||
# ["REQ","index",{"kinds":[9735], "#p": ["181137054fe60df5168976311f0bf44dbe4bd4d2e0af69325dfee9fa81a8cbda"]}]
|
||||
|
||||
import sys
|
||||
if sys.platform == "esp32":
|
||||
# on esp32, it needs this correction:
|
||||
timestamp = time.time() + 946684800 - timetogoback
|
||||
else:
|
||||
timestamp = round(time.time()-timetogoback)
|
||||
#timestamp = round(time.time()-1000)
|
||||
#timestamp = round(time.time()-5000)
|
||||
|
||||
timestamp = 1744011312
|
||||
|
||||
filters = Filters([Filter(authors=["181137054fe60df5168976311f0bf44dbe4bd4d2e0af69325dfee9fa81a8cbda"], since=timestamp)])
|
||||
#filters = Filters([Filter(authors=["181137054fe60df5168976311f0bf44dbe4bd4d2e0af69325dfee9fa81a8cbda"], since=timestamp, kinds=[EventKind.TEXT_NOTE] )])
|
||||
#filters = Filters([Filter(authors=["04c915daefee38317fa734444acee390a8269fe5810b2241e5e6dd343dfbecc9"], kinds=[9735], since=timestamp)])
|
||||
#filters = Filters([Filter(kinds=[9735], since=timestamp)])
|
||||
|
||||
subscription_id = "test" + str(round(time.time()))
|
||||
request = [ClientMessageType.REQUEST, subscription_id]
|
||||
json.dumps(request)
|
||||
request.extend(filters.to_json_array())
|
||||
message = json.dumps(request)
|
||||
# ["REQ", "ihopethisworks3", {"kinds": [1], "authors": "04c915daefee38317fa734444acee390a8269fe5810b2241e5e6dd343dfbecc9"}]
|
||||
print(f"sending this: {message}")
|
||||
|
||||
def printevents():
|
||||
import micropython
|
||||
print(f"at the start, thread stack used: {micropython.stack_use()}")
|
||||
print("relaymanager")
|
||||
relay_manager = RelayManager()
|
||||
#relay_manager.add_relay("wss://nostr-pub.wellorder.net")
|
||||
print("relaymanager adding")
|
||||
relay_manager.add_relay("wss://relay.primal.net")
|
||||
#relay_manager.add_relay("wss://relay.damus.io")
|
||||
print("relaymanager subscribing")
|
||||
relay_manager.add_subscription(subscription_id, filters)
|
||||
print("opening connections") # after this, CPU usage goes high and stays there
|
||||
relay_manager.open_connections({"cert_reqs": ssl.CERT_NONE}) # NOTE: This disables ssl certificate verification
|
||||
time.sleep(2) # allow the connections to open
|
||||
print("publishing:")
|
||||
relay_manager.publish_message(message)
|
||||
time.sleep(2) # allow the messages to send
|
||||
print("printing events:")
|
||||
#while relay_manager.message_pool.has_events():
|
||||
# allowing 30 seconds for stuff to come in...
|
||||
for _ in range(600):
|
||||
time.sleep(1)
|
||||
print("checking pool....")
|
||||
try:
|
||||
event_msg = relay_manager.message_pool.get_event()
|
||||
print(f"event_msg: pubkey: {event_msg.event.public_key} created_at {event_msg.event.created_at} with content '{event_msg.event.content}' and kind {event_msg.event.kind} and tags {event_msg.event.tags}")
|
||||
except Exception as e:
|
||||
#print(f"pool.get_event() got error: {e}")
|
||||
pass
|
||||
print("30 seconds passed, closing:")
|
||||
relay_manager.close_connections()
|
||||
|
||||
# new thread so REPL stays available
|
||||
# 12KB crashes here:
|
||||
# opening connections
|
||||
# [DEBUG 408724546] Starting run_forever
|
||||
# [DEBUG 408724546] Starting _async_main
|
||||
# [DEBUG 408724546] Reconnect interval set to 0s
|
||||
# [DEBUG 408724546] Started callback processing task
|
||||
# [DEBUG 408724546] Main loop iteration: self.running=True
|
||||
# [DEBUG 408724546] Connecting to wss://relay.damus.io
|
||||
# [DEBUG 408724547] Using SSL with no certificate verification
|
||||
# 24KB is fine
|
||||
# somehow, if I run this in a thread, I get: can't create thread" at File "/lib/nostr/relay_manager.py", line 48, in open_connections
|
||||
# tried stack sizes from 18KB up to 32KB
|
||||
#_thread.stack_size(16*1024)
|
||||
#_thread.start_new_thread(printevents, ())
|
||||
printevents()
|
||||
|
||||
|
||||
#import gc
|
||||
#for _ in range(50):
|
||||
# collect = gc.collect()
|
||||
# print(f"MEMFREE: {gc.mem_free()}")
|
||||
# time.sleep(1)
|
||||
@@ -0,0 +1,214 @@
|
||||
import json
|
||||
import ssl
|
||||
import time
|
||||
import sys
|
||||
from nostr.relay_manager import RelayManager
|
||||
from nostr.message_type import ClientMessageType
|
||||
from nostr.filter import Filter, Filters
|
||||
from nostr.event import EncryptedDirectMessage
|
||||
from nostr.key import PrivateKey
|
||||
|
||||
def parse_nwc_url(nwc_url):
|
||||
"""Parse Nostr Wallet Connect URL to extract pubkey, relay, secret, and lud16."""
|
||||
print(f"DEBUG: Starting to parse NWC URL: {nwc_url}")
|
||||
try:
|
||||
# Remove 'nostr+walletconnect://' or 'nwc:' prefix
|
||||
if nwc_url.startswith('nostr+walletconnect://'):
|
||||
print(f"DEBUG: Removing 'nostr+walletconnect://' prefix")
|
||||
nwc_url = nwc_url[22:]
|
||||
elif nwc_url.startswith('nwc:'):
|
||||
print(f"DEBUG: Removing 'nwc:' prefix")
|
||||
nwc_url = nwc_url[4:]
|
||||
else:
|
||||
print(f"DEBUG: No recognized prefix found in URL")
|
||||
raise ValueError("Invalid NWC URL: missing 'nostr+walletconnect://' or 'nwc:' prefix")
|
||||
|
||||
print(f"DEBUG: URL after prefix removal: {nwc_url}")
|
||||
|
||||
# Split into pubkey and query params
|
||||
parts = nwc_url.split('?')
|
||||
pubkey = parts[0]
|
||||
print(f"DEBUG: Extracted pubkey: {pubkey}")
|
||||
|
||||
# Validate pubkey (should be 64 hex characters)
|
||||
if len(pubkey) != 64 or not all(c in '0123456789abcdef' for c in pubkey):
|
||||
raise ValueError("Invalid NWC URL: pubkey must be 64 hex characters")
|
||||
|
||||
# Extract relay, secret, and lud16 from query params
|
||||
relay = None
|
||||
secret = None
|
||||
lud16 = None
|
||||
if len(parts) > 1:
|
||||
print(f"DEBUG: Query parameters found: {parts[1]}")
|
||||
params = parts[1].split('&')
|
||||
for param in params:
|
||||
if param.startswith('relay='):
|
||||
relay = param[6:]
|
||||
print(f"DEBUG: Extracted relay: {relay}")
|
||||
elif param.startswith('secret='):
|
||||
secret = param[7:]
|
||||
print(f"DEBUG: Extracted secret: {secret}")
|
||||
elif param.startswith('lud16='):
|
||||
lud16 = param[6:]
|
||||
print(f"DEBUG: Extracted lud16: {lud16}")
|
||||
else:
|
||||
print(f"DEBUG: No query parameters found")
|
||||
|
||||
if not pubkey or not relay or not secret:
|
||||
raise ValueError("Invalid NWC URL: missing required fields (pubkey, relay, or secret)")
|
||||
|
||||
# Validate secret (should be 64 hex characters)
|
||||
if len(secret) != 64 or not all(c in '0123456789abcdef' for c in secret):
|
||||
raise ValueError("Invalid NWC URL: secret must be 64 hex characters")
|
||||
|
||||
return {
|
||||
'relay': relay,
|
||||
'pubkey': pubkey,
|
||||
'secret': secret,
|
||||
'lud16': lud16
|
||||
}
|
||||
except Exception as e:
|
||||
print(f"DEBUG: Error parsing NWC URL: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
def get_balance(nwc_url):
|
||||
"""Get balance using Nostr Wallet Connect."""
|
||||
print(f"DEBUG: Starting get_balance with NWC URL: {nwc_url}")
|
||||
# Parse NWC URL
|
||||
nwc_data = parse_nwc_url(nwc_url)
|
||||
relay = nwc_data['relay']
|
||||
wallet_pubkey = nwc_data['pubkey']
|
||||
secret = nwc_data['secret']
|
||||
lud16 = nwc_data['lud16']
|
||||
print(f"DEBUG: Parsed NWC data - Relay: {relay}, Pubkey: {wallet_pubkey}, Secret: {secret}, lud16: {lud16}")
|
||||
|
||||
# Initialize private key from secret (assuming it's a hex key)
|
||||
try:
|
||||
#print(f"DEBUG: Initializing private key from secret")
|
||||
private_key = PrivateKey(bytes.fromhex(secret))
|
||||
print(f"DEBUG: Private key initialized, public key: {private_key.public_key.hex()}")
|
||||
except Exception as e:
|
||||
print(f"DEBUG: Error initializing private key: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
# Create get_balance request
|
||||
balance_request = {
|
||||
"method": "get_balance",
|
||||
"params": {}
|
||||
}
|
||||
print(f"DEBUG: Created balance request: {balance_request}")
|
||||
|
||||
#balance_request_string = json.dumps(balance_request)
|
||||
#encrypted = private_key.encrypt_message(balance_request_string, wallet_pubkey)
|
||||
#print(f"\n\n\nencryption returned: {encrypted}")
|
||||
#decrypted = private_key.decrypt_message(encrypted, wallet_pubkey)
|
||||
#print(f"\n\n\ndecryption returned: {decrypted}\n\n\n")
|
||||
|
||||
#decrypted = private_key.decrypt_message(encrypted, wallet_pubkey)
|
||||
#print(f"\n\n\ndecryption from nak returned: {decrypted}\n\n\n")
|
||||
# padding error
|
||||
|
||||
|
||||
# Create encrypted DM with the balance request
|
||||
print(f"DEBUG: Creating encrypted DM to wallet pubkey: {wallet_pubkey}")
|
||||
dm = EncryptedDirectMessage(
|
||||
recipient_pubkey=wallet_pubkey,
|
||||
cleartext_content=json.dumps(balance_request)
|
||||
)
|
||||
|
||||
|
||||
print(f"DEBUG: Signing DM {json.dumps(dm)} with private key")
|
||||
private_key.sign_event(dm) # sign also does encryption if it's a encrypted dm
|
||||
print(f"DEBUG: DM created with ID: {dm.id}")
|
||||
|
||||
# Set up relay manager
|
||||
print(f"DEBUG: Setting up relay manager with relay: {relay}")
|
||||
relay_manager = RelayManager()
|
||||
relay_manager.add_relay(relay)
|
||||
print(f"DEBUG: Opening relay connections")
|
||||
relay_manager.open_connections({"cert_reqs": ssl.CERT_NONE})
|
||||
time.sleep(2) # Allow connections to open
|
||||
|
||||
# Check for relay connection notices
|
||||
print(f"DEBUG: Checking for relay notices")
|
||||
while relay_manager.message_pool.has_notices():
|
||||
notice = relay_manager.message_pool.get_notice()
|
||||
print(f"DEBUG: Relay notice: {notice.content}")
|
||||
|
||||
# Set up subscription to receive response
|
||||
subscription_id = "nwc_balance_" + str(round(time.time()))
|
||||
#print(f"DEBUG: Setting up subscription with ID: {subscription_id}")
|
||||
filters = Filters([Filter(
|
||||
kinds=[23195], # NWC replies
|
||||
authors=[wallet_pubkey],
|
||||
pubkey_refs=[private_key.public_key.hex()]
|
||||
)])
|
||||
#print(f"DEBUG: Subscription filters: {filters.to_json_array()}")
|
||||
relay_manager.add_subscription(subscription_id, filters)
|
||||
|
||||
# Publish request
|
||||
print(f"DEBUG: Publishing subscription request")
|
||||
request_message = [ClientMessageType.REQUEST, subscription_id]
|
||||
request_message.extend(filters.to_json_array())
|
||||
relay_manager.publish_message(json.dumps(request_message))
|
||||
print(f"DEBUG: Publishing encrypted DM")
|
||||
relay_manager.publish_event(dm)
|
||||
# only accept events after the time it was published
|
||||
after_time = time.time()
|
||||
if sys.platform == "esp32":
|
||||
# on esp32, it needs this correction:
|
||||
after_time += 946684800
|
||||
after_time -= 60 # go back a bit because server clocks might be drifting
|
||||
print(f"will only consider events after {after_time}")
|
||||
|
||||
|
||||
# Wait for response
|
||||
print(f"DEBUG: Waiting for response...")
|
||||
print(f"starting at {time.localtime()}")
|
||||
start_time = time.time()
|
||||
balance = None
|
||||
while time.time() - start_time < 60 * 2:
|
||||
while relay_manager.message_pool.has_events():
|
||||
print(f"DEBUG: Event received from message pool")
|
||||
event_msg = relay_manager.message_pool.get_event()
|
||||
event_created_at = event_msg.event.created_at
|
||||
print(f"Received at {time.localtime()} a message with timestamp {event_created_at}")
|
||||
if event_created_at < after_time:
|
||||
print("Skipping event because it's too old!")
|
||||
continue
|
||||
#print(f"event_msg content {event_msg.event.content}")
|
||||
try:
|
||||
#print(f"DEBUG: Decrypting event from public_key: {event_msg.event.public_key}")
|
||||
decrypted_content = private_key.decrypt_message(
|
||||
event_msg.event.content,
|
||||
event_msg.event.public_key
|
||||
)
|
||||
print(f"DEBUG: Decrypted content: {decrypted_content}")
|
||||
response = json.loads(decrypted_content)
|
||||
print(f"DEBUG: Parsed response: {response}")
|
||||
if response.get("method") == "get_balance":
|
||||
balance = response.get("result", {}).get("balance")
|
||||
print(f"DEBUG: Balance found: {balance} satoshis")
|
||||
break
|
||||
except Exception as e:
|
||||
print(f"DEBUG: Error processing response: {e}")
|
||||
if balance is not None:
|
||||
break
|
||||
time.sleep(1)
|
||||
|
||||
print(f"finished at {time.localtime()}")
|
||||
|
||||
# Close connections
|
||||
print(f"DEBUG: Closing relay connections")
|
||||
relay_manager.close_connections()
|
||||
|
||||
if balance is not None:
|
||||
print(f"Balance: {balance} satoshis")
|
||||
else:
|
||||
print("No balance response received or request timed out")
|
||||
|
||||
# Example usage
|
||||
#nwc_url = "nostr+walletconnect://b889ff5b1513b641e2a139f661a661364979c5beee91842f8f0ef42ab558e9d4?relay=wss%3A%2F%2Frelay.getalby.com/v1&secret=71a8c14c1407c113601079c4302dab36460f0ccd0ad506f1f2dc73b5100e4f3c&lud16=moritz@getalby.com"
|
||||
#nwc_url = "nostr+walletconnect://b889ff5b1513b641e2a139f661a661364979c5beee91842f8f0ef42ab558e9d4?relay=wss://relay.getalby.com/v1&secret=71a8c14c1407c113601079c4302dab36460f0ccd0ad506f1f2dc73b5100e4f3c&lud16=moritz@getalby.com"
|
||||
print(f"Processing NWC URL: {nwc_url}")
|
||||
get_balance(nwc_url)
|
||||
@@ -0,0 +1,5 @@
|
||||
p = lv.ffmpeg_player(lv.screen_active())
|
||||
p.player_set_src("../artwork/Big_Buck_Bunny_extract.ogv")
|
||||
p.player_set_auto_restart(True)
|
||||
p.player_set_cmd(p.PLAYER_CMD.START)
|
||||
p.center()
|
||||
Reference in New Issue
Block a user