Fix OSUpdate

This commit is contained in:
Thomas Farstrike
2025-06-06 22:36:57 +02:00
parent 0b07e08c45
commit 1417feb850
@@ -11,9 +11,13 @@ import mpos.ui
class OSUpdate(Activity):
keep_running = True
# Widgets:
install_button = None
main_screen = None
keep_running = True
progress_label = None
progress_bar = None
def onCreate(self):
self.main_screen = lv.obj()
@@ -88,43 +92,51 @@ class OSUpdate(Activity):
def install_button_click(self, download_url):
print(f"install_button_click for url {download_url}")
self.install_button.add_state(lv.STATE.DISABLED) # button will be enabled if there is an update available
self.status_label.set_text("Update in progress.\nNavigate away to cancel.")
self.progress_label = lv.label(self.main_screen)
self.progress_label.set_text("OS Update: 0.00%")
self.progress_label.align(lv.ALIGN.CENTER, 0, 0)
self.progress_bar = lv.bar(self.main_screen)
self.progress_bar.set_size(200, 20)
self.progress_bar.align(lv.ALIGN.BOTTOM_MID, 0, -50)
self.progress_bar.set_range(0, 100)
self.progress_bar.set_value(0, lv.ANIM.OFF)
try:
_thread.stack_size(mpos.apps.good_stack_size())
_thread.start_new_thread(self.update_with_lvgl, (download_url,))
except Exception as e:
print("Could not start update_with_lvgl thread: ", e)
def progress_callback(self, percent):
print(f"OTA Update: {percent:.1f}%")
lv.async_call(lambda l: self.progress_label.set_text(f"OTA Update: {percent:.2f}%"), None)
lv.async_call(lambda l: self.progress_bar.set_value(int(percent), lv.ANIM.ON), None)
time.sleep_ms(100)
# Custom OTA update with LVGL progress
def update_with_lvgl(self, url):
self.install_button.add_flag(lv.obj.FLAG.HIDDEN) # or change to cancel button?
self.status_label.set_text("Update in progress.\nNavigate away to cancel.")
import ota.update
import ota.status
ota.status.status()
from esp32 import Partition
current_partition = Partition(Partition.RUNNING)
print(f"Current partition: {current_partition}")
next_partition = current_partition.get_next_update()
print(f"Next partition: {next_partition}")
label = lv.label(self.main_screen)
label.set_text("OS Update: 0.00%")
label.align(lv.ALIGN.CENTER, 0, -30)
progress_bar = lv.bar(self.main_screen)
progress_bar.set_size(200, 20)
progress_bar.align(lv.ALIGN.BOTTOM_MID, 0, -50)
progress_bar.set_range(0, 100)
progress_bar.set_value(0, lv.ANIM.OFF)
def progress_callback(percent):
print(f"OTA Update: {percent:.1f}%")
label.set_text(f"OTA Update: {percent:.2f}%")
progress_bar.set_value(int(percent), lv.ANIM.ON)
current = Partition(Partition.RUNNING)
next_partition = current.get_next_update()
simulate = False
try:
from esp32 import Partition
#current_partition = Partition(Partition.RUNNING)
#print(f"Current partition: {current_partition}")
#next_partition = current_partition.get_next_update()
#print(f"Next partition: {next_partition}")
current = Partition(Partition.RUNNING)
next_partition = current.get_next_update()
#import ota.update
#import ota.status
#ota.status.status()
except Exception as e:
print("Warning: could not import esp32.Partition, simulating update...")
simulate = True
response = requests.get(url, stream=True)
total_size = int(response.headers.get('Content-Length', 0))
bytes_written = 0
chunk_size = 4096
i = 0
total_size = round_up_to_multiple(total_size, chunk_size)
print(f"Starting OTA update of size: {total_size}")
while self.keep_running: # stop if the user navigates away
time.sleep_ms(100) # don't hog the CPU
@@ -135,21 +147,26 @@ class OSUpdate(Activity):
if len(chunk) < chunk_size:
print(f"Padding chunk {i} from {len(chunk)} to {chunk_size} bytes")
chunk = chunk + b'\xFF' * (chunk_size - len(chunk))
print(f"Writing chunk {i}")
next_partition.writeblocks(i, chunk)
print(f"Writing chunk {i} with length {len(chunk)}")
if not simulate:
next_partition.writeblocks(i, chunk)
bytes_written += len(chunk)
i += 1
if total_size:
progress_callback(bytes_written / total_size * 100)
self.progress_callback(bytes_written / total_size * 100)
response.close()
if bytes_written >= total_size: # if the update was completely installed
if bytes_written >= total_size and not simulate: # if the update was completely installed
next_partition.set_boot()
import machine
machine.reset()
# In case it didn't reset:
lv.async_call(lambda l: self.status_label.set_text("Update finished! Please restart."), None)
# Non-class functions:
def round_up_to_multiple(n, multiple):
return ((n + multiple - 1) // multiple) * multiple
def compare_versions(ver1: str, ver2: str) -> bool:
"""Compare two version numbers (e.g., '1.2.3' vs '4.5.6').
Returns True if ver1 is greater than ver2, False otherwise."""