You've already forked MicroPythonOS
mirror of
https://github.com/m5stack/MicroPythonOS.git
synced 2026-05-20 11:51:27 -07:00
Better canary test
This commit is contained in:
@@ -30,7 +30,6 @@
|
||||
#i2c.writeto_mem(sensor_addr, reg_addr, bytes([reg_value]))
|
||||
|
||||
|
||||
myscreen = lv.screen_active()
|
||||
|
||||
#subwindow.clean()
|
||||
#canary = lv.obj(subwindow)
|
||||
@@ -129,7 +128,7 @@ try_capture()
|
||||
|
||||
|
||||
import time
|
||||
while myscreen == lv.screen_active():
|
||||
while appscreen == lv.screen_active():
|
||||
try_capture()
|
||||
time.sleep_ms(100) # Allow for the MicroPython REPL to still work
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
from machine import Pin, I2C
|
||||
from qmi8658 import QMI8658
|
||||
import time
|
||||
import machine
|
||||
|
||||
sensor = QMI8658(I2C(0, sda=machine.Pin(48), scl=machine.Pin(47)))
|
||||
@@ -35,11 +34,8 @@ def map_nonlinear(value: float) -> int:
|
||||
scaled = (sqrt_value / max_sqrt) * 50.0 # Scale to [0, 50]
|
||||
return int(50.0 + (sign * scaled)) # Shift to [0, 100]
|
||||
|
||||
|
||||
|
||||
canary = lv.obj(subwindow)
|
||||
canary.add_flag(lv.obj.FLAG.HIDDEN)
|
||||
while canary.is_valid():
|
||||
import time
|
||||
while appscreen == lv.screen_active():
|
||||
#print(f"""{sensor.temperature=} {sensor.acceleration=} {sensor.gyro=}""")
|
||||
templabel.set_text(f"Temperature: {sensor.temperature:.2f}")
|
||||
ax = sensor.acceleration[0]
|
||||
|
||||
@@ -2,12 +2,8 @@ import lvgl as lv
|
||||
import ota.update
|
||||
from esp32 import Partition
|
||||
import urequests
|
||||
|
||||
subwindow.clean()
|
||||
canary = lv.obj(subwindow)
|
||||
canary.add_flag(lv.obj.FLAG.HIDDEN)
|
||||
|
||||
import ota.status
|
||||
|
||||
ota.status.status()
|
||||
current = Partition(Partition.RUNNING)
|
||||
current
|
||||
@@ -38,7 +34,7 @@ def update_with_lvgl(url):
|
||||
chunk_size = 4096
|
||||
i = 0
|
||||
print(f"Starting OTA update of size: {total_size}")
|
||||
while canary.is_valid():
|
||||
while appscreen == lv.screen_active():
|
||||
chunk = response.raw.read(chunk_size)
|
||||
if not chunk:
|
||||
print("No chunk, breaking...")
|
||||
|
||||
@@ -275,6 +275,10 @@ def execute_script(script_source, is_file, is_launcher, is_graphical):
|
||||
thread_id = _thread.get_ident()
|
||||
print(f"Thread {thread_id}: executing script")
|
||||
try:
|
||||
if is_file:
|
||||
print(f"Thread {thread_id}: reading script from file {script_source}")
|
||||
with open(script_source, 'r') as f: # TODO: check if file exists first?
|
||||
script_source = f.read()
|
||||
if not is_graphical:
|
||||
script_globals = {
|
||||
'__name__': "__main__"
|
||||
@@ -296,15 +300,12 @@ def execute_script(script_source, is_file, is_launcher, is_graphical):
|
||||
lv.screen_load(newscreen)
|
||||
script_globals = {
|
||||
'lv': lv,
|
||||
'appscreen': newscreen,
|
||||
'subwindow': subwindow,
|
||||
'start_app': start_app, # for launcher apps
|
||||
'parse_manifest': parse_manifest, # for launcher apps
|
||||
'__name__': "__main__"
|
||||
}
|
||||
if is_file:
|
||||
print(f"Thread {thread_id}: reading script from file {script_source}")
|
||||
with open(script_source, 'r') as f: # TODO: check if file exists first?
|
||||
script_source = f.read()
|
||||
print(f"Thread {thread_id}: starting script")
|
||||
try:
|
||||
compile_name = 'script' if not is_file else long_path_to_filename(script_source) # Only filename, to avoid 'name too long' error
|
||||
|
||||
Reference in New Issue
Block a user