OSUpdate: comments

This commit is contained in:
Thomas Farstrike
2025-06-14 11:54:38 +02:00
parent 7648897ef0
commit eaecd7c0e4
51 changed files with 1 additions and 5055 deletions
-69
View File
@@ -1,69 +0,0 @@
import uasyncio as asyncio
import sys
from contextlib import suppress
import aiohttp
import _thread
# Shared buffer for input lines
input_buffer = []
lock = _thread.allocate_lock() # Thread-safe access to buffer
# Thread function to read input and add to buffer
def read_input_thread():
while True:
line = input()
with lock:
input_buffer.append(line)
if line == "exit":
break
async def start_client(url: str) -> None:
name = input("Please enter your name: ")
# Start the input reading thread
_thread.start_new_thread(read_input_thread, ())
async def dispatch(ws: aiohttp.ClientWebSocketResponse) -> None:
while True:
#msg = await ws.receive()
msg = await ws.__anext__()
if msg.type is aiohttp.WSMsgType.TEXT:
print("Text: ", msg.data.strip())
elif msg.type is aiohttp.WSMsgType.BINARY:
print("Binary: ", msg.data)
elif msg.type is aiohttp.WSMsgType.PING:
await ws.pong()
elif msg.type is aiohttp.WSMsgType.PONG:
print("Pong received")
else:
if msg.type is aiohttp.WSMsgType.CLOSE:
await ws.close()
elif msg.type is aiohttp.WSMsgType.ERROR:
print("Error during receive %s" % ws.exception())
elif msg.type is aiohttp.WSMsgType.CLOSED:
pass
break
async with aiohttp.ClientSession() as session:
async with session.ws_connect(url) as ws:
dispatch_task = asyncio.create_task(dispatch(ws))
# Poll the input buffer instead of to_thread
while True:
line = None
with lock:
if input_buffer: # Check if there's input
line = input_buffer.pop(0) # Get the first line
if line:
await ws.send_str(name + ": " + line)
if line == "exit": # Stop on "exit"
break
await asyncio.sleep_ms(100) # Avoid busy-waiting
dispatch_task.cancel()
with suppress(asyncio.CancelledError):
await dispatch_task
# Run the client
asyncio.run(start_client("wss://echo.websocket.events"))
-304
View File
@@ -1,304 +0,0 @@
# Initialize
import display_driver
import lvgl as lv
import lvgl
label = lvgl.label( lvgl.screen_active() )
label.set_text( 'hi' )
animation = lvgl.anim_t()
animation.init()
animation.set_var( label )
animation.set_values( 0, 100 )
animation.set_time( 1000 )
animation.set_custom_exec_cb( lambda not_used, value : label.set_x( value ))
#wait half a second before starting animation
animation.set_delay( 500 )
#play animation backward for 1 second after first play
animation.set_playback_time( 1000 )
#repeat animation infinitely
animation.set_repeat_count( 10 )
#animation.set_repeat_count( lvgl.ANIM_REPEAT.INFINITE )
animation.set_repeat_delay( 500 )
animation.start()
******************************
# Initialize
import display_driver
import lvgl as lv
import lvgl
#create the slow short label
label1 = lvgl.label( lvgl.screen_active() )
label1.set_text( 'hello 1' )
label1.align( lvgl.ALIGN.CENTER, 0, -50 )
anim1 = lvgl.anim_t()
anim1.init()
anim1.set_var( label1 )
#anim1.set_time( lvgl.anim_speed_to_time( 20, 0, 100 ))
anim1.set_time( 2000 )
anim1.set_values( 0, 100 )
anim1.set_repeat_count( 10 )
anim1.set_repeat_delay( 2000 )
anim1.set_custom_exec_cb( lambda not_used, value : label1.set_x( value ))
#create the fast long label
label2 = lvgl.label( lvgl.screen_active() )
label2.set_text('hello 2')
label2.align(lvgl.ALIGN.CENTER,-100,0)
anim2 = lvgl.anim_t()
anim2.init()
anim2.set_var( label2 )
#anim2.set_time( lvgl.anim_speed_to_time( 40, -100, 100 ))
anim2.set_time( 40 * 200 )
anim2.set_values( -100, 100 )
anim2.set_custom_exec_cb( lambda not_used, value : label2.set_x( value ))
anim2.set_repeat_count( 10)
anim2.set_repeat_delay( 2000 )
#Create the fast short label
label3 = lvgl.label( lvgl.screen_active() )
label3.set_text( 'hello 3' )
label3.align( lvgl.ALIGN.CENTER, -100, 50 )
anim3 = lvgl.anim_t()
anim3.init()
anim3.set_var( label3 )
#anim3.set_time( lvgl.anim_speed_to_time( 40, -100, 0 ))
anim3.set_time( 40 * 100)
anim3.set_values( -100, 0)
anim3.set_custom_exec_cb( lambda not_used, value : label3.set_x( value ))
anim3.set_repeat_count( 10 )
anim3.set_repeat_delay( 40 * 100 + 2000 )
#anim3.set_repeat_delay( lvgl.anim_speed_to_time( 40, -100, 0) + 2000 )
#lvgl.anim_speed_to_time( 1, 2, 3)
#anim3.set_repeat_delay( 5000)
#lvgl.anim
anim1.start()
anim2.start()
anim3.start()
******************
# Initialize
import display_driver
import lvgl as lv
import lvgl
import lvgl
#normal animation
label1 = lvgl.label( lvgl.screen_active() )
label1.set_text( 'hello 1' )
label1.align( lvgl.ALIGN.CENTER, -70, -60 )
anim1 = lvgl.anim_t()
anim1.init()
anim1.set_var( label1 )
anim1.set_time( 1000 )
anim1.set_values( -70, 20 )
anim1.set_repeat_count( 100 )
anim1.set_repeat_delay( 2000 )
anim1.set_custom_exec_cb( lambda not_used, value : label1.set_x( value ))
#this animation bounces the label when it ends
label2 = lvgl.label( lvgl.screen_active() )
label2.set_text( 'hello 2' )
label2.align( lvgl.ALIGN.CENTER, 30, -60 )
anim2 = lvgl.anim_t()
anim2.init()
anim2.set_var( label2 )
anim2.set_time( 1000 )
anim2.set_values( 30, 120 )
anim2.set_custom_exec_cb( lambda not_used, value : label2.set_x( value ))
anim2.set_repeat_count( 100)
anim2.set_repeat_delay( 2000 )
anim2.set_path_cb( lvgl.anim_t.path_bounce )
#this animation goes past the end point then comes back
label3 = lvgl.label( lvgl.screen_active() )
label3.set_text( 'hello 3' )
label3.align( lvgl.ALIGN.CENTER, -70, 60 )
anim3 = lvgl.anim_t()
anim3.init()
anim3.set_var( label3 )
anim3.set_time( 1000 )
anim3.set_values( -70, 20 )
anim3.set_custom_exec_cb( lambda not_used, value : label3.set_x( value ))
anim3.set_repeat_count( 100 )
anim3.set_repeat_delay( 2000 )
anim3.set_path_cb( lvgl.anim_t.path_overshoot )
#this animation slowly starts and then slowly ends
label4 = lvgl.label( lvgl.screen_active() )
label4.set_text( 'hello 4' )
label4.align( lvgl.ALIGN.CENTER, 30, 60 )
anim4 = lvgl.anim_t()
anim4.init()
anim4.set_var( label4 )
anim4.set_time( 1000 )
anim4.set_values( 30, 120 )
anim4.set_custom_exec_cb( lambda not_used, value : label4.set_x( value ))
anim4.set_repeat_count( 100 )
anim4.set_repeat_delay( 2000 )
anim4.set_path_cb( lvgl.anim_t.path_ease_in_out )
anim1.start()
anim2.start()
anim3.start()
anim4.start()
********************
# Initialize
import display_driver
import lvgl as lv
button = lv.button( lv.screen_active() )
button.set_size( 50, 20 )
button.center()
anim1 = lv.anim_t()
anim1.init()
anim1.set_var( button )
anim1.set_time( 1000 )
anim1.set_values( -100, 100 )
anim1.set_custom_exec_cb( lambda not_used, value : button.set_x( value ))
anim2 = lv.anim_t()
anim2.init()
anim2.set_var( button )
anim2.set_time( 150 )
anim2.set_values( 100, 30 )
anim2.set_custom_exec_cb( lambda not_used, value : button.set_x( value ))
anim3 = lv.anim_t()
anim3.init()
anim3.set_var( button )
anim3.set_time( 2000 )
anim3.set_values( 30, -100 )
anim3.set_custom_exec_cb( lambda not_used, value : button.set_x( value ))
timeline = lv.anim_timeline_create()
# this works:
lv.anim_timeline_t.add(timeline,0,anim1)
lv.anim_timeline_t.add(timeline,2000,anim2)
lv.anim_timeline_t.add(timeline,3150,anim3)
lv.anim_timeline_t.start(timeline)
# to restart it:
#lv.anim_timeline_t.set_progress(timeline, 0)
#lv.anim_timeline_t.start(timeline)
# or to reverse it:
#lv.anim_timeline_t.set_reverse(timeline,True)
#lv.anim_timeline_t.start(timeline)
#lv.anim_timeline_t.set_reverse(timeline,False)
#lv.anim_timeline_t.start(timeline)
# progress max is 65535, not 32k
#lv.anim_timeline_t.set_progress(timeline,65535)
#lv.anim_timeline_t.start(timeline)
***************** clicking the button starts the animation:
# Initialize
import display_driver
import lvgl as lv
label = lv.label( lv.screen_active() )
label.set_text( 'hi' )
animation = lv.anim_t()
animation.init()
animation.set_var( label )
animation.set_values( 0, 100 )
animation.set_time( 1000 )
animation.set_custom_exec_cb( lambda not_used, value : label.set_x( value ))
#wait half a second before starting animation
#animation.set_delay( 500 )
#play animation backward for 1 second after first play
#animation.set_playback_time( 1000 )
#repeat animation infinitely
#animation.set_repeat_count( 10 )
#animation.set_repeat_count( lvgl.ANIM_REPEAT.INFINITE )
#animation.set_repeat_delay( 500 )
button = lv.button(lv.screen_active())
button.set_size(60,30)
button.center()
#button.set_pos(100,100)
wifi_label=lv.label(button)
wifi_label.set_text(lv.SYMBOL.WIFI+" WiFi")
wifi_label.center()
#wifi_label.set_style_text_color(COLOR_DRAWER_BUTTONTEXT,0)
def wifi_event(e):
animation.start()
button.add_event_cb(wifi_event,lv.EVENT.CLICKED,None)
@@ -1,449 +0,0 @@
import lvgl as lv
import uio
import ujson
import uos
import _thread
import traceback
import mpos.info
import mpos.ui
def good_stack_size():
stacksize = 24*1024
import sys
if sys.platform == "esp32":
stacksize = 16*1024
return stacksize
# Run the script in the current thread:
def execute_script(script_source, is_file, cwd=None, classname=None):
thread_id = _thread.get_ident()
compile_name = 'script' if not is_file else script_source
print(f"Thread {thread_id}: executing script with cwd: {cwd}")
script_globals = {'lv': lv, '__name__': "__main__"}
import sys
import uos
import utime
path_before = sys.path
if cwd:
sys.path.append(cwd)
try:
if is_file:
mpy_file = script_source.rsplit('.py', 1)[0] + '.mpy' if '.py' in script_source else script_source + '.mpy'
try:
uos.stat(mpy_file)
source_file = mpy_file
except OSError:
source_file = script_source
mode = 'rb' if source_file.endswith('.mpy') else 'r'
print(f"Thread {thread_id}: reading {'bytecode' if mode == 'rb' else 'script'} from {source_file}")
start_time = utime.ticks_ms()
f = open(source_file, mode)
try:
script_source = f.read()
finally:
f.close()
read_time = utime.ticks_diff(utime.ticks_ms(), start_time)
print(f"Thread {thread_id}: file read took {read_time} ms")
try:
start_time = utime.ticks_ms()
compiled_script = script_source if isinstance(script_source, bytes) else compile(script_source, compile_name, 'exec')
compile_time = utime.ticks_diff(utime.ticks_ms(), start_time)
if not isinstance(script_source, bytes):
print(f"Thread {thread_id}: compilation took {compile_time} ms")
exec(compiled_script, script_globals)
if classname:
main_activity = script_globals.get(classname)
if main_activity:
Activity.startActivity(None, Intent(activity_class=main_activity))
else:
print("Warning: could not find main_activity")
except Exception as e:
print(f"Thread {thread_id}: exception during execution:")
traceback.print_exception(type(e), e, getattr(e, '__traceback__', None))
print(f"Thread {thread_id}: script {compile_name} finished")
except Exception as e:
print(f"Thread {thread_id}: error:")
traceback.print_exception(type(e), e, getattr(e, '__traceback__', None))
sys.path = path_before
# Run the script in a new thread:
# TODO: check if the script exists here instead of launching a new thread?
def execute_script_new_thread(scriptname, is_file):
print(f"main.py: execute_script_new_thread({scriptname},{is_file})")
try:
# 168KB maximum at startup but 136KB after loading display, drivers, LVGL gui etc so let's go for 128KB for now, still a lot...
# But then no additional threads can be created. A stacksize of 32KB allows for 4 threads, so 3 in the app itself, which might be tight.
# 16KB allows for 10 threads in the apps, but seems too tight for urequests on unix (desktop) targets
# 32KB seems better for the camera, but it forced me to lower other app threads from 16 to 12KB
#_thread.stack_size(24576) # causes camera issue...
# NOTE: This doesn't do anything if apps are started in the same thread!
if "camtest" in scriptname:
print("Starting camtest with extra stack size!")
stack=32*1024
elif "appstore"in scriptname:
print("Starting appstore with extra stack size!")
stack=24*1024 # this doesn't do anything because it's all started in the same thread
else:
stack=16*1024 # 16KB doesn't seem to be enough for the AppStore app on desktop
stack = mpos.apps.good_stack_size()
print(f"app.py: setting stack size for script to {stack}")
_thread.stack_size(stack)
_thread.start_new_thread(execute_script, (scriptname, is_file))
except Exception as e:
print("main.py: execute_script_new_thread(): error starting new thread thread: ", e)
def start_app_by_name(app_name, is_launcher=False):
mpos.ui.set_foreground_app(app_name)
custom_app_dir=f"apps/{app_name}"
builtin_app_dir=f"builtin/apps/{app_name}"
try:
stat = uos.stat(custom_app_dir)
start_app(custom_app_dir, is_launcher)
except OSError:
start_app(builtin_app_dir, is_launcher)
def start_app(app_dir, is_launcher=False):
print(f"main.py start_app({app_dir},{is_launcher})")
mpos.ui.set_foreground_app(app_dir) # would be better to store only the app name...
manifest_path = f"{app_dir}/META-INF/MANIFEST.JSON"
app = mpos.apps.parse_manifest(manifest_path)
print(f"start_app parsed manifest and got: {str(app)}")
main_launcher_activity = find_main_launcher_activity(app)
if not main_launcher_activity:
print(f"WARNING: can't start {app_dir} because no main_launcher_activity was found.")
return
start_script_fullpath = f"{app_dir}/{main_launcher_activity.get('entrypoint')}"
execute_script(start_script_fullpath, True, app_dir + "/assets/", main_launcher_activity.get("classname"))
# Launchers have the bar, other apps don't have it
if is_launcher:
mpos.ui.open_bar()
else:
mpos.ui.close_bar()
def restart_launcher():
mpos.ui.empty_screen_stack()
# No need to stop the other launcher first, because it exits after building the screen
start_app_by_name("com.micropythonos.launcher", True) # Would be better to query the PackageManager for Activities that are launchers
def find_main_launcher_activity(app):
result = None
for activity in app.activities:
if not activity.get("entrypoint") or not activity.get("classname"):
print(f"Warning: activity {activity} has no entrypoint and classname, skipping...")
continue
print("checking activity's intent_filters...")
for intent_filter in activity.get("intent_filters"):
print("checking intent_filter...")
if intent_filter.get("action") == "main" and intent_filter.get("category") == "launcher":
print("found main_launcher!")
result = activity
break
return result
def is_launcher(app_name):
print(f"checking is_launcher for {app_name}")
# Simple check, could be more elaborate by checking the MANIFEST.JSON for the app...
return "launcher" in app_name
class App:
def __init__(self, name, publisher, short_description, long_description, icon_url, download_url, fullname, version, category, activities):
self.name = name
self.publisher = publisher
self.short_description = short_description
self.long_description = long_description
self.icon_url = icon_url
self.download_url = download_url
self.fullname = fullname
self.version = version
self.category = category
self.image = None
self.image_dsc = None
self.activities = activities
def __str__(self):
return (f"App(name='{self.name}', "
f"publisher='{self.publisher}', "
f"short_description='{self.short_description}', "
f"version='{self.version}', "
f"category='{self.category}', "
f"activities={self.activities})")
def parse_manifest(manifest_path):
# Default values for App object
default_app = App(
name="Unknown",
publisher="Unknown",
short_description="",
long_description="",
icon_url="",
download_url="",
fullname="Unknown",
version="0.0.0",
category="",
activities=[]
)
try:
with open(manifest_path, 'r') as f:
app_info = ujson.load(f)
#print(f"parsed app: {app_info}")
# Create App object with values from manifest, falling back to defaults
return App(
name=app_info.get("name", default_app.name),
publisher=app_info.get("publisher", default_app.publisher),
short_description=app_info.get("short_description", default_app.short_description),
long_description=app_info.get("long_description", default_app.long_description),
icon_url=app_info.get("icon_url", default_app.icon_url),
download_url=app_info.get("download_url", default_app.download_url),
fullname=app_info.get("fullname", default_app.fullname),
version=app_info.get("version", default_app.version),
category=app_info.get("category", default_app.category),
activities=app_info.get("activities", default_app.activities)
)
except OSError:
print(f"parse_manifest: error loading manifest_path: {manifest_path}")
return default_app
def auto_connect():
builtin_auto_connect = "builtin/system/WifiService.py"
try:
print(f"Starting {builtin_auto_connect}...")
stat = uos.stat(builtin_auto_connect)
execute_script_new_thread(builtin_auto_connect, True)
except Exception as e:
print("Couldn't execute {builtin_auto_connect} because exception {e}, continuing...")
class Activity:
def __init__(self):
self.intent = None # Store the intent that launched this activity
self.result = None
self._result_callback = None
def onCreate(self):
pass
def onStart(self, screen):
pass
def onResume(self, screen):
pass
def onPause(self, screen):
pass
def onStop(self, screen):
pass
def onDestroy(self, screen):
pass
def setContentView(self, screen):
mpos.ui.setContentView(self, screen)
def startActivity(self, intent):
ActivityNavigator.startActivity(intent)
def startActivityForResult(self, intent, result_callback):
ActivityNavigator.startActivityForResult(intent, result_callback)
def initError(self, e):
print(f"WARNING: You might have inherited from Activity with a custom __init__() without calling super().__init__(). Got AttributeError: {e}")
def getIntent(self):
try:
return self.intent
except AttributeError as e:
self.initError(e)
def setResult(self, result_code, data=None):
"""Set the result to be returned when the activity finishes."""
try:
self.result = {"result_code": result_code, "data": data or {}}
except AttributeError as e:
self.initError(e)
def finish(self):
mpos.ui.back_screen()
try:
if self._result_callback and self.result:
self._result_callback(self.result)
self._result_callback = None # Clean up
except AttributeError as e:
self.initError(e)
class Intent:
def __init__(self, activity_class=None, action=None, data=None, extras=None):
self.activity_class = activity_class # Explicit target (e.g., SettingsActivity)
self.action = action # Action string (e.g., "view", "share")
self.data = data # Single data item (e.g., URL)
self.extras = extras or {} # Dictionary for additional data
self.flags = {} # Simplified flags: {"clear_top": bool, "no_history": bool, "no_animation": bool}
def addFlag(self, flag, value=True):
self.flags[flag] = value
return self
def putExtra(self, key, value):
self.extras[key] = value
return self
class ActivityNavigator:
@staticmethod
def startActivity(intent):
if not isinstance(intent, Intent):
raise ValueError("Must provide an Intent")
if intent.action: # Implicit intent: resolve handlers
handlers = APP_REGISTRY.get(intent.action, [])
if len(handlers) == 1:
intent.activity_class = handlers[0]
ActivityNavigator._launch_activity(intent)
elif handlers:
ActivityNavigator._show_chooser(intent, handlers)
else:
raise ValueError(f"No handlers for action: {intent.action}")
else:
ActivityNavigator._launch_activity(intent)
@staticmethod
def startActivityForResult(intent, result_callback):
"""Launch an activity and pass a callback for the result."""
if not isinstance(intent, Intent):
raise ValueError("Must provide an Intent")
if intent.action: # Implicit intent: resolve handlers
handlers = APP_REGISTRY.get(intent.action, [])
if len(handlers) == 1:
intent.activity_class = handlers[0]
return ActivityNavigator._launch_activity(intent, result_callback)
elif handlers:
ActivityNavigator._show_chooser(intent, handlers)
return None # Chooser handles result forwarding
else:
raise ValueError(f"No handlers for action: {intent.action}")
else:
return ActivityNavigator._launch_activity(intent, result_callback)
@staticmethod
def _launch_activity(intent, result_callback=None):
"""Launch an activity and set up result callback."""
activity = intent.activity_class()
activity.intent = intent
activity._result_callback = result_callback # Pass callback to activity
activity.onCreate()
return activity
@staticmethod
def _show_chooser(intent, handlers):
chooser_intent = Intent(ChooserActivity, extras={"original_intent": intent, "handlers": [h.__name__ for h in handlers]})
ActivityNavigator._launch_activity(chooser_intent)
class ChooserActivity(Activity):
def __init__(self):
super().__init__()
def onCreate(self):
screen = lv.obj()
# Get handlers from intent extras
original_intent = self.getIntent().extras.get("original_intent")
handlers = self.getIntent().extras.get("handlers", [])
label = lv.label(screen)
label.set_text("Choose an app")
label.set_pos(10, 10)
for i, handler_name in enumerate(handlers):
btn = lv.btn(screen)
btn.set_user_data(f"handler_{i}")
btn_label = lv.label(btn)
btn_label.set_text(handler_name)
btn.set_pos(10, 50 * (i + 1) + 10)
btn.add_event_cb(lambda e, h=handler_name, oi=original_intent: self._select_handler(h, oi), lv.EVENT.CLICKED)
self.setContentView(screen)
def _select_handler(self, handler_name, original_intent):
for handler in APP_REGISTRY.get(original_intent.action, []):
if handler.__name__ == handler_name:
original_intent.activity_class = handler
navigator.startActivity(original_intent)
break
navigator.finish() # Close chooser
def onStop(self, screen):
if self.getIntent() and self.getIntent().getStringExtra("destination") == "ChooserActivity":
print("Stopped for Chooser")
else:
print("Stopped for other screen")
class ViewActivity(Activity):
def __init__(self):
super().__init__()
def onCreate(self):
screen = lv.obj()
# Get content from intent (prefer extras.url, fallback to data)
content = self.getIntent().extras.get("url", self.getIntent().data or "No content")
label = lv.label(screen)
label.set_user_data("content_label")
label.set_text(f"Viewing: {content}")
label.center()
self.setContentView(screen)
def onStart(self, screen):
content = self.getIntent().extras.get("url", self.getIntent().data or "No content")
for i in range(screen.get_child_cnt()):
if screen.get_child(i).get_user_data() == "content_label":
screen.get_child(i).set_text(f"Viewing: {content}")
def onStop(self, screen):
if self.getIntent() and self.getIntent().getStringExtra("destination") == "ViewActivity":
print("Stopped for View")
else:
print("Stopped for other screen")
class ShareActivity(Activity):
def __init__(self):
super().__init__()
def onCreate(self):
screen = lv.obj()
# Get text from intent (prefer extras.text, fallback to data)
text = self.getIntent().extras.get("text", self.getIntent().data or "No text")
label = lv.label(screen)
label.set_user_data("share_label")
label.set_text(f"Share: {text}")
label.set_pos(10, 10)
btn = lv.btn(screen)
btn.set_user_data("share_btn")
btn_label = lv.label(btn)
btn_label.set_text("Share")
btn.set_pos(10, 50)
btn.add_event_cb(lambda e: self._share_content(text), lv.EVENT.CLICKED)
self.setContentView(screen)
def _share_content(self, text):
# Dispatch to another app (e.g., MessagingActivity) or simulate sharing
print(f"Sharing: {text}") # Placeholder for actual sharing
# Example: Launch another share handler
navigator.startActivity(Intent(action="share", data=text))
navigator.finish() # Close ShareActivity
def onStop(self, screen):
if self.getIntent() and self.getIntent().getStringExtra("destination") == "ShareActivity":
print("Stopped for Share")
else:
print("Stopped for other screen")
APP_REGISTRY = { # This should be handled by a new class PackageManager:
"view": [ViewActivity], # Hypothetical activities
"share": [ShareActivity]
}
-284
View File
@@ -1,284 +0,0 @@
import socket
import asyncio as a
import binascii as b
import random as r
from collections import namedtuple
import re
import struct
import ssl
# Opcodes
OP_CONT = const(0x0)
OP_TEXT = const(0x1)
OP_BYTES = const(0x2)
OP_CLOSE = const(0x8)
OP_PING = const(0x9)
OP_PONG = const(0xa)
# Close codes
CLOSE_OK = const(1000)
CLOSE_GOING_AWAY = const(1001)
CLOSE_PROTOCOL_ERROR = const(1002)
CLOSE_DATA_NOT_SUPPORTED = const(1003)
CLOSE_BAD_DATA = const(1007)
CLOSE_POLICY_VIOLATION = const(1008)
CLOSE_TOO_BIG = const(1009)
CLOSE_MISSING_EXTN = const(1010)
CLOSE_BAD_CONDITION = const(1011)
URL_RE = re.compile(r'(wss|ws)://([A-Za-z0-9-\.]+)(?:\:([0-9]+))?(/.+)?')
URI = namedtuple('URI', ('protocol', 'hostname', 'port', 'path'))
class AsyncWebsocketClient:
def __init__(self, ms_delay_for_read: int = 5):
self._open = False
self.delay_read = ms_delay_for_read
self._lock_for_open = a.Lock()
self.sock = None
async def open(self, new_val: bool = None):
await self._lock_for_open.acquire()
if new_val is not None:
if not new_val and self.sock:
self.sock.close()
self.sock = None
self._open = new_val
to_return = self._open
self._lock_for_open.release()
return to_return
async def close(self):
return await self.open(False)
def urlparse(self, uri):
"""Parse ws or wss:// URLs"""
match = URL_RE.match(uri)
if match:
protocol, host, port, path = match.group(1), match.group(2), match.group(3), match.group(4)
if protocol not in ['ws', 'wss']:
raise ValueError('Scheme {} is invalid'.format(protocol))
if port is None:
port = (80, 443)[protocol == 'wss']
return URI(protocol, host, int(port), path)
async def a_readline(self):
line = None
while line is None:
line = self.sock.readline()
await a.sleep_ms(self.delay_read)
return line
async def a_read(self, size: int = None):
if size == 0:
return b''
chunks = []
while True:
b = self.sock.read(size)
await a.sleep_ms(self.delay_read)
# Continue reading if the socket returns None
if b is None: continue
# In some cases, the socket will return an empty bytes
# after PING or PONG frames, we need to ignore them.
if len(b) == 0: break
chunks.append(b)
size -= len(b)
# After reading the first chunk, we can break if size is None or 0
if size is None or size == 0: break
# Join all the chunks and return them
return b''.join(chunks)
async def handshake(self, uri, headers=[], keyfile=None, certfile=None, cafile=None, cert_reqs=0):
if self.sock:
self.close()
self.sock = socket.socket()
self.uri = self.urlparse(uri)
ai = socket.getaddrinfo(self.uri.hostname, self.uri.port)
addr = ai[0][4]
self.sock.connect(addr)
self.sock.setblocking(False)
if self.uri.protocol == 'wss':
cadata = None
if not cafile is None:
with open(cafile, 'rb') as f:
cadata = f.read()
self.sock = ssl.wrap_socket(
self.sock, server_side=False,
key=keyfile, cert=certfile,
cert_reqs=cert_reqs, # 0 - NONE, 1 - OPTIONAL, 2 - REQUIED
cadata=cadata,
server_hostname=self.uri.hostname
)
def send_header(header, *args):
self.sock.write(header % args + '\r\n')
# Sec-WebSocket-Key is 16 bytes of random base64 encoded
key = b.b2a_base64(bytes(r.getrandbits(8)
for _ in range(16)))[:-1]
send_header(b'GET %s HTTP/1.1', self.uri.path or '/')
send_header(b'Host: %s:%s', self.uri.hostname, self.uri.port)
send_header(b'Connection: Upgrade')
send_header(b'Upgrade: websocket')
send_header(b'Sec-WebSocket-Key: %s', key)
send_header(b'Sec-WebSocket-Version: 13')
send_header(b'Origin: http://{hostname}:{port}'.format(
hostname=self.uri.hostname,
port=self.uri.port)
)
for key, value in headers:
send_header(b'%s: %s', key, value)
send_header(b'')
line = await self.a_readline()
header = (line)[:-2]
if not header.startswith(b'HTTP/1.1 101 '):
raise Exception(header)
# We don't (currently) need these headers
# FIXME: should we check the return key?
while header:
line = await self.a_readline()
header = (line)[:-2]
return await self.open(True)
async def read_frame(self, max_size=None):
# Frame header
byte1, byte2 = struct.unpack('!BB', await self.a_read(2))
# Byte 1: FIN(1) _(1) _(1) _(1) OPCODE(4)
fin = bool(byte1 & 0x80)
opcode = byte1 & 0x0f
# Byte 2: MASK(1) LENGTH(7)
mask = bool(byte2 & (1 << 7))
length = byte2 & 0x7f
if length == 126: # Magic number, length header is 2 bytes
length, = struct.unpack('!H', await self.a_read(2))
elif length == 127: # Magic number, length header is 8 bytes
length, = struct.unpack('!Q', await self.a_read(8))
if mask: # Mask is 4 bytes
mask_bits = await self.a_read(4)
try:
data = await self.a_read(length)
except MemoryError:
# We can't receive this many bytes, close the socket
self.close(code=CLOSE_TOO_BIG)
# await self._stream.drain()
return True, OP_CLOSE, None
if mask:
data = bytes(b ^ mask_bits[i % 4]
for i, b in enumerate(data))
return fin, opcode, data
def write_frame(self, opcode, data=b''):
fin = True
mask = True # messages sent by client are masked
length = len(data)
# Frame header
# Byte 1: FIN(1) _(1) _(1) _(1) OPCODE(4)
byte1 = 0x80 if fin else 0
byte1 |= opcode
# Byte 2: MASK(1) LENGTH(7)
byte2 = 0x80 if mask else 0
if length < 126: # 126 is magic value to use 2-byte length header
byte2 |= length
self.sock.write(struct.pack('!BB', byte1, byte2))
elif length < (1 << 16): # Length fits in 2-bytes
byte2 |= 126 # Magic code
self.sock.write(struct.pack('!BBH', byte1, byte2, length))
elif length < (1 << 64):
byte2 |= 127 # Magic code
self.sock.write(struct.pack('!BBQ', byte1, byte2, length))
else:
raise ValueError()
if mask: # Mask is 4 bytes
mask_bits = struct.pack('!I', r.getrandbits(32))
self.sock.write(mask_bits)
data = bytes(b ^ mask_bits[i % 4]
for i, b in enumerate(data))
self.sock.write(data)
async def recv(self):
while await self.open():
try:
fin, opcode, data = await self.read_frame()
# except (ValueError, EOFError) as ex:
except Exception as ex:
print('Exception in recv while reading frame:', ex)
await self.open(False)
return
if not fin:
raise NotImplementedError()
if opcode == OP_TEXT:
return data.decode('utf-8')
elif opcode == OP_BYTES:
return data
elif opcode == OP_CLOSE:
await self.open(False)
return
elif opcode == OP_PONG:
# Ignore this frame, keep waiting for a data frame
continue
elif opcode == OP_PING:
try:
# We need to send a pong frame
self.write_frame(OP_PONG, data)
# And then continue to wait for a data frame
continue
except Exception as ex:
print('Error sending pong frame:', ex)
# If sending the pong frame fails, close the connection
await self.open(False)
return
elif opcode == OP_CONT:
# This is a continuation of a previous frame
raise NotImplementedError(opcode)
else:
raise ValueError(opcode)
async def send(self, buf):
if not await self.open():
return
if isinstance(buf, str):
opcode = OP_TEXT
buf = buf.encode('utf-8')
elif isinstance(buf, bytes):
opcode = OP_BYTES
else:
raise TypeError()
self.write_frame(opcode, buf)
-9
View File
@@ -1,9 +0,0 @@
# Create a dropdown
dropdown = lv.dropdown(lv.screen_active())
dropdown.set_options("Option 1\nOption 2\nOption 3")
dropdown.align(lv.ALIGN.CENTER, 0, 0)
switch = lv.switch(lv.screen_active())
switch.center()
-54
View File
@@ -1,54 +0,0 @@
import binascii
import os
import sys
try:
input_file = sys.argv[1]
except IndexError:
print('No file or directory given using current working directory')
input_file = os.getcwd()
def run(in_file):
output_file = os.path.splitext(in_file)[0] + '.py'
ext = os.path.splitext(in_file)[1][1:]
with open(in_file, 'rb') as f:
data = f.read()
data = binascii.hexlify(data)
data = [' ' + str(data[i: min(i + 74, len(data))])[1:] for i in range(0, len(data), 74)]
data = '\n'.join(data)
output = f'''\
import binascii
_{ext} = bytearray(binascii.unhexlify(
{data}
))
{ext} = memoryview(_{ext})
'''
with open(output_file, 'w') as f:
f.write(output)
def process_directory(directory):
for root, _, files in os.walk(directory):
for file in files:
file_ext = os.path.splitext(file)[1][1:]
if file_ext not in ('bmp', 'jpg', 'gif', 'png', 'bin', 'MF'):
continue
thisfile = os.path.join(root, file)
print('found file:', thisfile)
run(thisfile)
if os.path.isdir(input_file):
process_directory(input_file)
else:
file_ext = os.path.splitext(input_file)[1][1:]
if file_ext not in ('bmp', 'jpg', 'gif', 'png', 'bin'):
raise RuntimeError('supported image files are bmp, jpg, gif and png')
print('found file:', input_file)
run(input_file)
print('DONE!!')
-39
View File
@@ -1,39 +0,0 @@
import lvgl as lv
def list_image_decoders():
# Initialize LVGL
lv.init()
# Start with the first decoder
first = lv.image_decoder_t()
#decoder = lv.image.decoder_get_next(first)
decoder = lv.image_decoder_t.get_next(first)
index = 0
# Iterate through all decoders
while decoder is not None:
print(f"Image Decoder {index}: {decoder}")
index += 1
#decoder = lv.image.decoder_get_next(decoder)
decoder = lv.image_decoder_t.get_next(decoder)
if index == 0:
print("No image decoders found.")
else:
print(f"Total image decoders: {index}")
# Run the function
list_image_decoders()
i = lv.image(lv.screen_active());
#i.set_src("P:/home/user/sources/MicroPythonOS/artwork/image.jpg");
i.set_src("P:/home/user/sources/MicroPythonOS/artwork/icon_64x64.jpg");
i.center()
h = lv.image_header_t()
i.decoder_get_info(i.image_dsc, h)
print("image info:")
print(h)
print(f"widthxheight: {h.w}x{h.h}")
-42
View File
@@ -1,42 +0,0 @@
width=240
height=240
import webcam
import time
cam = webcam.init("/dev/video0") # Initialize webcam with device path
memview = webcam.capture_frame(cam) # Returns memoryview
time.sleep_ms(1000)
static_bytes_obj = bytes(memview)
image = lv.image(lv.screen_active())
image.align(lv.ALIGN.LEFT_MID, 0, 0)
image.set_rotation(900)
# Create image descriptor once
image_dsc = lv.image_dsc_t({
"header": {
"magic": lv.IMAGE_HEADER_MAGIC,
"w": width,
"h": height,
"stride": width ,
"cf": lv.COLOR_FORMAT.L8
},
'data_size': width * height,
'data': static_bytes_obj # Will be updated per frame
})
image.set_src(image_dsc)
for i in range(300):
print(f"iteration {i}")
webcam.recapture_frame(cam) #refresh memview
bytes_obj = bytes(memview)
#print(f"got bytes: {len(bytes_obj)}")
#image_dsc.data = static_bytes_obj
image_dsc.data = bytes_obj
#image.set_src(image_dsc)
image.invalidate()
time.sleep_ms(10) # seems to need more than 0 or 1 ms
print("cleanup")
webcam.deinit(cam) # Deinitializes webcam
-2
View File
@@ -1,2 +0,0 @@
print("This script will be included in the build.")
print("You can then run it with: import include_in_build")
-46
View File
@@ -1,46 +0,0 @@
appscreen = lv.screen_active()
appscreen.clean()
password_ta=lv.textarea(appscreen)
password_ta.set_size(200,30)
password_ta.set_one_line(True)
password_ta.align(lv.ALIGN.TOP_MID, 5, 30)
password_ta.set_text("bla")
password_ta.set_placeholder_text("Password")
#password_ta.add_event_cb(password_ta_cb,lv.EVENT.CLICKED,None)
#oskeyboard=lv.keyboard(appscreen)
#oskeyboard.set_size(lv.pct(100),120)
#oskeyboard.align(lv.ALIGN.BOTTOM_LEFT,0,0)
#oskeyboard.set_textarea(password_ta)
#keyboard.add_event_cb(keyboard_cb,lv.EVENT.READY,None)
#keyboard.add_event_cb(keyboard_cb,lv.EVENT.CANCEL,None)
#keyboard.add_event_cb(keyboard_value_changed_cb,lv.EVENT.VALUE_CHANGED,None)
#oskeyboard.add_event_cb(touch_cb, lv.EVENT.ALL, None)
import sdl_keyboard
keyboard = sdl_keyboard.SDLKeyboard()
def keyboard_cb(event):
global canvas
event_code=event.get_code()
print(f"boot_unix: code={event_code}") # target={event.get_target()}, user_data={event.get_user_data()}, param={event.get_param()}
keyboard.add_event_cb(keyboard_cb, lv.EVENT.ALL, None)
keyboard.group.add_obj(password_ta)
#keyboard.group.add_obj(oskeyboard)
def touch_cb(event):
global canvas
event_code=event.get_code()
print(f"keyboard.py: code={event_code}") # target={event.get_target()}, user_data={event.get_user_data()}, param={event.get_param()}
password_ta.add_event_cb(touch_cb, lv.EVENT.ALL, None)
-110
View File
@@ -1,110 +0,0 @@
# Hardware initialization for Unix and MacOS systems
import lcd_bus
import lvgl as lv
import sdl_display
import task_handler
# Add lib/ to the path for modules, otherwise it will only search in ~/.micropython/lib and /usr/lib/micropython
import sys
sys.path.append('lib/')
import mpos.ui
#TFT_HOR_RES=640
#TFT_VER_RES=480
TFT_HOR_RES=320
TFT_VER_RES=240
def window_cb(args): # doesn't get called
print(f"Window callback: {args}")
bus = lcd_bus.SDLBus(flags=0)
bus.register_window_callback(window_cb)
# bus.set_window_size(320,240,-1,False) # -1 might be 25 but it always becomes black, except for format 0
buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0)
display = sdl_display.SDLDisplay(data_bus=bus,display_width=TFT_HOR_RES,display_height=TFT_VER_RES,frame_buffer1=buf1,color_space=lv.COLOR_FORMAT.RGB565)
display.init()
import sdl_pointer
mouse = sdl_pointer.SDLPointer()
import sdl_keyboard
sdlkeyboard = sdl_keyboard.SDLKeyboard()
#indev.set_read_cb(keypad_cb)
# seems indev isn't properly initialized
def keypad_cb(indev, indev_data):
global sdlkeyboard
#print(f"keypad_cb {indev} {indev_data}")
#key = indev.get_key() # always 0
#print(f"key {key}")
#key = indev_data.get("key")
#print(f"key {key}")
pressed, code = sdlkeyboard._get_key()
print(f"periodic pressed: {pressed}, code: {code}")
sdlkeyboard._read(indev, indev_data)
# I mean we could read the key and put it in the textarea but I want some kind of keypress :-/
sdlkeyboard._indev_drv.set_read_cb(keypad_cb) # check for escape
def keyboard_cb(event):
event_code=event.get_code()
print(f"keyboard_test YES: code={event_code}") # target={event.get_target()}, user_data={event.get_user_data()}, param={event.get_param()}
def button_cb(event):
event_code=event.get_code()
name = mpos.ui.get_event_name(event_code)
print(f"button_cb YES: code={event_code} and name {name}")
# for some reason, this text areas is receiving mouse events, and draw events, but not key events...
def ta_callback_again(event):
event_code=event.get_code()
if event_code in [19,23,25,26,27,28,29,30,49]:
return
name = mpos.ui.get_event_name(event_code)
print(f"ta_callback_again {event_code} and {name}")
#print(f"ta_callback_again: code={event_code}") # target={event.get_target()}, user_data={event.get_user_data()}, param={event.get_param()}
sdlkeyboard.add_event_cb(keyboard_cb, lv.EVENT.ALL, None)
#group = lv.group_create()
#group = keyboard.get_group()
th = task_handler.TaskHandler(duration=5) # 5ms is recommended for MicroPython+LVGL on desktop
screen = lv.screen_active()
b = lv.button(screen)
b.center()
b.add_event_cb(button_cb, lv.EVENT.ALL, None)
#group.add_obj(b)
ta = lv.textarea(screen)
ta.set_one_line(True)
ta.align(lv.ALIGN.TOP_LEFT,0,0)
ta.add_event_cb(ta_callback_again, lv.EVENT.ALL, None)
#group.add_obj(ta)
takeyboard = lv.keyboard(screen)
takeyboard.set_textarea(ta)
# this does something, but just gives indev 0, being error...
#indev = lv.indev_create()
#indev.set_type(lv.INDEV_TYPE.KEYPAD)
#indev.set_read_cb(keypad_cb) # check for escape
#keyboard.set_group(group)
-69
View File
@@ -1,69 +0,0 @@
import lcd_bus
import lvgl as lv
import sdl_display
import task_handler
import sys
sys.path.append('lib/')
import mpos.ui
import sdl_pointer
import sdl_keyboard
# Display resolution
TFT_HOR_RES = 320
TFT_VER_RES = 240
# Initialize display
bus = lcd_bus.SDLBus(flags=0)
buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0)
display = sdl_display.SDLDisplay(
data_bus=bus,
display_width=TFT_HOR_RES,
display_height=TFT_VER_RES,
frame_buffer1=buf1,
color_space=lv.COLOR_FORMAT.RGB565
)
display.init()
# Initialize mouse
mouse = sdl_pointer.SDLPointer()
# Initialize keyboard
keyboard = sdl_keyboard.SDLKeyboard()
# Create group for input devices
group = lv.group_create()
keyboard.set_group(group)
# Create textarea
screen = lv.screen_active()
ta = lv.textarea(screen)
ta.set_one_line(True)
ta.align(lv.ALIGN.TOP_LEFT, 0, 0)
ta.set_placeholder_text("Type here")
group.add_obj(ta)
# Optional: Debug event callback for textarea
def ta_event_cb(event):
event_code = event.get_code()
name = mpos.ui.get_event_name(event_code)
print(f"Textarea event: code={event_code}, name={name}")
ta.add_event_cb(ta_event_cb, lv.EVENT.ALL, None)
# Optional: Create an on-screen keyboard
keyboard_widget = lv.keyboard(screen)
keyboard_widget.set_textarea(ta)
keyboard_widget.add_flag(lv.obj.FLAG.HIDDEN)
def ta_focus_cb(event):
event_code = event.get_code()
if event_code == lv.EVENT.FOCUSED:
keyboard_widget.clear_flag(lv.obj.FLAG.HIDDEN)
elif event_code == lv.EVENT.DEFOCUSED:
keyboard_widget.add_flag(lv.obj.FLAG.HIDDEN)
ta.add_event_cb(ta_focus_cb, lv.EVENT.FOCUSED | lv.EVENT.DEFOCUSED, None)
# Task handler
th = task_handler.TaskHandler(duration=5) # 5ms for desktop
-113
View File
@@ -1,113 +0,0 @@
import lcd_bus
import lvgl as lv
import sdl_display
import task_handler
import sys
sys.path.append('lib/')
import sdl_pointer
import sdl_keyboard
# Initialize display
TFT_HOR_RES = 320
TFT_VER_RES = 240
bus = lcd_bus.SDLBus(flags=0)
buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0)
display = sdl_display.SDLDisplay(
data_bus=bus,
display_width=TFT_HOR_RES,
display_height=TFT_VER_RES,
frame_buffer1=buf1,
color_space=lv.COLOR_FORMAT.RGB565
)
display.init()
# Initialize mouse
mouse = sdl_pointer.SDLPointer()
# Initialize keyboard
#keyboard = sdl_keyboard.SDLKeyboard()
pressed = False
def get_key(indev,data):
print("simulating get_key")
global pressed
if not pressed:
# input your keypad code
data.state = 1 #1 for press 0 for released
data.key=100
#pressed = True
else:
data.state = 0 #1 for press 0 for released
data.key=100
pressed = False
# Create group
group = lv.group_create()
group.set_default()
keyboard=lv.indev_create()
keyboard.set_type(lv.INDEV_TYPE.KEYPAD)
keyboard.set_read_cb(get_key)
keyboard.set_group(group)
#keyboard.set_group(group)
# Create widgets
screen = lv.screen_active()
# Textarea
ta = lv.textarea(screen)
ta.set_one_line(True)
ta.set_placeholder_text("Type here")
ta.align(lv.ALIGN.TOP_LEFT, 10, 10)
group.add_obj(ta)
# Switch
sw = lv.switch(screen)
sw.align(lv.ALIGN.TOP_LEFT, 10, 50)
group.add_obj(sw)
# Test Button
btn = lv.button(screen)
btn.align(lv.ALIGN.TOP_LEFT, 10, 90)
lbl = lv.label(btn)
lbl.set_text("Test Button")
group.add_obj(btn)
# Simulate NEXT key button
btn_next = lv.button(screen)
btn_next.align(lv.ALIGN.BOTTOM_LEFT, 10, -10)
lbl_next = lv.label(btn_next)
lbl_next.set_text("NEXT")
def btn_next_cb(event):
if event.get_code() == lv.EVENT.CLICKED:
keyboard._keypad_cb(None, 1, 9, 0) # Simulate KEY_TAB (lv.KEY.NEXT) press
keyboard._keypad_cb(None, 0, 9, 0) # Simulate release
btn_next.add_event_cb(btn_next_cb, lv.EVENT.CLICKED, None)
# Simulate ENTER key button
btn_enter = lv.button(screen)
btn_enter.align(lv.ALIGN.BOTTOM_LEFT, 100, -10)
lbl_enter = lv.label(btn_enter)
lbl_enter.set_text("ENTER")
def btn_enter_cb(event):
if event.get_code() == lv.EVENT.CLICKED:
keyboard._keypad_cb(None, 1, 13, 0) # Simulate KEY_RETURN (lv.KEY.ENTER) press
keyboard._keypad_cb(None, 0, 13, 0) # Simulate release
btn_enter.add_event_cb(btn_enter_cb, lv.EVENT.CLICKED, None)
# Debug focus
def check_focus():
focused = lv.group_get_focused(group)
print(f"Focused widget: {focused}")
th = task_handler.TaskHandler(duration=5)
th.register_task(check_focus, 1000)
# Debug events
def event_cb(event, name):
event_code = event.get_code()
print(f"{name} event: code={event_code}, name={getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN')}")
ta.add_event_cb(lambda e: event_cb(e, "Textarea"), lv.EVENT.ALL, None)
sw.add_event_cb(lambda e: event_cb(e, "Switch"), lv.EVENT.ALL, None)
btn.add_event_cb(lambda e: event_cb(e, "Button"), lv.EVENT.ALL, None)
-121
View File
@@ -1,121 +0,0 @@
import lcd_bus
import lvgl as lv
import sdl_display
import task_handler
import sys
sys.path.append('lib/')
import sdl_pointer
import sdl_keyboard
# Initialize display
TFT_HOR_RES = 320
TFT_VER_RES = 240
bus = lcd_bus.SDLBus(flags=0)
buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0)
display = sdl_display.SDLDisplay(
data_bus=bus,
display_width=TFT_HOR_RES,
display_height=TFT_VER_RES,
frame_buffer1=buf1,
color_space=lv.COLOR_FORMAT.RGB565
)
display.init()
# Initialize mouse
mouse = sdl_pointer.SDLPointer()
# Initialize keyboard
keyboard = sdl_keyboard.SDLKeyboard()
# Create group
group = lv.group_create()
group.set_default()
keyboard.set_group(group)
# Simulated key input for buttons
simulated_key = None
simulated_state = None
def get_key(indev, data):
global simulated_key, simulated_state
if simulated_key is not None:
print(f"Simulating key: state={simulated_state}, key={simulated_key}")
data.state = simulated_state
data.key = simulated_key
simulated_key = None # Clear after processing
else:
data.state = 0 # No key event by default
data.key = 0
# Create custom input device for simulated keys
sim_indev = lv.indev_create()
sim_indev.set_type(lv.INDEV_TYPE.KEYPAD)
sim_indev.set_read_cb(get_key)
sim_indev.set_group(group)
# Create widgets
screen = lv.screen_active()
# Textarea
ta = lv.textarea(screen)
ta.set_one_line(True)
ta.set_placeholder_text("Type here")
ta.align(lv.ALIGN.TOP_LEFT, 10, 10)
group.add_obj(ta)
# Switch
sw = lv.switch(screen)
sw.align(lv.ALIGN.TOP_LEFT, 10, 50)
group.add_obj(sw)
# Test Button
btn = lv.button(screen)
btn.align(lv.ALIGN.TOP_LEFT, 10, 90)
lbl = lv.label(btn)
lbl.set_text("Test Button")
group.add_obj(btn)
# Simulate NEXT key button
btn_next = lv.button(screen)
btn_next.align(lv.ALIGN.BOTTOM_LEFT, 10, -10)
lbl_next = lv.label(btn_next)
lbl_next.set_text("PREV")
def btn_next_cb(event):
global simulated_key, simulated_state
if event.get_code() == lv.EVENT.CLICKED:
simulated_key = lv.KEY.PREV
simulated_state = 1
sim_indev.read() # Trigger read immediately
simulated_state = 0
sim_indev.read()
btn_next.add_event_cb(btn_next_cb, lv.EVENT.CLICKED, None)
# Simulate ENTER key button
btn_enter = lv.button(screen)
btn_enter.align(lv.ALIGN.BOTTOM_LEFT, 100, -10)
lbl_enter = lv.label(btn_enter)
lbl_enter.set_text("ENTER")
def btn_enter_cb(event):
global simulated_key, simulated_state
if event.get_code() == lv.EVENT.CLICKED:
simulated_key = lv.KEY.ENTER
simulated_state = 1
sim_indev.read()
simulated_state = 0
sim_indev.read()
btn_enter.add_event_cb(btn_enter_cb, lv.EVENT.CLICKED, None)
# Debug focus
def check_focus():
focused = lv.group_get_focused(group)
print(f"Focused widget: {focused}")
th = task_handler.TaskHandler(duration=5)
th.register_task(check_focus, 1000)
# Debug events
def event_cb(event, name):
event_code = event.get_code()
key = event.get_key() if event_code == lv.EVENT.KEY else None
print(f"{name} event: code={event_code}, name={getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN')}, key={key}")
ta.add_event_cb(lambda e: event_cb(e, "Textarea"), lv.EVENT.ALL, None)
sw.add_event_cb(lambda e: event_cb(e, "Switch"), lv.EVENT.ALL, None)
btn.add_event_cb(lambda e: event_cb(e, "Button"), lv.EVENT.ALL, None)
-114
View File
@@ -1,114 +0,0 @@
import lcd_bus
import lvgl as lv
import sdl_display
import task_handler
import sys
sys.path.append('lib/')
import sdl_pointer
import sdl_keyboard
# Initialize display
TFT_HOR_RES = 320
TFT_VER_RES = 240
bus = lcd_bus.SDLBus(flags=0)
buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0)
display = sdl_display.SDLDisplay(
data_bus=bus,
display_width=TFT_HOR_RES,
display_height=TFT_VER_RES,
frame_buffer1=buf1,
color_space=lv.COLOR_FORMAT.RGB565
)
display.init()
# Initialize mouse
mouse = sdl_pointer.SDLPointer()
# Initialize keyboard
keyboard = sdl_keyboard.SDLKeyboard()
pressed = False
def get_key(indev,data):
print("simulating get_key")
global pressed
if not pressed:
# input your keypad code
data.state = 1 #1 for press 0 for released
data.key=100
#pressed = True
else:
data.state = 0 #1 for press 0 for released
data.key=100
pressed = False
# Create group
#group = lv.group_create()
#group.set_default()
group = keyboard.get_group()
#keyboard=lv.indev_create()
#keyboard.set_type(lv.INDEV_TYPE.KEYPAD)
#keyboard.set_read_cb(get_key)
#keyboard.set_group(group)
#keyboard.set_group(group)
# Create widgets
screen = lv.screen_active()
# Textarea
ta = lv.textarea(screen)
ta.set_one_line(True)
ta.set_placeholder_text("Type here")
ta.align(lv.ALIGN.TOP_LEFT, 10, 10)
group.add_obj(ta)
# Switch
sw = lv.switch(screen)
sw.align(lv.ALIGN.TOP_LEFT, 10, 50)
group.add_obj(sw)
# Test Button
btn = lv.button(screen)
btn.align(lv.ALIGN.TOP_LEFT, 10, 90)
lbl = lv.label(btn)
lbl.set_text("Test Button")
group.add_obj(btn)
# Simulate NEXT key button
btn_next = lv.button(screen)
btn_next.align(lv.ALIGN.BOTTOM_LEFT, 10, -10)
lbl_next = lv.label(btn_next)
lbl_next.set_text("NEXT")
def btn_next_cb(event):
if event.get_code() == lv.EVENT.CLICKED:
keyboard._keypad_cb(None, 1, 9, 0) # Simulate KEY_TAB (lv.KEY.NEXT) press
keyboard._keypad_cb(None, 0, 9, 0) # Simulate release
btn_next.add_event_cb(btn_next_cb, lv.EVENT.CLICKED, None)
# Simulate ENTER key button
btn_enter = lv.button(screen)
btn_enter.align(lv.ALIGN.BOTTOM_LEFT, 100, -10)
lbl_enter = lv.label(btn_enter)
lbl_enter.set_text("ENTER")
def btn_enter_cb(event):
if event.get_code() == lv.EVENT.CLICKED:
keyboard._keypad_cb(None, 1, 13, 0) # Simulate KEY_RETURN (lv.KEY.ENTER) press
keyboard._keypad_cb(None, 0, 13, 0) # Simulate release
btn_enter.add_event_cb(btn_enter_cb, lv.EVENT.CLICKED, None)
# Debug focus
def check_focus():
focused = lv.group_get_focused(group)
print(f"Focused widget: {focused}")
th = task_handler.TaskHandler(duration=5)
th.register_task(check_focus, 1000)
# Debug events
def event_cb(event, name):
event_code = event.get_code()
print(f"{name} event: code={event_code}, name={getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN')}")
ta.add_event_cb(lambda e: event_cb(e, "Textarea"), lv.EVENT.ALL, None)
sw.add_event_cb(lambda e: event_cb(e, "Switch"), lv.EVENT.ALL, None)
btn.add_event_cb(lambda e: event_cb(e, "Button"), lv.EVENT.ALL, None)
@@ -1,114 +0,0 @@
import lcd_bus
import lvgl as lv
import sdl_display
import task_handler
import sys
sys.path.append('lib/')
import sdl_pointer
import sdl_keyboard
# Initialize display
TFT_HOR_RES = 320
TFT_VER_RES = 240
bus = lcd_bus.SDLBus(flags=0)
buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0)
display = sdl_display.SDLDisplay(
data_bus=bus,
display_width=TFT_HOR_RES,
display_height=TFT_VER_RES,
frame_buffer1=buf1,
color_space=lv.COLOR_FORMAT.RGB565
)
display.init()
# Initialize mouse
mouse = sdl_pointer.SDLPointer()
# Initialize keyboard
keyboard = sdl_keyboard.SDLKeyboard()
pressed = False
def get_key(indev,data):
print("simulating get_key")
global pressed
if not pressed:
# input your keypad code
data.state = 1 #1 for press 0 for released
data.key=100
#pressed = True
else:
data.state = 0 #1 for press 0 for released
data.key=100
pressed = False
# Create group
#group = lv.group_create()
#group.set_default()
group = keyboard.get_group()
#keyboard=lv.indev_create()
#keyboard.set_type(lv.INDEV_TYPE.KEYPAD)
#keyboard.set_read_cb(get_key)
#keyboard.set_group(group)
#keyboard.set_group(group)
# Create widgets
screen = lv.screen_active()
# Textarea
ta = lv.textarea(screen)
ta.set_one_line(True)
ta.set_placeholder_text("Type here")
ta.align(lv.ALIGN.TOP_LEFT, 10, 10)
group.add_obj(ta)
# Switch
sw = lv.switch(screen)
sw.align(lv.ALIGN.TOP_LEFT, 10, 50)
group.add_obj(sw)
# Test Button
btn = lv.button(screen)
btn.align(lv.ALIGN.TOP_LEFT, 10, 90)
lbl = lv.label(btn)
lbl.set_text("Test Button")
group.add_obj(btn)
# Simulate NEXT key button
btn_next = lv.button(screen)
btn_next.align(lv.ALIGN.BOTTOM_LEFT, 10, -10)
lbl_next = lv.label(btn_next)
lbl_next.set_text("NEXT")
def btn_next_cb(event):
if event.get_code() == lv.EVENT.CLICKED:
keyboard._keypad_cb(None, 1, 9, 0) # Simulate KEY_TAB (lv.KEY.NEXT) press
keyboard._keypad_cb(None, 0, 9, 0) # Simulate release
btn_next.add_event_cb(btn_next_cb, lv.EVENT.CLICKED, None)
# Simulate ENTER key button
btn_enter = lv.button(screen)
btn_enter.align(lv.ALIGN.BOTTOM_LEFT, 100, -10)
lbl_enter = lv.label(btn_enter)
lbl_enter.set_text("ENTER")
def btn_enter_cb(event):
if event.get_code() == lv.EVENT.CLICKED:
keyboard._keypad_cb(None, 1, 13, 0) # Simulate KEY_RETURN (lv.KEY.ENTER) press
keyboard._keypad_cb(None, 0, 13, 0) # Simulate release
btn_enter.add_event_cb(btn_enter_cb, lv.EVENT.CLICKED, None)
# Debug focus
def check_focus():
focused = lv.group_get_focused(group)
print(f"Focused widget: {focused}")
th = task_handler.TaskHandler(duration=5)
th.register_task(check_focus, 1000)
# Debug events
def event_cb(event, name):
event_code = event.get_code()
print(f"{name} event: code={event_code}, name={getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN')}")
ta.add_event_cb(lambda e: event_cb(e, "Textarea"), lv.EVENT.ALL, None)
sw.add_event_cb(lambda e: event_cb(e, "Switch"), lv.EVENT.ALL, None)
btn.add_event_cb(lambda e: event_cb(e, "Button"), lv.EVENT.ALL, None)
-103
View File
@@ -1,103 +0,0 @@
import lcd_bus
import lvgl as lv
import sdl_display
import task_handler
import sys
sys.path.append('lib/')
import sdl_pointer
import sdl_keyboard
# Initialize display
TFT_HOR_RES = 320
TFT_VER_RES = 240
bus = lcd_bus.SDLBus(flags=0)
buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0)
display = sdl_display.SDLDisplay(
data_bus=bus,
display_width=TFT_HOR_RES,
display_height=TFT_VER_RES,
frame_buffer1=buf1,
color_space=lv.COLOR_FORMAT.RGB565
)
display.init()
# Initialize mouse
mouse = sdl_pointer.SDLPointer()
# Initialize keyboard
keyboard = sdl_keyboard.SDLKeyboard()
# Create group
group = lv.group_create()
group.set_default() # Set as default group
keyboard.set_group(group)
# Create widgets
screen = lv.screen_active()
# Textarea
ta = lv.textarea(screen)
ta.set_one_line(True)
ta.set_placeholder_text("Type here")
ta.align(lv.ALIGN.TOP_LEFT, 10, 10)
group.add_obj(ta)
# Switch
sw = lv.switch(screen)
sw.align(lv.ALIGN.TOP_LEFT, 10, 50)
group.add_obj(sw)
# Button
btn = lv.button(screen)
btn.align(lv.ALIGN.TOP_LEFT, 10, 90)
lbl = lv.label(btn)
lbl.set_text("Test Button")
group.add_obj(btn)
# Simulate NEXT key button
btn_next = lv.button(screen)
btn_next.align(lv.ALIGN.BOTTOM_LEFT, 10, -10)
lbl_next = lv.label(btn_next)
lbl_next.set_text("NEXT")
def btn_next_cb(event):
if event.get_code() == lv.EVENT.CLICKED:
keyboard.send_event(lv.EVENT.KEY, lv.KEY.NEXT)
btn_next.add_event_cb(btn_next_cb, lv.EVENT.CLICKED, None)
# Simulate ENTER key button
btn_enter = lv.button(screen)
btn_enter.align(lv.ALIGN.BOTTOM_LEFT, 100, -10)
lbl_enter = lv.label(btn_enter)
lbl_enter.set_text("ENTER")
def btn_enter_cb(event):
if event.get_code() == lv.EVENT.CLICKED:
keyboard.send_event(lv.EVENT.KEY, lv.KEY.ENTER)
btn_enter.add_event_cb(btn_enter_cb, lv.EVENT.CLICKED, None)
# Debug focus
def check_focus():
focused = lv.group_get_focused(group)
print(f"Focused widget: {focused}")
th = task_handler.TaskHandler(duration=5)
th.register_task(check_focus, 1000) # Check focus every 1s
# Debug textarea events
def ta_event_cb(event):
event_code = event.get_code()
name = getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN')
print(f"Textarea event: code={event_code}, name={name}")
ta.add_event_cb(ta_event_cb, lv.EVENT.ALL, None)
# Debug switch events
def sw_event_cb(event):
event_code = event.get_code()
name = getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN')
print(f"Switch event: code={event_code}, name={name}")
sw.add_event_cb(sw_event_cb, lv.EVENT.ALL, None)
# Debug button events
def btn_event_cb(event):
event_code = event.get_code()
name = getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN')
print(f"Button event: code={event_code}, name={name}")
btn.add_event_cb(btn_event_cb, lv.EVENT.ALL, None)
@@ -1,92 +0,0 @@
import lcd_bus
import lvgl as lv
import sdl_display
import task_handler
import sys
sys.path.append('lib/')
import sdl_pointer
import sdl_keyboard
# Initialize display
TFT_HOR_RES = 320
TFT_VER_RES = 240
bus = lcd_bus.SDLBus(flags=0)
buf1 = bus.allocate_framebuffer(TFT_HOR_RES * TFT_VER_RES * 2, 0)
display = sdl_display.SDLDisplay(
data_bus=bus,
display_width=TFT_HOR_RES,
display_height=TFT_VER_RES,
frame_buffer1=buf1,
color_space=lv.COLOR_FORMAT.RGB565
)
display.init()
# Initialize mouse
mouse = sdl_pointer.SDLPointer()
# Initialize keyboard
keyboard = sdl_keyboard.SDLKeyboard()
# Create group
group = lv.group_create()
group.set_default()
keyboard.set_group(group)
# Create widgets
screen = lv.screen_active()
# Textarea
ta = lv.textarea(screen)
ta.set_one_line(True)
ta.set_placeholder_text("Type here")
ta.align(lv.ALIGN.TOP_LEFT, 10, 10)
group.add_obj(ta)
# Switch
sw = lv.switch(screen)
sw.align(lv.ALIGN.TOP_LEFT, 10, 50)
group.add_obj(sw)
# Test Button
btn = lv.button(screen)
btn.align(lv.ALIGN.TOP_LEFT, 10, 90)
lbl = lv.label(btn)
lbl.set_text("Test Button")
group.add_obj(btn)
# Simulate NEXT key button
btn_next = lv.button(screen)
btn_next.align(lv.ALIGN.BOTTOM_LEFT, 10, -10)
lbl_next = lv.label(btn_next)
lbl_next.set_text("NEXT")
def btn_next_cb(event):
if event.get_code() == lv.EVENT.CLICKED:
keyboard._keypad_cb(None, 1, 9, 0) # Simulate KEY_TAB (lv.KEY.NEXT) press
keyboard._keypad_cb(None, 0, 9, 0) # Simulate release
btn_next.add_event_cb(btn_next_cb, lv.EVENT.CLICKED, None)
# Simulate ENTER key button
btn_enter = lv.button(screen)
btn_enter.align(lv.ALIGN.BOTTOM_LEFT, 100, -10)
lbl_enter = lv.label(btn_enter)
lbl_enter.set_text("ENTER")
def btn_enter_cb(event):
if event.get_code() == lv.EVENT.CLICKED:
keyboard._keypad_cb(None, 1, 13, 0) # Simulate KEY_RETURN (lv.KEY.ENTER) press
keyboard._keypad_cb(None, 0, 13, 0) # Simulate release
btn_enter.add_event_cb(btn_enter_cb, lv.EVENT.CLICKED, None)
# Debug focus
def check_focus():
focused = lv.group_get_focused(group)
print(f"Focused widget: {focused}")
th = task_handler.TaskHandler(duration=5)
th.register_task(check_focus, 1000)
# Debug events
def event_cb(event, name):
event_code = event.get_code()
print(f"{name} event: code={event_code}, name={getattr(lv, 'EVENT_' + str(event_code), 'UNKNOWN')}")
ta.add_event_cb(lambda e: event_cb(e, "Textarea"), lv.EVENT.ALL, None)
sw.add_event_cb(lambda e: event_cb(e, "Switch"), lv.EVENT.ALL, None)
btn.add_event_cb(lambda e: event_cb(e, "Button"), lv.EVENT.ALL, None)

Some files were not shown because too many files have changed in this diff Show More