Add app: OSUpdate

This commit is contained in:
Thomas Farstrike
2025-04-27 12:27:48 +02:00
parent 3920ce1cf1
commit 764fd26871
8 changed files with 576 additions and 0 deletions
@@ -0,0 +1,3 @@
Manifest-Version: 1.0
Name: OSUpdate
Start-Script: assets/osupdate.py
@@ -0,0 +1,62 @@
import lvgl as lv
import ota.update
from esp32 import Partition
import urequests
subwindow.clean()
canary = lv.obj(subwindow)
canary.add_flag(lv.obj.FLAG.HIDDEN)
import ota.status
ota.status.status()
current = Partition(Partition.RUNNING)
current
current.get_next_update()
# Initialize LVGL display (assuming setup is done)
label = lv.label(subwindow)
label.set_text("OTA Update: 0.00%")
label.align(lv.ALIGN.CENTER, 0, -30)
progress_bar = lv.bar(subwindow)
progress_bar.set_size(200, 20)
progress_bar.align(lv.ALIGN.BOTTOM_MID, 0, -50)
progress_bar.set_range(0, 100)
progress_bar.set_value(0, lv.ANIM.OFF)
# Custom OTA update with LVGL progress
def update_with_lvgl(url):
def progress_callback(percent):
print(f"OTA Update: {percent:.1f}%")
label.set_text(f"OTA Update: {percent:.2f}%") # Cloud upload symbol
progress_bar.set_value(int(percent), lv.ANIM.ON)
current = Partition(Partition.RUNNING)
next_partition = current.get_next_update()
response = urequests.get(url, stream=True)
total_size = int(response.headers.get('Content-Length', 0))
bytes_written = 0
chunk_size = 4096
i = 0
print(f"Starting OTA update of size: {total_size}")
while canary.is_valid():
chunk = response.raw.read(chunk_size)
if not chunk:
print("No chunk, breaking...")
break
if len(chunk) < chunk_size:
print(f"Padding chunk {i} from {len(chunk)} to {chunk_size} bytes")
chunk = chunk + b'\xFF' * (chunk_size - len(chunk))
print(f"Writing chunk {i}")
next_partition.writeblocks(i, chunk)
bytes_written += len(chunk)
i += 1
if total_size:
progress_callback(bytes_written / total_size * 100)
response.close()
next_partition.set_boot()
import machine
machine.reset()
# Start OTA update
update_with_lvgl("http://demo.lnpiggy.com:2121/ESP32_GENERIC_S3-SPIRAM_OCT_micropython.bin")
+2
View File
@@ -1,3 +1,5 @@
This /lib folder contains:
- https://github.com/echo-lalia/qmi8658-micropython/blob/main/qmi8685.py but given the correct name "qmi8658.py"
- traceback.mpy from https://github.com/micropython/micropython-lib
- https://github.com/glenn20/micropython-esp32-ota/ installed with import mip; mip.install('github:glenn20/micropython-esp32-ota/mip/ota')
@@ -0,0 +1,163 @@
# partition_writer module for MicroPython on ESP32
# MIT license; Copyright (c) 2023 Glenn Moloney @glenn20
# Based on OTA class by Thorsten von Eicken (@tve):
# https://github.com/tve/mqboard/blob/master/mqrepl/mqrepl.py
import hashlib
import io
from micropython import const
IOCTL_BLOCK_COUNT: int = const(4) # type: ignore
IOCTL_BLOCK_SIZE: int = const(5) # type: ignore
IOCTL_BLOCK_ERASE: int = const(6) # type: ignore
# An IOBase compatible class to wrap access to an os.AbstractBlockdev() device
# such as a partition on the device flash. Writes must be aligned to block
# boundaries.
# https://docs.micropython.org/en/latest/library/os.html#block-device-interface
# Extend IOBase so we can wrap this with io.BufferedWriter in BlockdevWriter
class Blockdev(io.IOBase):
def __init__(self, device):
self.device = device
self.blocksize = int(device.ioctl(IOCTL_BLOCK_SIZE, None))
self.blockcount = int(device.ioctl(IOCTL_BLOCK_COUNT, None))
self.pos = 0 # Current position (bytes from beginning) of device
self.end = 0 # Current end of the data written to the device
# Data must be a multiple of blocksize unless it is the last write to the
# device. The next write after a partial block will raise ValueError.
def write(self, data: bytes | bytearray | memoryview) -> int:
block, remainder = divmod(self.pos, self.blocksize)
if remainder:
raise ValueError(f"Block {block} write not aligned at block boundary.")
data_len = len(data)
nblocks, remainder = divmod(data_len, self.blocksize)
mv = memoryview(data)
if nblocks: # Write whole blocks
self.device.writeblocks(block, mv[: nblocks * self.blocksize])
block += nblocks
if remainder: # Write left over data as a partial block
self.device.ioctl(IOCTL_BLOCK_ERASE, block) # Erase block first
self.device.writeblocks(block, mv[-remainder:], 0)
self.pos += data_len
self.end = self.pos # The "end" of the data written to the device
return data_len
# Read data from the block device.
def readinto(self, data: bytearray | memoryview):
size = min(len(data), self.end - self.pos)
block, remainder = divmod(self.pos, self.blocksize)
self.device.readblocks(block, memoryview(data)[:size], remainder)
self.pos += size
return size
# Set the current file position for reading or writing
def seek(self, offset: int, whence: int = 0):
start = [0, self.pos, self.end]
self.pos = start[whence] + offset
# Calculate the SHA256 sum of a file (has a readinto() method)
def sha_file(f, buffersize=4096) -> str:
mv = memoryview(bytearray(buffersize))
read_sha = hashlib.sha256()
while (n := f.readinto(mv)) > 0:
read_sha.update(mv[:n])
return read_sha.digest().hex()
# BlockdevWriter provides a convenient interface to writing images to any block
# device which implements the micropython os.AbstractBlockDev interface (eg.
# Partition on flash storage on ESP32).
# https://docs.micropython.org/en/latest/library/os.html#block-device-interface
# https://docs.micropython.org/en/latest/library/esp32.html#flash-partitions
class BlockDevWriter:
def __init__(
self,
device, # Block device to recieve the data (eg. esp32.Partition)
verify: bool = True, # Should we read back and verify data after writing
verbose: bool = True,
):
self.device = Blockdev(device)
self.writer = io.BufferedWriter(
self.device, self.device.blocksize # type: ignore
)
self._sha = hashlib.sha256()
self.verify = verify
self.verbose = verbose
self.sha: str = ""
self.length: int = 0
blocksize, blockcount = self.device.blocksize, self.device.blockcount
if self.verbose:
print(f"Device capacity: {blockcount} x {blocksize} byte blocks.")
def set_sha_length(self, sha: str, length: int):
self.sha = sha
self.length = length
blocksize, blockcount = self.device.blocksize, self.device.blockcount
if length > blocksize * blockcount:
raise ValueError(f"length ({length} bytes) is > size of partition.")
if self.verbose and length:
blocks, remainder = divmod(length, blocksize)
print(f"Writing {blocks} blocks + {remainder} bytes.")
def print_progress(self):
if self.verbose:
block, remainder = divmod(self.device.pos, self.device.blocksize)
print(f"\rBLOCK {block}", end="")
if remainder:
print(f" + {remainder} bytes")
# Append data to the block device
def write(self, data: bytearray | bytes | memoryview) -> int:
self._sha.update(data)
n = self.writer.write(data)
self.print_progress()
return n
# Append data from f (a stream object) to the block device
def write_from_stream(self, f: io.BufferedReader) -> int:
mv = memoryview(bytearray(self.device.blocksize))
tot = 0
while (n := f.readinto(mv)) != 0:
tot += self.write(mv[:n])
return tot
# Flush remaining data to the block device and confirm all checksums
# Raises:
# ValueError("SHA mismatch...") if SHA of received data != expected sha
# ValueError("SHA verify fail...") if verified SHA != written sha
def close(self) -> None:
self.writer.flush()
self.print_progress()
# Check the checksums (SHA256)
nbytes: int = self.device.end
if self.length and self.length != nbytes:
raise ValueError(f"Received {nbytes} bytes (expect {self.length}).")
write_sha = self._sha.digest().hex()
if not self.sha:
self.sha = write_sha
if self.sha != write_sha:
raise ValueError(f"SHA mismatch recv={write_sha} expect={self.sha}.")
if self.verify:
if self.verbose:
print("Verifying SHA of the written data...", end="")
self.device.seek(0) # Reset to start of partition
read_sha = sha_file(self.device, self.device.blocksize)
if read_sha != write_sha:
raise ValueError(f"SHA verify failed write={write_sha} read={read_sha}")
if self.verbose:
print("Passed.")
if self.verbose or not self.sha:
print(f"SHA256={self.sha}")
self.device.seek(0) # Reset to start of partition
def __enter__(self):
return self
def __exit__(self, e_t, e_v, e_tr):
if e_t is None:
self.close()
+27
View File
@@ -0,0 +1,27 @@
from esp32 import Partition
# Mark this boot as successful: prevent rollback to last image on next reboot.
# Raises OSError(-261) if bootloader is not OTA capable.
def cancel() -> None:
try:
Partition.mark_app_valid_cancel_rollback()
except OSError as e:
if e.args[0] == -261:
print(f"{__name__}.cancel(): The bootloader does not support OTA rollback.")
else:
raise e
# Force a rollback on the next reboot to the previously booted ota partition
def force() -> None:
from .status import force_rollback
force_rollback()
# Undo a previous force rollback: ie. boot off the current partition on next reboot
def cancel_force() -> None:
from .status import current_ota
current_ota.set_boot()
+164
View File
@@ -0,0 +1,164 @@
# esp32_ota module for MicroPython on ESP32
# MIT license; Copyright (c) 2023 Glenn Moloney @glenn20
# Based on OTA class by Thorsten von Eicken (@tve):
# https://github.com/tve/mqboard/blob/master/mqrepl/mqrepl.py
import binascii
import struct
import sys
import time
import machine
from esp32 import Partition
from flashbdev import bdev
from micropython import const
OTA_UNSUPPORTED = const(-261)
ESP_ERR_OTA_VALIDATE_FAILED = const(-5379)
OTA_MIN: int = const(16) # type: ignore
OTA_MAX: int = const(32) # type: ignore
OTA_SIZE = 0x20 # The size of an OTA record in bytes (32 bytes)
OTA_BLOCKS = (0, 1) # The offsets of the OTA records in the otadata partition
OTA_FMT = b"<L20sLL" # The format for reading/writing binary OTA records
OTA_LABEL = b"\xff" * OTA_SIZE # The expected label field in the OTA record
OTA_CRC_INIT = 0xFFFFFFFF # The initial value for the CRC32 checksum
OTADATA_TYPE = (1, 0) # The type and subtype of the otadata partition
otastate = {
0: "NEW",
1: "PENDING",
2: "VALID",
3: "INVALID",
4: "ABORTED",
0xFFFFFFFF: "UNDEFINED",
}
otadata_part = p[0] if (p := Partition.find(*OTADATA_TYPE)) else None
current_ota = Partition(Partition.RUNNING) # Partition we booted from
next_ota = None # Partition for the next OTA update (if device is OTA enabled)
try:
if otadata_part: # Avoid IDF error messages by checking for otadata partition
next_ota = current_ota.get_next_update()
except OSError:
pass
# Return the partition we will boot from on next boot
def boot_ota() -> Partition: # Partition we will boot from on next boot
if next_ota: # Avoid IDF debug messages by checking for otadata partition
try:
return Partition(Partition.BOOT)
except OSError: # OTA support is not available, return current partition
pass
return Partition(Partition.RUNNING)
# Return True if the device is configured for OTA updates
def ready() -> bool:
return next_ota is not None
def partition_table() -> list[tuple[int, int, int, int, str, bool]]:
partitions = [p.info() for p in Partition.find(Partition.TYPE_APP)]
partitions.extend([p.info() for p in Partition.find(Partition.TYPE_DATA)])
partitions.sort(key=lambda i: i[2]) # Sort by address
return partitions
def partition_table_print() -> None:
ptype = {Partition.TYPE_APP: "app", Partition.TYPE_DATA: "data"}
subtype = [
{0: "factory"} | {i: f"ota_{i-OTA_MIN}" for i in range(OTA_MIN, OTA_MAX)},
{0: "ota", 1: "phy", 2: "nvs", 129: "fat"}, # DATA subtypes
]
print("Partition table:")
print("# Name Type SubType Offset Size (bytes)")
for p in partition_table():
print(
f" {p[4]:10s} {ptype[p[0]]:8s} {subtype[p[0]][p[1]]:8} "
+ f"{p[2]:#10x} {p[3]:#10x} {p[3]:10,}"
)
# Return a list of OTA partitions sorted by partition subtype number
def ota_partitions() -> list[Partition]:
partitions: list[Partition] = [
p
for p in Partition.find(Partition.TYPE_APP)
if OTA_MIN <= p.info()[1] < OTA_MAX
]
# Sort by the OTA partition subtype: ota_0 (16), ota_1 (17), ota_2 (18), ...
partitions.sort(key=lambda p: p.info()[1])
return partitions
# Print the status of the otadata partition
def otadata_check() -> None:
if not otadata_part:
return
valid_seq = 1
for i in (0, 1):
otadata_part.readblocks(i, (b := bytearray(OTA_SIZE)))
seq, _, state_num, crc = struct.unpack(OTA_FMT, b)
state = otastate[state_num]
is_valid = (
state == "VALID"
and binascii.crc32(struct.pack(b"<L", seq), OTA_CRC_INIT) == crc
)
if is_valid and seq > valid_seq:
valid_seq = seq
print(f"OTA record: state={state}, seq={seq}, crc={crc}, valid={is_valid}")
print(
f"OTA record is {state}."
+ (" Will be updated on next boot." if state == "VALID" else "")
)
p = ota_partitions()
print(f"Next boot is '{p[(valid_seq - 1) % len(p)].info()[4]}'.")
# Print a detailed summary of the OTA status of the device
def status() -> None:
upyversion, pname = sys.version.split(" ")[2], current_ota.info()[4]
print(f"Micropython {upyversion} has booted from partition '{pname}'.")
print(f"Will boot from partition '{boot_ota().info()[4]}' on next reboot.")
if not ota_partitions():
print("There are no OTA partitions available.")
elif not next_ota:
print("No spare OTA partition is available for update.")
else:
print(f"The next OTA partition for update is '{next_ota.info()[4]}'.")
print(f"The / filesystem is mounted from partition '{bdev.info()[4]}'.")
partition_table_print()
otadata_check()
# The functions below are used by `ota.rollback` and are here to make
# `ota.rollback` as lightweight as possible for the common use case:
# calling `ota.rollback.cancel()` on every boot.
# Reboot the device after the provided delay
def ota_reboot(delay=10) -> None:
for i in range(delay, 0, -1):
print(f"\rRebooting in {i:2} seconds (ctrl-C to cancel)", end="")
time.sleep(1)
print()
machine.reset() # Reboot into the new image
# Micropython does not support forcing an OTA rollback so we do it by hand:
# - find the previous ota partition, validate the image and set it bootable.
# Raises OSError(-5379) if validation of the boot image fails.
# Raises OSError(-261) if no OTA partitions are available.
def force_rollback(reboot=False) -> None:
partitions = ota_partitions()
for i, p in enumerate(partitions):
if p.info() == current_ota.info(): # Compare by partition offset
partitions[i - 1].set_boot() # Set the previous partition to be bootable
if reboot:
ota_reboot()
return
raise OSError(OTA_UNSUPPORTED)
+152
View File
@@ -0,0 +1,152 @@
# esp32_ota module for MicroPython on ESP32
# MIT license; Copyright (c) 2023 Glenn Moloney @glenn20
# Inspired by OTA class by Thorsten von Eicken (@tve):
# https://github.com/tve/mqboard/blob/master/mqrepl/mqrepl.py
import gc
import io
from esp32 import Partition
from .blockdev_writer import BlockDevWriter
from .status import ota_reboot
# Micropython sockets don't have context manager methods. This wrapper provides
# those.
class SocketWrapper:
def __init__(self, f: io.BufferedReader):
self.f = f
def __enter__(self) -> io.BufferedReader:
return self.f
def __exit__(self, e_t, e_v, e_tr):
self.f.close()
# Open a file or a URL and return a File-like object for reading
def open_url(url_or_filename: str, **kw) -> io.BufferedReader:
if url_or_filename.split(":", 1)[0] in ("http", "https"):
import requests
r = requests.get(url_or_filename, **kw)
code: int = r.status_code
if code != 200:
r.close()
raise ValueError(f"HTTP Error: {code}")
return SocketWrapper(r.raw) # type: ignore
else:
return open(url_or_filename, "rb")
# OTA manages a MicroPython firmware update over-the-air. It checks that there
# are at least two "ota" "app" partitions in the partition table and writes new
# firmware into the partition that is not currently running. When the update is
# complete, it sets the new partition as the next one to boot. Set reboot=True
# to force a reset/restart, or call machine.reset() explicitly. Remember to call
# ota.rollback.cancel() after a successful reboot to the new image.
class OTA:
def __init__(self, verify=True, verbose=True, reboot=False, sha="", length=0):
self.reboot = reboot
self.verbose = verbose
# Get the next free OTA partition
# Raise OSError(ENOENT) if no OTA partition available
self.part = Partition(Partition.RUNNING).get_next_update()
if verbose:
name: str = self.part.info()[4]
print(f"Writing new micropython image to OTA partition '{name}'...")
self.writer = BlockDevWriter(self.part, verify, verbose)
if sha or length:
self.writer.set_sha_length(sha, length)
# Append the data to the OTA partition
def write(self, data: bytearray | bytes | memoryview) -> int:
return self.writer.write(data)
# Flush any buffered data to the ota partition and set it as the boot
# partition. If verify is True, will read back the written firmware data to
# check the sha256 of the written data. If reboot is True, will reboot the
# device after 10 seconds.
def close(self) -> None:
if self.writer is None:
return
self.writer.close()
# Set as boot partition for next reboot
name: str = self.part.info()[4]
print(f"OTA Partition '{name}' updated successfully.")
self.part.set_boot() # Raise OSError(-5379) if image on part is not valid
bootname = Partition(Partition.BOOT).info()[4]
if name != bootname:
print(f"Warning: failed to set {name} as the next boot partition.")
print(f"Micropython will boot from '{bootname}' partition on next boot.")
print("Remember to call ota.rollback.cancel() after successful reboot.")
if self.reboot:
ota_reboot()
def __enter__(self):
return self
def __exit__(self, e_t, e_v, e_tr):
if e_t is None: # If exception is thrown, don't flush data or set bootable
self.close()
# Load a firmware file from the provided io stream
# - f: an io stream (supporting the f.readinto() method)
# - sha: (optional) the sha256sum of the firmware file
# - length: (optional) the length (in bytes) of the firmware file
def from_stream(self, f: io.BufferedReader, sha: str = "", length: int = 0) -> int:
if sha or length:
self.writer.set_sha_length(sha, length)
gc.collect()
return self.writer.write_from_stream(f)
# Write new firmware to the OTA partition from the given url
# - url: a filename or a http[s] url for the micropython.bin firmware.
# - sha: the sha256sum of the firmware file
# - length: the length (in bytes) of the firmware file
def from_firmware_file(self, url: str, sha: str = "", length: int = 0, **kw) -> int:
if self.verbose:
print(f"Opening firmware file {url}...")
with open_url(url, **kw) as f:
return self.from_stream(f, sha, length)
# Load a firmware file, the location of which is read from a json file
# containing the url for the firmware file, the sha and length of the file.
# - url: the name of a file or url containing the json.
# - kw: extra keywords arguments that will be passed to `requests.get()`
def from_json(self, url: str, **kw) -> int:
if not url.endswith(".json"):
raise ValueError("Url does not end with '.json'")
if self.verbose:
print(f"Opening json file {url}...")
with open_url(url, **kw) as f:
from json import load
data: dict = load(f)
try:
firmware: str = data["firmware"]
sha: str = data["sha"]
length: int = data["length"]
if not any(firmware.startswith(s) for s in ("https:", "http:", "/")):
# If firmware filename is relative, append to base of url of json file
baseurl, *_ = url.rsplit("/", 1)
firmware = f"{baseurl}/{firmware}"
return self.from_firmware_file(firmware, sha, length, **kw)
except KeyError as err:
print('OTA json must include "firmware", "sha" and "length" keys.')
raise err
# Convenience functions which use the OTA class to perform OTA updates.
def from_file(
url: str, sha="", length=0, verify=True, verbose=True, reboot=True, **kw
) -> None:
with OTA(verify, verbose, reboot) as ota_update:
ota_update.from_firmware_file(url, sha, length, **kw)
def from_json(url: str, verify=True, verbose=True, reboot=True, **kw) -> None:
with OTA(verify, verbose, reboot) as ota_update:
ota_update.from_json(url, **kw)
+3
View File
@@ -356,3 +356,6 @@ def run_launcher():
execute_script_new_thread("/autostart.py", True, False)
run_launcher()
# If we got this far without crashing, then no need to rollback the update
import ota.rollback
ota.rollback.cancel()