Fix canaries

This commit is contained in:
Thomas Farstrike
2025-04-28 14:27:40 +02:00
parent 8fc0399732
commit 3d3d8f4d71
7 changed files with 15 additions and 183 deletions
@@ -5,7 +5,7 @@
# Busy loop with yield: 40303.37 iterations/second
# SHA-256 (1KB): 7357.60 iterations/second
#
# Adding the canary.is_valid() test reduces the results to:
# Adding the appscreen == lv.screen_active() test reduces the results to:
# Busy loop: 22289 iterations/second
# Busy loop with yield: 15923 iterations/second
# SHA-256 (1KB): 5269 iterations/second
@@ -19,16 +19,12 @@ TEST_DURATION = 5 # Duration of each test in seconds
DATA_SIZE = 1024 # 1KB of data for SHA-256 test
DATA = os.urandom(DATA_SIZE) # Generate 1KB of random data for SHA-256
subwindow.clean()
canary = lv.obj(subwindow)
canary.add_flag(lv.obj.FLAG.HIDDEN)
def stress_test_busy_loop():
print("\nStarting busy loop stress test...")
iterations = 0
start_time = time.ticks_ms()
end_time = start_time + (TEST_DURATION * 1000)
while time.ticks_ms() < end_time and canary.is_valid():
while time.ticks_ms() < end_time and appscreen == lv.screen_active():
iterations += 1
duration_ms = time.ticks_diff(time.ticks_ms(), start_time)
iterations_per_second = (iterations / duration_ms) * 1000
@@ -41,7 +37,7 @@ def stress_test_busy_loop_with_yield():
iterations = 0
start_time = time.ticks_ms()
end_time = start_time + (TEST_DURATION * 1000)
while time.ticks_ms() < end_time and canary.is_valid():
while time.ticks_ms() < end_time and appscreen == lv.screen_active():
iterations += 1
time.sleep_ms(0) # Yield to other tasks
duration_ms = time.ticks_diff(time.ticks_ms(), start_time)
@@ -55,7 +51,7 @@ def stress_test_sha256():
iterations = 0
start_time = time.ticks_ms()
end_time = start_time + (TEST_DURATION * 1000)
while time.ticks_ms() < end_time and canary.is_valid():
while time.ticks_ms() < end_time and appscreen == lv.screen_active():
hashlib.sha256(DATA).digest() # Compute SHA-256 on 1KB data
iterations += 1
duration_ms = time.ticks_diff(time.ticks_ms(), start_time)
@@ -93,6 +89,6 @@ summary += f"SHA-256 (1KB): {sha256_ips:.2f}/second\n"
summary += "\nAll tests completed."
status.set_text(summary)
# Wait until the user stops the app
while canary.is_valid():
time.sleep_ms(100)
# Wait until the user closes the app
while appscreen == lv.screen_active():
time.sleep_ms(5000)
@@ -1,3 +0,0 @@
Manifest-Version: 1.0
Name: Files
Start-Script: assets/files.py
@@ -1,148 +0,0 @@
import lvgl as lv
import uos
import vfs
import machine
import time
# LVGL File System Driver for LittleFS
class LittleFSDriver:
def __init__(self, letter='L'):
self.letter = letter
self.files = {}
def init(self):
drv = lv.fs_drv_t()
lv.fs_drv_init(drv)
drv.letter = ord(self.letter)
drv.open_cb = self.open_cb
drv.close_cb = self.close_cb
drv.read_cb = self.read_cb
drv.write_cb = self.write_cb
drv.seek_cb = self.seek_cb
drv.tell_cb = self.tell_cb
drv.dir_open_cb = self.dir_open_cb
drv.dir_read_cb = self.dir_read_cb
drv.dir_close_cb = self.dir_close_cb
lv.fs_drv_register(drv)
def open_cb(self, drv, path, mode):
path = path.decode().lstrip(self.letter + ':')
try:
if mode & lv.FS_MODE_WR:
file = open(path, 'wb' if mode == lv.FS_MODE_WR else 'r+b')
else:
file = open(path, 'rb')
self.files[id(file)] = file
return id(file)
except Exception as e:
print(f"Open error: {e}")
return None
def close_cb(self, drv, file_p):
file_id = file_p
if file_id in self.files:
self.files[file_id].close()
del self.files[file_id]
return lv.FS_RES_OK
return lv.FS_RES_NOT_EX
def read_cb(self, drv, file_p, buf, btr, br):
file_id = file_p
if file_id in self.files:
try:
data = self.files[file_id].read(btr)
br[0] = len(data)
for i, b in enumerate(data):
buf[i] = b
return lv.FS_RES_OK
except:
return lv.FS_RES_FS_ERR
return lv.FS_RES_NOT_EX
def write_cb(self, drv, file_p, buf, btw, bw):
file_id = file_p
if file_id in self.files:
try:
data = bytes([buf[i] for i in range(btw)])
self.files[file_id].write(data)
bw[0] = btw
return lv.FS_RES_OK
except:
return lv.FS_RES_FS_ERR
return lv.FS_RES_NOT_EX
def seek_cb(self, drv, file_p, pos, whence):
file_id = file_p
if file_id in self.files:
try:
if whence == lv.FS_SEEK_SET:
self.files[file_id].seek(pos, 0)
elif whence == lv.FS_SEEK_CUR:
self.files[file_id].seek(pos, 1)
elif whence == lv.FS_SEEK_END:
self.files[file_id].seek(pos, 2)
return lv.FS_RES_OK
except:
return lv.FS_RES_FS_ERR
return lv.FS_RES_NOT_EX
def tell_cb(self, drv, file_p, pos):
file_id = file_p
if file_id in self.files:
try:
pos[0] = self.files[file_id].tell()
return lv.FS_RES_OK
except:
return lv.FS_RES_FS_ERR
return lv.FS_RES_NOT_EX
def dir_open_cb(self, drv, path):
path = path.decode().lstrip(self.letter + ':')
try:
dir_list = uos.listdir(path or '/')
dir_obj = {'path': path or '/', 'list': dir_list, 'index': 0}
dir_id = id(dir_obj)
self.files[dir_id] = dir_obj
return dir_id
except:
return None
def dir_read_cb(self, drv, dir_p, fn):
dir_id = dir_p
if dir_id in self.files:
dir_obj = self.files[dir_id]
if dir_obj['index'] < len(dir_obj['list']):
name = dir_obj['list'][dir_obj['index']]
try:
uos.stat(dir_obj['path'] + '/' + name + '/')
name = '/' + name
except:
pass
dir_obj['index'] += 1
fn.assign(name)
return lv.FS_RES_OK
fn.assign('')
return lv.FS_RES_OK
return lv.FS_RES_NOT_EX
def dir_close_cb(self, drv, dir_p):
dir_id = dir_p
if dir_id in self.files:
del self.files[dir_id]
return lv.FS_RES_OK
return lv.FS_RES_NOT_EX
# Register the LittleFS driver
fs_drv = LittleFSDriver('L')
fs_drv.init()
# Create subwindow
subwindow = lv.obj()
subwindow.set_size(lv.pct(100), lv.pct(100))
# Create File Explorer
def file_explorer_event_cb(e):
code = e.get_code()
obj = e.get_target()
if code == lv.EVENT.VALUE_CHANGED:
file_path = obj.get_selected_file_path()
print(f"Selected file: {file_path}")
subwindow.clean()
explorer = lv.file_explorer(subwindow)
explorer.set_size(lv.pct(100), lv.pct(100))
explorer.explorer_set_quick_access_path(lv.EXPLORER.HOME_DIR,"/")
explorer.explorer_open_dir("/apps")
#explorer.set_root_path("L:/")
explorer.add_event_cb(file_explorer_event_cb, lv.EVENT.VALUE_CHANGED, None)
@@ -3,16 +3,11 @@ import random
# Configuration
SPINNER_SIZE = 40 # Size of each spinner in pixels
UPDATE_INTERVAL_MS = 3000 # Add spinner and update metrics every 100ms
# Global variables
spinner_count = 0
metrics_label = None
subwindow.clean()
canary = lv.obj(subwindow)
canary.add_flag(lv.obj.FLAG.HIDDEN)
def add_spinner_and_update():
global spinner_count, metrics_label
try:
@@ -44,9 +39,9 @@ def run_benchmark():
metrics_label.set_pos(10, 10)
metrics_label.set_text("Spinners: 0")
while canary.is_valid():
while appscreen == lv.screen_active():
add_spinner_and_update()
time.sleep_ms(UPDATE_INTERVAL_MS)
time.sleep(3)
try:
@@ -71,10 +71,6 @@ def test_allocation(buffer_size, n):
# Test buffer sizes of 2^n, starting from n=1 (2 bytes)
n = 1
subwindow.clean()
canary = lv.obj(subwindow)
canary.add_flag(lv.obj.FLAG.HIDDEN)
status = lv.label(subwindow)
status.align(lv.ALIGN.TOP_LEFT, 5, 10)
status.set_style_text_color(lv.color_hex(0xFFFFFF), 0)
@@ -85,7 +81,7 @@ summary += "Buffer Size (bytes) | Max Allocated\n"
summary += "-----------------------------------\n"
status.set_text(summary)
while canary.is_valid():
while appscreen == lv.screen_active():
buffer_size = 2 ** n
summary += f"{buffer_size:>12} | "
status.set_text(summary)
@@ -112,5 +108,5 @@ summary += "Test completed.\n"
status.set_text(summary)
# Wait until the user stops the app
while canary.is_valid():
time.sleep_ms(100)
while appscreen == lv.screen_active():
time.sleep(1)
@@ -6,10 +6,6 @@ import urandom
import uhashlib
import time
subwindow.clean()
canary = lv.obj(subwindow)
canary.add_flag(lv.obj.FLAG.HIDDEN)
def compute_accept_key(key):
GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
hash_obj = uhashlib.sha1(key + GUID)
@@ -201,7 +197,7 @@ try:
break
print("Subscribing to price updates...")
ws_send_text(sock, ujson.dumps({"type": "subscribe","product_ids": ["BTC-USD"],"channels": ["ticker_batch"]}))
while canary.is_valid():
while appscreen == lv.screen_active():
time.sleep(1)
data = ws_read_frame(sock)
if data:
+2 -2
View File
@@ -317,14 +317,14 @@ def execute_script(script_source, is_file, is_launcher, is_graphical):
tb = getattr(e, '__traceback__', None)
traceback.print_exception(type(e), e, tb)
print(f"Thread {thread_id}: script {compile_name} finished")
if is_graphical and not is_launcher:
if is_graphical and prevscreen and not is_launcher:
print("/main.py: execute_script(): deleting timers...")
timer1.delete()
timer2.delete()
timer3.delete()
timer4.delete()
newscreen.delete()
print(f"Thread {thread_id}: finished, prevscreen is set and it's not a launcher so returning to previous screen...")
print(f"Thread {thread_id}: finished. It's not a launcher so returning to previous screen...")
lv.screen_load(prevscreen)
except Exception as e:
print(f"Thread {thread_id}: error:")