You've already forked MicroPythonOS
mirror of
https://github.com/m5stack/MicroPythonOS.git
synced 2026-05-20 11:51:27 -07:00
osupdate: remove sleep(10)
This commit is contained in:
@@ -1,13 +0,0 @@
|
||||
{
|
||||
"name": "BTC Ticker",
|
||||
"publisher": "ACME Inc",
|
||||
"short_description": "Bitcoin Price Ticker",
|
||||
"long_description": "Uses a websocket to show the current price of Bitcoin in US Dollar.",
|
||||
"icon_url": "http://demo.lnpiggy.com:2121/apps/com.example.btcticker_0.0.1.mpk_icon_64x64.png",
|
||||
"download_url": "http://demo.lnpiggy.com:2121/apps/com.example.btcticker_0.0.1.mpk",
|
||||
"fullname": "com.example.btcticker",
|
||||
"version": "0.0.1",
|
||||
"entrypoint": "assets/bitcoin_price.py",
|
||||
"category": "money"
|
||||
}
|
||||
|
||||
@@ -1,228 +0,0 @@
|
||||
import usocket
|
||||
import ssl
|
||||
import ubinascii
|
||||
import ujson
|
||||
import urandom
|
||||
import uhashlib
|
||||
import time
|
||||
|
||||
def compute_accept_key(key):
|
||||
GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
|
||||
hash_obj = uhashlib.sha1(key + GUID)
|
||||
return ubinascii.b2a_base64(hash_obj.digest())[:-1].decode()
|
||||
|
||||
def ws_handshake(host, port, path="/"):
|
||||
print(f"Connecting to {host}:{port}")
|
||||
sock = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM)
|
||||
addr = usocket.getaddrinfo(host, port)[0][-1]
|
||||
print(f"Resolved: {addr}")
|
||||
sock.connect(addr)
|
||||
|
||||
ssl_sock = ssl.wrap_socket(sock, server_hostname=host)
|
||||
print("SSL handshake complete")
|
||||
|
||||
key_bytes = bytearray()
|
||||
for _ in range(4):
|
||||
key_bytes.extend(urandom.getrandbits(32).to_bytes(4, 'big'))
|
||||
key = ubinascii.b2a_base64(key_bytes)[:-1].decode()
|
||||
expected_accept = compute_accept_key(key)
|
||||
print(f"Key: {key}")
|
||||
print(f"Expected Sec-WebSocket-Accept: {expected_accept}")
|
||||
|
||||
handshake = (
|
||||
f"GET {path} HTTP/1.1\r\n"
|
||||
f"Host: {host}\r\n"
|
||||
f"Upgrade: websocket\r\n"
|
||||
f"Connection: Upgrade\r\n"
|
||||
f"Sec-WebSocket-Key: {key}\r\n"
|
||||
f"Sec-WebSocket-Version: 13\r\n"
|
||||
f"\r\n"
|
||||
)
|
||||
print(f"Handshake:\n{handshake}")
|
||||
ssl_sock.write(handshake.encode())
|
||||
|
||||
response = ssl_sock.readline()
|
||||
print(f"Response: {response.decode()}")
|
||||
if b"101" not in response:
|
||||
ssl_sock.close()
|
||||
raise ValueError(f"Handshake failed: {response.decode()}")
|
||||
|
||||
accept_key = None
|
||||
while True:
|
||||
line = ssl_sock.readline()
|
||||
print(f"Header: {line.decode()}")
|
||||
if line == b"\r\n":
|
||||
break
|
||||
if line.lower().startswith(b"sec-websocket-accept:"):
|
||||
accept_key = line.decode().split(":")[1].strip()
|
||||
|
||||
if not accept_key:
|
||||
ssl_sock.close()
|
||||
raise ValueError("No Sec-WebSocket-Accept header received")
|
||||
if accept_key != expected_accept:
|
||||
ssl_sock.close()
|
||||
raise ValueError(f"Invalid Sec-WebSocket-Accept: got {accept_key}, expected {expected_accept}")
|
||||
|
||||
# Set non-blocking read
|
||||
ssl_sock.setblocking(False)
|
||||
return ssl_sock
|
||||
|
||||
def ws_send_text(sock, message):
|
||||
#print(f"Sending: {message}")
|
||||
data = message.encode()
|
||||
#print(f"Data: {data}")
|
||||
length = len(data)
|
||||
|
||||
frame = bytearray()
|
||||
frame.append(0x81) # FIN=1, opcode=0x1 (text)
|
||||
mask_bit = 0x80
|
||||
mask_key = bytearray(urandom.getrandbits(32).to_bytes(4, 'big'))
|
||||
#print(f"Mask key: {mask_key}")
|
||||
|
||||
if length <= 125:
|
||||
frame.append(mask_bit | length)
|
||||
elif length <= 65535:
|
||||
frame.append(mask_bit | 126)
|
||||
frame.extend(length.to_bytes(2, 'big'))
|
||||
else:
|
||||
frame.append(mask_bit | 127)
|
||||
frame.extend(length.to_bytes(8, 'big'))
|
||||
|
||||
frame.extend(mask_key)
|
||||
masked_data = bytearray(data)
|
||||
for i in range(len(data)):
|
||||
masked_data[i] ^= mask_key[i % 4]
|
||||
frame.extend(masked_data)
|
||||
#print(f"Frame: {frame}")
|
||||
sock.write(frame)
|
||||
|
||||
def ws_read_frame(sock):
|
||||
try:
|
||||
data = sock.read(2) # Read header (FIN/opcode, length)
|
||||
except OSError as e:
|
||||
print(f"Read error: {e}")
|
||||
return None
|
||||
if not data:
|
||||
print("Read: empty")
|
||||
return None
|
||||
#print(f"Raw header: {data}")
|
||||
|
||||
if len(data) < 2:
|
||||
print("Frame too short")
|
||||
return None
|
||||
|
||||
fin = (data[0] & 0x80) >> 7
|
||||
opcode = data[0] & 0x0F
|
||||
masked = (data[1] & 0x80) >> 7
|
||||
payload_len = data[1] & 0x7F
|
||||
#print(f"FIN: {fin} Opcode: {opcode} Masked: {masked} Len: {payload_len}")
|
||||
|
||||
if payload_len == 126:
|
||||
len_data = sock.read(2)
|
||||
payload_len = int.from_bytes(len_data, 'big')
|
||||
#print(f"Extended len: {payload_len}")
|
||||
elif payload_len == 127:
|
||||
len_data = sock.read(8)
|
||||
payload_len = int.from_bytes(len_data, 'big')
|
||||
#print(f"Extended len: {payload_len}")
|
||||
|
||||
mask_key = None
|
||||
if masked:
|
||||
mask_key = sock.read(4)
|
||||
#print(f"Mask key: {mask_key}")
|
||||
|
||||
payload = sock.read(payload_len)
|
||||
#print(f"Raw payload: {payload}")
|
||||
if masked and mask_key:
|
||||
payload = bytearray(payload)
|
||||
for i in range(len(payload)):
|
||||
payload[i] ^= mask_key[i % 4]
|
||||
#print(f"Payload: {payload}")
|
||||
|
||||
if opcode == 0x1:
|
||||
return payload.decode()
|
||||
elif opcode == 0x9:
|
||||
print("Ping frame received")
|
||||
return None
|
||||
elif opcode == 0xA:
|
||||
print("Pong frame received")
|
||||
return None
|
||||
elif opcode == 0x8:
|
||||
print("Close frame received")
|
||||
return None
|
||||
else:
|
||||
print(f"Unknown opcode: {opcode}")
|
||||
return None
|
||||
|
||||
def get_price(json_str):
|
||||
try:
|
||||
# Parse the JSON string into a Python dictionary
|
||||
data = ujson.loads(json_str)
|
||||
# Check if 'price' exists in the dictionary
|
||||
if "price" in data:
|
||||
price = data["price"]
|
||||
return price # Return the price as a string
|
||||
# If you need it as a float, use: return float(price)
|
||||
else:
|
||||
print("Error: 'price' field not found in JSON")
|
||||
return None
|
||||
except ValueError:
|
||||
# Handle invalid JSON
|
||||
print("Error: Invalid JSON string")
|
||||
return None
|
||||
except Exception as e:
|
||||
# Handle any other unexpected errors
|
||||
print(f"Error: An unexpected error occurred - {str(e)}")
|
||||
return None
|
||||
|
||||
def refresh_price(timer):
|
||||
global summary, status
|
||||
data = ws_read_frame(sock)
|
||||
if data:
|
||||
print(f"Response: {data}")
|
||||
price = get_price(data)
|
||||
if price:
|
||||
print(f"Price: {price}")
|
||||
summary += f"{price}\n"
|
||||
status.set_text(summary)
|
||||
|
||||
def janitor_cb(timer):
|
||||
if lv.screen_active() != appscreen:
|
||||
print("bitcoin_price.py backgrounded, cleaning up...")
|
||||
janitor.delete()
|
||||
sock.close()
|
||||
get_price_timer.delete()
|
||||
|
||||
appscreen = lv.screen_active()
|
||||
janitor = lv.timer_create(janitor_cb, 500, None)
|
||||
|
||||
status = lv.label(appscreen)
|
||||
status.align(lv.ALIGN.TOP_LEFT, 5, 10)
|
||||
status.set_style_text_color(lv.color_hex(0xFFFFFF), 0)
|
||||
summary = "Bitcoin USD price\nfrom Coinbase's Websocket API:\n\n"
|
||||
status.set_text(summary)
|
||||
|
||||
|
||||
port = 443
|
||||
host = "ws-feed.exchange.coinbase.com"
|
||||
path = "/"
|
||||
|
||||
try:
|
||||
sock = ws_handshake(host, port, path)
|
||||
print("Reading a bit...") # echo.websocket.org sends a reply
|
||||
for _ in range(3):
|
||||
time.sleep(1)
|
||||
data = ws_read_frame(sock)
|
||||
if data:
|
||||
print(f"Response: {data}")
|
||||
break
|
||||
print("Subscribing to price updates...")
|
||||
ws_send_text(sock, ujson.dumps({"type": "subscribe","product_ids": ["BTC-USD"],"channels": ["ticker_batch"]}))
|
||||
get_price_timer = lv.timer_create(refresh_price, 1000, None)
|
||||
except Exception as e:
|
||||
print(f"Error: {str(e)}")
|
||||
try:
|
||||
sock.close()
|
||||
except:
|
||||
pass
|
||||
|
||||
@@ -1,24 +0,0 @@
|
||||
import ssl
|
||||
import aiohttp
|
||||
import asyncio
|
||||
|
||||
sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
sslctx.verify_mode = ssl.CERT_NONE
|
||||
#sslctx.verify_mode = ssl.CERT_REQUIRED # doesn't work because OSError: (-30336, 'MBEDTLS_ERR_SSL_CA_CHAIN_REQUIRED')
|
||||
|
||||
async def ws_test_echo(session):
|
||||
async with session.ws_connect("wss://echo.websocket.events", ssl=sslctx) as ws:
|
||||
await ws.send_str("hello world!\r\n")
|
||||
async for msg in ws:
|
||||
if msg.type == aiohttp.WSMsgType.TEXT:
|
||||
print(msg.data)
|
||||
if "close" in msg.data:
|
||||
break
|
||||
await ws.send_str("close\r\n")
|
||||
await ws.close()
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
await ws_test_echo(session)
|
||||
|
||||
asyncio.run(main())
|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 447 B |
@@ -1,13 +0,0 @@
|
||||
{
|
||||
"name": "Benchmark",
|
||||
"publisher": "ACME Inc",
|
||||
"short_description": "Testing the CPU speed",
|
||||
"long_description": "Experimentally determines how many idle loops and sha256-hashing loops the CPU can perform per second.",
|
||||
"icon_url": "http://demo.lnpiggy.com:2121/apps/com.example.cputest_0.0.1.mpk_icon_64x64.png",
|
||||
"download_url": "http://demo.lnpiggy.com:2121/apps/com.example.cputest_0.0.1.mpk",
|
||||
"fullname": "com.example.cputest",
|
||||
"version": "0.0.1",
|
||||
"entrypoint": "assets/cputest.py",
|
||||
"category": "benchmarking"
|
||||
}
|
||||
|
||||
@@ -1,118 +0,0 @@
|
||||
# Example results for ESP32-S3 with 8MB SPIRAM:
|
||||
#
|
||||
# Test Summary:
|
||||
# Busy loop: 153018.20 iterations/second
|
||||
# Busy loop with yield: 40303.37 iterations/second
|
||||
# SHA-256 (1KB): 7357.60 iterations/second
|
||||
#
|
||||
# Adding the appscreen == lv.screen_active() test reduces the results to:
|
||||
# Busy loop: 22289 iterations/second
|
||||
# Busy loop with yield: 15923 iterations/second
|
||||
# SHA-256 (1KB): 5269 iterations/second
|
||||
#
|
||||
# New way, with the tests in a thread, replacing appscreen == lv.screen_active() with keeprunning:
|
||||
# Busy loop: 127801 iterations/second
|
||||
# Busy loop with yield: 38394 iterations/second
|
||||
# SHA-256 (1KB): 5012 iterations/second
|
||||
#
|
||||
# New way, but now with 10 in a loop and 10 second tests for more stability:
|
||||
# Busy loop: 127000 iterations/second
|
||||
# Busy loop with yield: 46000 iterations/second => this went up 25%
|
||||
# SHA-256 (1KB): 5100 iterations/second
|
||||
#
|
||||
# When the nostr thing is hanging:
|
||||
# Busy loop: 102000 iterations/second
|
||||
# Busy loop with yield: 12000 iterations/second => this went down 70%!
|
||||
# SHA-256 (1KB): 5100 iterations/second
|
||||
#
|
||||
# Results on desktop:
|
||||
# Busy loop: 15 997 000 => 125x faster
|
||||
# Busy loop with yield: 10 575 000 => 229x faster
|
||||
# SHA-256 (1KB): 291 000 => 57x faster
|
||||
|
||||
|
||||
import time
|
||||
import hashlib
|
||||
import os
|
||||
import _thread
|
||||
|
||||
import mpos.apps
|
||||
import mpos.ui
|
||||
|
||||
keeprunning = True
|
||||
summary = "Running 3 CPU tests...\n\n"
|
||||
|
||||
# Configuration
|
||||
START_SPACING = 2000 # Wait for system to settle
|
||||
TEST_DURATION = 10000 # Duration of each test (ms)
|
||||
TEST_SPACING = 1000 # Wait between tests (ms)
|
||||
|
||||
def stress_test_thread():
|
||||
DATA_SIZE = 1024 # 1KB of data for SHA-256 test
|
||||
DATA = os.urandom(DATA_SIZE) # Generate 1KB of random data for SHA-256
|
||||
print("stress_test_thread running")
|
||||
global summary, keeprunning
|
||||
summary += "Waiting for system to settle...\n\n"
|
||||
time.sleep_ms(START_SPACING)
|
||||
summary += "Busy loop without yield: "
|
||||
iterations = 0
|
||||
start_time = time.ticks_ms()
|
||||
end_time = start_time + TEST_DURATION
|
||||
while time.ticks_ms() < end_time and keeprunning:
|
||||
iterations += 1
|
||||
duration_ms = time.ticks_diff(time.ticks_ms(), start_time)
|
||||
iterations_per_second = (iterations / duration_ms) * 1000
|
||||
print(f"Busy loop test ran duration: {duration_ms}, average: {iterations_per_second:.2f} iterations/second")
|
||||
summary += f"{iterations_per_second:.2f}/s\n"
|
||||
#
|
||||
time.sleep_ms(TEST_SPACING)
|
||||
print("\nStarting busy loop with yield (sleep_ms(0)) stress test...")
|
||||
summary += "Busy loop with yield: "
|
||||
iterations = 0
|
||||
start_time = time.ticks_ms()
|
||||
end_time = start_time + TEST_DURATION
|
||||
while time.ticks_ms() < end_time and keeprunning:
|
||||
for _ in range(10):
|
||||
time.sleep_ms(0) # Yield to other tasks
|
||||
iterations += 10
|
||||
duration_ms = time.ticks_diff(time.ticks_ms(), start_time)
|
||||
iterations_per_second = (iterations / duration_ms) * 1000
|
||||
print(f"Busy loop with yield test completed: {iterations_per_second:.2f} iterations/second")
|
||||
summary += f"{iterations_per_second:.2f}/s\n"
|
||||
#
|
||||
time.sleep_ms(TEST_SPACING)
|
||||
print("\nStarting SHA-256 stress test (1KB data)...")
|
||||
summary += "Busy loop with SHA-256 (1KB): "
|
||||
iterations = 0
|
||||
start_time = time.ticks_ms()
|
||||
end_time = start_time + TEST_DURATION
|
||||
while time.ticks_ms() < end_time and keeprunning:
|
||||
for _ in range(10):
|
||||
hashlib.sha256(DATA).digest() # Compute SHA-256 on 1KB data
|
||||
iterations += 10
|
||||
duration_ms = time.ticks_diff(time.ticks_ms(), start_time)
|
||||
iterations_per_second = (iterations / duration_ms) * 1000
|
||||
print(f"SHA-256 test completed: {iterations_per_second:.2f} iterations/second")
|
||||
summary += f" {iterations_per_second:.2f}/s\n"
|
||||
summary += "\nAll tests completed."
|
||||
|
||||
|
||||
def janitor_cb(timer):
|
||||
global keeprunning
|
||||
if lv.screen_active() != main_screen:
|
||||
print("cputest.py backgrounded, cleaning up...")
|
||||
janitor.delete()
|
||||
keeprunning = False
|
||||
update_status_timer.delete()
|
||||
|
||||
main_screen = lv.obj()
|
||||
janitor = lv.timer_create(janitor_cb, 1000, None)
|
||||
update_status_timer = lv.timer_create(lambda timer: status.set_text(summary), 750, None)
|
||||
|
||||
status = lv.label(main_screen)
|
||||
status.align(lv.ALIGN.TOP_LEFT, 5, 10)
|
||||
status.set_text(summary)
|
||||
mpos.ui.load_screen(main_screen)
|
||||
|
||||
_thread.stack_size(mpos.apps.good_stack_size())
|
||||
_thread.start_new_thread(stress_test_thread, ())
|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 3.0 KiB |
@@ -1,13 +0,0 @@
|
||||
{
|
||||
"name": "Draw",
|
||||
"publisher": "ACME Inc",
|
||||
"short_description": "Simple drawing app",
|
||||
"long_description": "Draw simple shapes on the screen.",
|
||||
"icon_url": "http://demo.lnpiggy.com:2121/apps/com.example.draw_0.0.1.mpk_icon_64x64.png",
|
||||
"download_url": "http://demo.lnpiggy.com:2121/apps/com.example.draw_0.0.1.mpk",
|
||||
"fullname": "com.example.draw",
|
||||
"version": "0.0.1",
|
||||
"entrypoint": "assets/draw.py",
|
||||
"category": "art"
|
||||
}
|
||||
|
||||
@@ -1,88 +0,0 @@
|
||||
import mpos.ui
|
||||
|
||||
indev_error_x = 160
|
||||
indev_error_y = 120
|
||||
|
||||
DARKPINK = lv.color_hex(0xEC048C)
|
||||
|
||||
# doesnt work:
|
||||
def draw_line(x, y):
|
||||
global canvas
|
||||
# Line drawing like this doesn't work:
|
||||
layer = lv.layer_t()
|
||||
canvas.init_layer(layer)
|
||||
dsc = lv.draw_line_dsc_t()
|
||||
dsc.color = DARKPINK
|
||||
dsc.width = 4
|
||||
dsc.round_end = 1
|
||||
dsc.round_start = 1
|
||||
dsc.p1 = lv.point_precise_t()
|
||||
dsc.p1.x = x
|
||||
dsc.p1.y = y
|
||||
dsc.p2 = lv.point_precise_t()
|
||||
dsc.p2.x = 100
|
||||
dsc.p2.y = 200
|
||||
#layer.draw_line(dsc) doesnt exist!
|
||||
lv.draw_line(layer,dsc) # doesnt do anything!
|
||||
canvas.finish_layer(layer)
|
||||
|
||||
|
||||
def touch_cb(event):
|
||||
global canvas
|
||||
event_code=event.get_code()
|
||||
# Ignore:
|
||||
# =======
|
||||
# 19: HIT_TEST
|
||||
# COVER_CHECK
|
||||
# DRAW_MAIN
|
||||
# DRAW_MAIN_BEGIN
|
||||
# DRAW_MAIN_END
|
||||
# DRAW_POST
|
||||
# DRAW_POST_BEGIN
|
||||
# DRAW_POST_END
|
||||
# GET_SELF_SIZE
|
||||
if event_code not in [19,23,25,26,27,28,29,30,49]:
|
||||
name = mpos.ui.get_event_name(event_code)
|
||||
#print(f"lv_event_t: code={event_code}, name={name}") # target={event.get_target()}, user_data={event.get_user_data()}, param={event.get_param()}
|
||||
if event_code == lv.EVENT.PRESSING: # this is probably enough
|
||||
#if event_code in [lv.EVENT.PRESSED, lv.EVENT.PRESSING, lv.EVENT.LONG_PRESSED, lv.EVENT.LONG_PRESSED_REPEAT]:
|
||||
x, y = mpos.ui.get_pointer_xy()
|
||||
# drawing a point works:
|
||||
#canvas.set_px(x,y,lv.color_black(),lv.OPA.COVER)
|
||||
#
|
||||
# drawing a square like this works:
|
||||
#for dx in range(-5,5):
|
||||
# for dy in range(-5,5):
|
||||
# canvas.set_px(x+dx,y+dy,DARKPINK,lv.OPA.COVER)
|
||||
#
|
||||
# drawing a circle works:
|
||||
radius = 7 # Set desired radius
|
||||
if x == indev_error_x and y == indev_error_y:
|
||||
radius = 25 # in case of indev error
|
||||
square = radius * radius
|
||||
for dx in range(-radius, radius):
|
||||
for dy in range(-radius, radius):
|
||||
if dx * dx + dy * dy <= square:
|
||||
newx, newy = x + dx, y + dy
|
||||
if 0 <= newx <= hor_res and 0 <= newy <= ver_res: # don't draw outside of canvas because that may crash
|
||||
canvas.set_px(x + dx, y + dy, DARKPINK, lv.OPA.COVER)
|
||||
|
||||
|
||||
main_screen = lv.obj()
|
||||
canvas = lv.canvas(main_screen)
|
||||
|
||||
disp = lv.display_get_default()
|
||||
hor_res = disp.get_horizontal_resolution()
|
||||
ver_res = disp.get_vertical_resolution()
|
||||
|
||||
canvas.set_size(hor_res, ver_res)
|
||||
canvas.set_style_bg_color(lv.color_white(), 0)
|
||||
|
||||
buffer = bytearray(hor_res * ver_res * 4)
|
||||
|
||||
canvas.set_buffer(buffer, hor_res, ver_res, lv.COLOR_FORMAT.NATIVE)
|
||||
canvas.fill_bg(lv.color_white(), lv.OPA.COVER)
|
||||
canvas.add_flag(lv.obj.FLAG.CLICKABLE)
|
||||
canvas.add_event_cb(touch_cb, lv.EVENT.ALL, None)
|
||||
|
||||
mpos.ui.load_screen(main_screen)
|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 6.0 KiB |
@@ -1,12 +0,0 @@
|
||||
{
|
||||
"name": "HelloWorld",
|
||||
"publisher": "ACME Inc",
|
||||
"short_description": "Minimal app",
|
||||
"long_description": "Demonstrates the simplest app.",
|
||||
"icon_url": "http://demo.lnpiggy.com:2121/apps/com.example.helloworld_0.0.1.mpk_icon_64x64.png",
|
||||
"download_url": "http://demo.lnpiggy.com:2121/apps/com.example.helloworld_0.0.1.mpk",
|
||||
"fullname": "com.example.helloworld",
|
||||
"version": "0.0.1",
|
||||
"entrypoint": "assets/hello.py",
|
||||
"category": "development"
|
||||
}
|
||||
@@ -1,25 +0,0 @@
|
||||
import mpos.ui
|
||||
|
||||
def button_click(e):
|
||||
print("Button clicked!")
|
||||
|
||||
scr = lv.obj()
|
||||
|
||||
# Create a button with a label
|
||||
btn = lv.button(scr)
|
||||
btn.align(lv.ALIGN.CENTER, 0, 0)
|
||||
#btn.set_size(lv.pct(100),lv.pct(100))
|
||||
btn.add_event_cb(button_click,lv.EVENT.CLICKED,None)
|
||||
label = lv.label(btn)
|
||||
label.set_text('Hello World!')
|
||||
|
||||
mpos.ui.load_screen(scr)
|
||||
|
||||
# Optional janitor that cleans up when the app is backgrounded:
|
||||
def janitor_cb(timer):
|
||||
if lv.screen_active() != scr:
|
||||
print("app backgrounded, cleaning up...")
|
||||
janitor.delete()
|
||||
# No cleanups to do, but in a real app, you might stop timers, deinitialize hardware devices you used, close network connections, etc.
|
||||
|
||||
janitor = lv.timer_create(janitor_cb, 1000, None)
|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 5.3 KiB |
@@ -1,12 +0,0 @@
|
||||
{
|
||||
"name": "Accelero",
|
||||
"publisher": "ACME Inc",
|
||||
"short_description": "Test the Inertial Measurement Unit",
|
||||
"long_description": "It is always good to make sure the accelerometer is working properly. How else can you measure acceleration and position?",
|
||||
"icon_url": "http://demo.lnpiggy.com:2121/apps/com.example.imutest_0.0.1.mpk_icon_64x64.png",
|
||||
"download_url": "http://demo.lnpiggy.com:2121/apps/com.example.imutest_0.0.1.mpk",
|
||||
"fullname": "com.example.imutest",
|
||||
"version": "0.0.1",
|
||||
"entrypoint": "assets/imutest.py",
|
||||
"category": "benchmarking"
|
||||
}
|
||||
@@ -1,86 +0,0 @@
|
||||
import mpos.ui
|
||||
|
||||
# screens:
|
||||
|
||||
main_screen = None
|
||||
|
||||
def map_nonlinear(value: float) -> int:
|
||||
# Preserve sign and work with absolute value
|
||||
sign = 1 if value >= 0 else -1
|
||||
abs_value = abs(value)
|
||||
# Apply non-linear transformation (square root) to absolute value
|
||||
# Scale input range [0, 200] to [0, sqrt(200)] first
|
||||
sqrt_value = (abs_value ** 0.5)
|
||||
# Scale to output range [0, 100]
|
||||
# Map [0, sqrt(200)] to [50, 100] for positive, [0, 50] for negative
|
||||
max_sqrt = 200.0 ** 0.5 # Approx 14.142
|
||||
scaled = (sqrt_value / max_sqrt) * 50.0 # Scale to [0, 50]
|
||||
return int(50.0 + (sign * scaled)) # Shift to [0, 100]
|
||||
|
||||
def refresh(timer):
|
||||
if have_imu:
|
||||
#print(f"""{sensor.temperature=} {sensor.acceleration=} {sensor.gyro=}""")
|
||||
temp = sensor.temperature
|
||||
ax = sensor.acceleration[0]
|
||||
axp = int((ax * 100 + 100)/2)
|
||||
ay = sensor.acceleration[1]
|
||||
ayp = int((ay * 100 + 100)/2)
|
||||
az = sensor.acceleration[2]
|
||||
azp = int((az * 100 + 100)/2)
|
||||
# values between -200 and 200 => /4 becomes -50 and 50 => +50 becomes 0 and 100
|
||||
gx = map_nonlinear(sensor.gyro[0])
|
||||
gy = map_nonlinear(sensor.gyro[1])
|
||||
gz = map_nonlinear(sensor.gyro[2])
|
||||
else:
|
||||
temp = 12.34
|
||||
axp = 25
|
||||
ayp = 50
|
||||
azp = 75
|
||||
gx = 45
|
||||
gy = 50
|
||||
gz = 55
|
||||
templabel.set_text(f"IMU chip temperature: {temp:.2f}°C")
|
||||
sliderx.set_value(axp, lv.ANIM.OFF)
|
||||
slidery.set_value(ayp, lv.ANIM.OFF)
|
||||
sliderz.set_value(azp, lv.ANIM.OFF)
|
||||
slidergx.set_value(gx, lv.ANIM.OFF)
|
||||
slidergy.set_value(gy, lv.ANIM.OFF)
|
||||
slidergz.set_value(gz, lv.ANIM.OFF)
|
||||
|
||||
|
||||
def janitor_cb(timer):
|
||||
if lv.screen_active() != main_screen:
|
||||
print("imutest.py backgrounded, cleaning up...")
|
||||
janitor.delete()
|
||||
refresh_timer.delete()
|
||||
|
||||
have_imu = True
|
||||
try:
|
||||
from machine import Pin, I2C
|
||||
from qmi8658 import QMI8658
|
||||
import machine
|
||||
sensor = QMI8658(I2C(0, sda=machine.Pin(48), scl=machine.Pin(47)))
|
||||
except Exception as e:
|
||||
print(f"Warning: could not initialize IMU hardware: {e}")
|
||||
have_imu=False
|
||||
|
||||
|
||||
main_screen = lv.obj()
|
||||
templabel = lv.label(main_screen)
|
||||
templabel.align(lv.ALIGN.TOP_MID, 0, 10)
|
||||
sliderx = lv.slider(main_screen)
|
||||
sliderx.align(lv.ALIGN.CENTER, 0, -60)
|
||||
slidery = lv.slider(main_screen)
|
||||
slidery.align(lv.ALIGN.CENTER, 0, -30)
|
||||
sliderz = lv.slider(main_screen)
|
||||
sliderz.align(lv.ALIGN.CENTER, 0, 0)
|
||||
slidergx = lv.slider(main_screen)
|
||||
slidergx.align(lv.ALIGN.CENTER, 0, 30)
|
||||
slidergy = lv.slider(main_screen)
|
||||
slidergy.align(lv.ALIGN.CENTER, 0, 60)
|
||||
slidergz = lv.slider(main_screen)
|
||||
slidergz.align(lv.ALIGN.CENTER, 0, 90)
|
||||
mpos.ui.load_screen(main_screen)
|
||||
|
||||
refresh_timer = lv.timer_create(refresh, 100, None)
|
||||
janitor = lv.timer_create(janitor_cb, 500, None)
|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 5.9 KiB |
@@ -1,13 +0,0 @@
|
||||
{
|
||||
"name": "Graphics",
|
||||
"publisher": "ACME Inc",
|
||||
"short_description": "Testing on-display spinner animations",
|
||||
"long_description": "Stress testing multiple concurrent animations on the display by adding more and more spinners",
|
||||
"icon_url": "http://demo.lnpiggy.com:2121/apps/com.example.lvgltest_0.0.1.mpk_icon_64x64.png",
|
||||
"download_url": "http://demo.lnpiggy.com:2121/apps/com.example.lvgltest_0.0.1.mpk",
|
||||
"fullname": "com.example.lvgltest",
|
||||
"version": "0.0.1",
|
||||
"entrypoint": "assets/lvgltest.py",
|
||||
"category": "benchmarking"
|
||||
}
|
||||
|
||||
@@ -1,46 +0,0 @@
|
||||
import time
|
||||
import random
|
||||
|
||||
# Configuration
|
||||
SPINNER_SIZE = 40 # Size of each spinner in pixels
|
||||
|
||||
# Global variables
|
||||
spinner_count = 0
|
||||
metrics_label = None
|
||||
|
||||
def add_spinner(timer):
|
||||
global spinner_count, metrics_label
|
||||
try:
|
||||
x = random.randint(0, appscreen.get_width() - SPINNER_SIZE)
|
||||
y = random.randint(0, appscreen.get_height() - SPINNER_SIZE)
|
||||
spinner_count += 1
|
||||
print(f"Placing spinner {spinner_count} with size {SPINNER_SIZE} at {x},{y}")
|
||||
spinner = lv.spinner(appscreen)
|
||||
spinner.set_size(SPINNER_SIZE, SPINNER_SIZE)
|
||||
spinner.set_pos(x, y)
|
||||
except Exception as e:
|
||||
print(f"Failed to create spinner {spinner_count}: {e}")
|
||||
return
|
||||
|
||||
metrics_label.set_text(f"Spinners: {spinner_count}")
|
||||
print(f"Finished adding spinner {spinner_count}")
|
||||
|
||||
|
||||
def janitor_cb(timer):
|
||||
if lv.screen_active() != appscreen:
|
||||
print("lvgltest.py backgrounded, cleaning up...")
|
||||
janitor.delete()
|
||||
add_spinner_timer.delete()
|
||||
|
||||
appscreen = lv.screen_active()
|
||||
metrics_label = lv.label(appscreen)
|
||||
metrics_label.set_style_text_color(lv.color_white(), 0)
|
||||
metrics_label.set_style_bg_color(lv.color_black(), 0)
|
||||
metrics_label.set_style_bg_opa(lv.OPA.COVER, 0)
|
||||
metrics_label.set_pos(10, 10)
|
||||
metrics_label.set_text("Spinners: 0")
|
||||
|
||||
print("Starting LVGL spinner benchmark...")
|
||||
janitor = lv.timer_create(janitor_cb, 400, None)
|
||||
add_spinner_timer = lv.timer_create(add_spinner, 2000, None)
|
||||
|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 6.0 KiB |
@@ -39,12 +39,9 @@ class OSUpdate(Activity):
|
||||
|
||||
if not network_connected:
|
||||
self.status_label.set_text("Error: WiFi is not connected.")
|
||||
time.sleep(10)
|
||||
else:
|
||||
print("Showing update info...")
|
||||
self.show_update_info()
|
||||
|
||||
print("osupdate.py finished")
|
||||
|
||||
def onStop(self, screen):
|
||||
self.keep_running = False # this is checked by the update_with_lvgl thread
|
||||
|
||||
Reference in New Issue
Block a user