From 764fd268719bd7e2c1a129474968c0b7e3c7e093 Mon Sep 17 00:00:00 2001 From: Thomas Farstrike Date: Sun, 27 Apr 2025 12:27:48 +0200 Subject: [PATCH] Add app: OSUpdate --- .../com.example.osupdate/META-INF/MANIFEST.MF | 3 + .../com.example.osupdate/assets/osupdate.py | 62 +++++++ internal_filesystem/lib/README.md | 2 + .../lib/ota/blockdev_writer.py | 163 +++++++++++++++++ internal_filesystem/lib/ota/rollback.py | 27 +++ internal_filesystem/lib/ota/status.py | 164 ++++++++++++++++++ internal_filesystem/lib/ota/update.py | 152 ++++++++++++++++ internal_filesystem/main.py | 3 + 8 files changed, 576 insertions(+) create mode 100644 internal_filesystem/apps/com.example.osupdate/META-INF/MANIFEST.MF create mode 100644 internal_filesystem/apps/com.example.osupdate/assets/osupdate.py create mode 100644 internal_filesystem/lib/ota/blockdev_writer.py create mode 100644 internal_filesystem/lib/ota/rollback.py create mode 100644 internal_filesystem/lib/ota/status.py create mode 100644 internal_filesystem/lib/ota/update.py diff --git a/internal_filesystem/apps/com.example.osupdate/META-INF/MANIFEST.MF b/internal_filesystem/apps/com.example.osupdate/META-INF/MANIFEST.MF new file mode 100644 index 00000000..e2c48647 --- /dev/null +++ b/internal_filesystem/apps/com.example.osupdate/META-INF/MANIFEST.MF @@ -0,0 +1,3 @@ +Manifest-Version: 1.0 +Name: OSUpdate +Start-Script: assets/osupdate.py diff --git a/internal_filesystem/apps/com.example.osupdate/assets/osupdate.py b/internal_filesystem/apps/com.example.osupdate/assets/osupdate.py new file mode 100644 index 00000000..1db12e6b --- /dev/null +++ b/internal_filesystem/apps/com.example.osupdate/assets/osupdate.py @@ -0,0 +1,62 @@ +import lvgl as lv +import ota.update +from esp32 import Partition +import urequests + + +subwindow.clean() +canary = lv.obj(subwindow) +canary.add_flag(lv.obj.FLAG.HIDDEN) + + +import ota.status +ota.status.status() +current = Partition(Partition.RUNNING) +current +current.get_next_update() + +# Initialize LVGL display (assuming setup is done) +label = lv.label(subwindow) +label.set_text("OTA Update: 0.00%") +label.align(lv.ALIGN.CENTER, 0, -30) +progress_bar = lv.bar(subwindow) +progress_bar.set_size(200, 20) +progress_bar.align(lv.ALIGN.BOTTOM_MID, 0, -50) +progress_bar.set_range(0, 100) +progress_bar.set_value(0, lv.ANIM.OFF) + +# Custom OTA update with LVGL progress +def update_with_lvgl(url): + def progress_callback(percent): + print(f"OTA Update: {percent:.1f}%") + label.set_text(f"OTA Update: {percent:.2f}%") # Cloud upload symbol + progress_bar.set_value(int(percent), lv.ANIM.ON) + current = Partition(Partition.RUNNING) + next_partition = current.get_next_update() + response = urequests.get(url, stream=True) + total_size = int(response.headers.get('Content-Length', 0)) + bytes_written = 0 + chunk_size = 4096 + i = 0 + print(f"Starting OTA update of size: {total_size}") + while canary.is_valid(): + chunk = response.raw.read(chunk_size) + if not chunk: + print("No chunk, breaking...") + break + if len(chunk) < chunk_size: + print(f"Padding chunk {i} from {len(chunk)} to {chunk_size} bytes") + chunk = chunk + b'\xFF' * (chunk_size - len(chunk)) + print(f"Writing chunk {i}") + next_partition.writeblocks(i, chunk) + bytes_written += len(chunk) + i += 1 + if total_size: + progress_callback(bytes_written / total_size * 100) + response.close() + next_partition.set_boot() + import machine + machine.reset() + +# Start OTA update +update_with_lvgl("http://demo.lnpiggy.com:2121/ESP32_GENERIC_S3-SPIRAM_OCT_micropython.bin") diff --git a/internal_filesystem/lib/README.md b/internal_filesystem/lib/README.md index 6dfeef45..070e03d5 100644 --- a/internal_filesystem/lib/README.md +++ b/internal_filesystem/lib/README.md @@ -1,3 +1,5 @@ This /lib folder contains: - https://github.com/echo-lalia/qmi8658-micropython/blob/main/qmi8685.py but given the correct name "qmi8658.py" - traceback.mpy from https://github.com/micropython/micropython-lib +- https://github.com/glenn20/micropython-esp32-ota/ installed with import mip; mip.install('github:glenn20/micropython-esp32-ota/mip/ota') + diff --git a/internal_filesystem/lib/ota/blockdev_writer.py b/internal_filesystem/lib/ota/blockdev_writer.py new file mode 100644 index 00000000..e0f98ce5 --- /dev/null +++ b/internal_filesystem/lib/ota/blockdev_writer.py @@ -0,0 +1,163 @@ +# partition_writer module for MicroPython on ESP32 +# MIT license; Copyright (c) 2023 Glenn Moloney @glenn20 + +# Based on OTA class by Thorsten von Eicken (@tve): +# https://github.com/tve/mqboard/blob/master/mqrepl/mqrepl.py + +import hashlib +import io + +from micropython import const + +IOCTL_BLOCK_COUNT: int = const(4) # type: ignore +IOCTL_BLOCK_SIZE: int = const(5) # type: ignore +IOCTL_BLOCK_ERASE: int = const(6) # type: ignore + + +# An IOBase compatible class to wrap access to an os.AbstractBlockdev() device +# such as a partition on the device flash. Writes must be aligned to block +# boundaries. +# https://docs.micropython.org/en/latest/library/os.html#block-device-interface +# Extend IOBase so we can wrap this with io.BufferedWriter in BlockdevWriter +class Blockdev(io.IOBase): + def __init__(self, device): + self.device = device + self.blocksize = int(device.ioctl(IOCTL_BLOCK_SIZE, None)) + self.blockcount = int(device.ioctl(IOCTL_BLOCK_COUNT, None)) + self.pos = 0 # Current position (bytes from beginning) of device + self.end = 0 # Current end of the data written to the device + + # Data must be a multiple of blocksize unless it is the last write to the + # device. The next write after a partial block will raise ValueError. + def write(self, data: bytes | bytearray | memoryview) -> int: + block, remainder = divmod(self.pos, self.blocksize) + if remainder: + raise ValueError(f"Block {block} write not aligned at block boundary.") + data_len = len(data) + nblocks, remainder = divmod(data_len, self.blocksize) + mv = memoryview(data) + if nblocks: # Write whole blocks + self.device.writeblocks(block, mv[: nblocks * self.blocksize]) + block += nblocks + if remainder: # Write left over data as a partial block + self.device.ioctl(IOCTL_BLOCK_ERASE, block) # Erase block first + self.device.writeblocks(block, mv[-remainder:], 0) + self.pos += data_len + self.end = self.pos # The "end" of the data written to the device + return data_len + + # Read data from the block device. + def readinto(self, data: bytearray | memoryview): + size = min(len(data), self.end - self.pos) + block, remainder = divmod(self.pos, self.blocksize) + self.device.readblocks(block, memoryview(data)[:size], remainder) + self.pos += size + return size + + # Set the current file position for reading or writing + def seek(self, offset: int, whence: int = 0): + start = [0, self.pos, self.end] + self.pos = start[whence] + offset + + +# Calculate the SHA256 sum of a file (has a readinto() method) +def sha_file(f, buffersize=4096) -> str: + mv = memoryview(bytearray(buffersize)) + read_sha = hashlib.sha256() + while (n := f.readinto(mv)) > 0: + read_sha.update(mv[:n]) + return read_sha.digest().hex() + + +# BlockdevWriter provides a convenient interface to writing images to any block +# device which implements the micropython os.AbstractBlockDev interface (eg. +# Partition on flash storage on ESP32). +# https://docs.micropython.org/en/latest/library/os.html#block-device-interface +# https://docs.micropython.org/en/latest/library/esp32.html#flash-partitions +class BlockDevWriter: + def __init__( + self, + device, # Block device to recieve the data (eg. esp32.Partition) + verify: bool = True, # Should we read back and verify data after writing + verbose: bool = True, + ): + self.device = Blockdev(device) + self.writer = io.BufferedWriter( + self.device, self.device.blocksize # type: ignore + ) + self._sha = hashlib.sha256() + self.verify = verify + self.verbose = verbose + self.sha: str = "" + self.length: int = 0 + blocksize, blockcount = self.device.blocksize, self.device.blockcount + if self.verbose: + print(f"Device capacity: {blockcount} x {blocksize} byte blocks.") + + def set_sha_length(self, sha: str, length: int): + self.sha = sha + self.length = length + blocksize, blockcount = self.device.blocksize, self.device.blockcount + if length > blocksize * blockcount: + raise ValueError(f"length ({length} bytes) is > size of partition.") + if self.verbose and length: + blocks, remainder = divmod(length, blocksize) + print(f"Writing {blocks} blocks + {remainder} bytes.") + + def print_progress(self): + if self.verbose: + block, remainder = divmod(self.device.pos, self.device.blocksize) + print(f"\rBLOCK {block}", end="") + if remainder: + print(f" + {remainder} bytes") + + # Append data to the block device + def write(self, data: bytearray | bytes | memoryview) -> int: + self._sha.update(data) + n = self.writer.write(data) + self.print_progress() + return n + + # Append data from f (a stream object) to the block device + def write_from_stream(self, f: io.BufferedReader) -> int: + mv = memoryview(bytearray(self.device.blocksize)) + tot = 0 + while (n := f.readinto(mv)) != 0: + tot += self.write(mv[:n]) + return tot + + # Flush remaining data to the block device and confirm all checksums + # Raises: + # ValueError("SHA mismatch...") if SHA of received data != expected sha + # ValueError("SHA verify fail...") if verified SHA != written sha + def close(self) -> None: + self.writer.flush() + self.print_progress() + # Check the checksums (SHA256) + nbytes: int = self.device.end + if self.length and self.length != nbytes: + raise ValueError(f"Received {nbytes} bytes (expect {self.length}).") + write_sha = self._sha.digest().hex() + if not self.sha: + self.sha = write_sha + if self.sha != write_sha: + raise ValueError(f"SHA mismatch recv={write_sha} expect={self.sha}.") + if self.verify: + if self.verbose: + print("Verifying SHA of the written data...", end="") + self.device.seek(0) # Reset to start of partition + read_sha = sha_file(self.device, self.device.blocksize) + if read_sha != write_sha: + raise ValueError(f"SHA verify failed write={write_sha} read={read_sha}") + if self.verbose: + print("Passed.") + if self.verbose or not self.sha: + print(f"SHA256={self.sha}") + self.device.seek(0) # Reset to start of partition + + def __enter__(self): + return self + + def __exit__(self, e_t, e_v, e_tr): + if e_t is None: + self.close() diff --git a/internal_filesystem/lib/ota/rollback.py b/internal_filesystem/lib/ota/rollback.py new file mode 100644 index 00000000..fc8667d9 --- /dev/null +++ b/internal_filesystem/lib/ota/rollback.py @@ -0,0 +1,27 @@ +from esp32 import Partition + + +# Mark this boot as successful: prevent rollback to last image on next reboot. +# Raises OSError(-261) if bootloader is not OTA capable. +def cancel() -> None: + try: + Partition.mark_app_valid_cancel_rollback() + except OSError as e: + if e.args[0] == -261: + print(f"{__name__}.cancel(): The bootloader does not support OTA rollback.") + else: + raise e + + +# Force a rollback on the next reboot to the previously booted ota partition +def force() -> None: + from .status import force_rollback + + force_rollback() + + +# Undo a previous force rollback: ie. boot off the current partition on next reboot +def cancel_force() -> None: + from .status import current_ota + + current_ota.set_boot() diff --git a/internal_filesystem/lib/ota/status.py b/internal_filesystem/lib/ota/status.py new file mode 100644 index 00000000..3c204db9 --- /dev/null +++ b/internal_filesystem/lib/ota/status.py @@ -0,0 +1,164 @@ +# esp32_ota module for MicroPython on ESP32 +# MIT license; Copyright (c) 2023 Glenn Moloney @glenn20 + +# Based on OTA class by Thorsten von Eicken (@tve): +# https://github.com/tve/mqboard/blob/master/mqrepl/mqrepl.py + + +import binascii +import struct +import sys +import time + +import machine +from esp32 import Partition +from flashbdev import bdev +from micropython import const + +OTA_UNSUPPORTED = const(-261) +ESP_ERR_OTA_VALIDATE_FAILED = const(-5379) +OTA_MIN: int = const(16) # type: ignore +OTA_MAX: int = const(32) # type: ignore + +OTA_SIZE = 0x20 # The size of an OTA record in bytes (32 bytes) +OTA_BLOCKS = (0, 1) # The offsets of the OTA records in the otadata partition +OTA_FMT = b" Partition: # Partition we will boot from on next boot + if next_ota: # Avoid IDF debug messages by checking for otadata partition + try: + return Partition(Partition.BOOT) + except OSError: # OTA support is not available, return current partition + pass + return Partition(Partition.RUNNING) + + +# Return True if the device is configured for OTA updates +def ready() -> bool: + return next_ota is not None + + +def partition_table() -> list[tuple[int, int, int, int, str, bool]]: + partitions = [p.info() for p in Partition.find(Partition.TYPE_APP)] + partitions.extend([p.info() for p in Partition.find(Partition.TYPE_DATA)]) + partitions.sort(key=lambda i: i[2]) # Sort by address + return partitions + + +def partition_table_print() -> None: + ptype = {Partition.TYPE_APP: "app", Partition.TYPE_DATA: "data"} + subtype = [ + {0: "factory"} | {i: f"ota_{i-OTA_MIN}" for i in range(OTA_MIN, OTA_MAX)}, + {0: "ota", 1: "phy", 2: "nvs", 129: "fat"}, # DATA subtypes + ] + print("Partition table:") + print("# Name Type SubType Offset Size (bytes)") + for p in partition_table(): + print( + f" {p[4]:10s} {ptype[p[0]]:8s} {subtype[p[0]][p[1]]:8} " + + f"{p[2]:#10x} {p[3]:#10x} {p[3]:10,}" + ) + + +# Return a list of OTA partitions sorted by partition subtype number +def ota_partitions() -> list[Partition]: + partitions: list[Partition] = [ + p + for p in Partition.find(Partition.TYPE_APP) + if OTA_MIN <= p.info()[1] < OTA_MAX + ] + # Sort by the OTA partition subtype: ota_0 (16), ota_1 (17), ota_2 (18), ... + partitions.sort(key=lambda p: p.info()[1]) + return partitions + + +# Print the status of the otadata partition +def otadata_check() -> None: + if not otadata_part: + return + valid_seq = 1 + for i in (0, 1): + otadata_part.readblocks(i, (b := bytearray(OTA_SIZE))) + seq, _, state_num, crc = struct.unpack(OTA_FMT, b) + state = otastate[state_num] + is_valid = ( + state == "VALID" + and binascii.crc32(struct.pack(b" valid_seq: + valid_seq = seq + print(f"OTA record: state={state}, seq={seq}, crc={crc}, valid={is_valid}") + print( + f"OTA record is {state}." + + (" Will be updated on next boot." if state == "VALID" else "") + ) + p = ota_partitions() + print(f"Next boot is '{p[(valid_seq - 1) % len(p)].info()[4]}'.") + + +# Print a detailed summary of the OTA status of the device +def status() -> None: + upyversion, pname = sys.version.split(" ")[2], current_ota.info()[4] + print(f"Micropython {upyversion} has booted from partition '{pname}'.") + print(f"Will boot from partition '{boot_ota().info()[4]}' on next reboot.") + if not ota_partitions(): + print("There are no OTA partitions available.") + elif not next_ota: + print("No spare OTA partition is available for update.") + else: + print(f"The next OTA partition for update is '{next_ota.info()[4]}'.") + print(f"The / filesystem is mounted from partition '{bdev.info()[4]}'.") + partition_table_print() + otadata_check() + + +# The functions below are used by `ota.rollback` and are here to make +# `ota.rollback` as lightweight as possible for the common use case: +# calling `ota.rollback.cancel()` on every boot. + + +# Reboot the device after the provided delay +def ota_reboot(delay=10) -> None: + for i in range(delay, 0, -1): + print(f"\rRebooting in {i:2} seconds (ctrl-C to cancel)", end="") + time.sleep(1) + print() + machine.reset() # Reboot into the new image + + +# Micropython does not support forcing an OTA rollback so we do it by hand: +# - find the previous ota partition, validate the image and set it bootable. +# Raises OSError(-5379) if validation of the boot image fails. +# Raises OSError(-261) if no OTA partitions are available. +def force_rollback(reboot=False) -> None: + partitions = ota_partitions() + for i, p in enumerate(partitions): + if p.info() == current_ota.info(): # Compare by partition offset + partitions[i - 1].set_boot() # Set the previous partition to be bootable + if reboot: + ota_reboot() + return + raise OSError(OTA_UNSUPPORTED) diff --git a/internal_filesystem/lib/ota/update.py b/internal_filesystem/lib/ota/update.py new file mode 100644 index 00000000..fbd760ae --- /dev/null +++ b/internal_filesystem/lib/ota/update.py @@ -0,0 +1,152 @@ +# esp32_ota module for MicroPython on ESP32 +# MIT license; Copyright (c) 2023 Glenn Moloney @glenn20 + +# Inspired by OTA class by Thorsten von Eicken (@tve): +# https://github.com/tve/mqboard/blob/master/mqrepl/mqrepl.py + +import gc +import io + +from esp32 import Partition + +from .blockdev_writer import BlockDevWriter +from .status import ota_reboot + + +# Micropython sockets don't have context manager methods. This wrapper provides +# those. +class SocketWrapper: + def __init__(self, f: io.BufferedReader): + self.f = f + + def __enter__(self) -> io.BufferedReader: + return self.f + + def __exit__(self, e_t, e_v, e_tr): + self.f.close() + + +# Open a file or a URL and return a File-like object for reading +def open_url(url_or_filename: str, **kw) -> io.BufferedReader: + if url_or_filename.split(":", 1)[0] in ("http", "https"): + import requests + + r = requests.get(url_or_filename, **kw) + code: int = r.status_code + if code != 200: + r.close() + raise ValueError(f"HTTP Error: {code}") + return SocketWrapper(r.raw) # type: ignore + else: + return open(url_or_filename, "rb") + + +# OTA manages a MicroPython firmware update over-the-air. It checks that there +# are at least two "ota" "app" partitions in the partition table and writes new +# firmware into the partition that is not currently running. When the update is +# complete, it sets the new partition as the next one to boot. Set reboot=True +# to force a reset/restart, or call machine.reset() explicitly. Remember to call +# ota.rollback.cancel() after a successful reboot to the new image. +class OTA: + def __init__(self, verify=True, verbose=True, reboot=False, sha="", length=0): + self.reboot = reboot + self.verbose = verbose + # Get the next free OTA partition + # Raise OSError(ENOENT) if no OTA partition available + self.part = Partition(Partition.RUNNING).get_next_update() + if verbose: + name: str = self.part.info()[4] + print(f"Writing new micropython image to OTA partition '{name}'...") + self.writer = BlockDevWriter(self.part, verify, verbose) + if sha or length: + self.writer.set_sha_length(sha, length) + + # Append the data to the OTA partition + def write(self, data: bytearray | bytes | memoryview) -> int: + return self.writer.write(data) + + # Flush any buffered data to the ota partition and set it as the boot + # partition. If verify is True, will read back the written firmware data to + # check the sha256 of the written data. If reboot is True, will reboot the + # device after 10 seconds. + def close(self) -> None: + if self.writer is None: + return + self.writer.close() + # Set as boot partition for next reboot + name: str = self.part.info()[4] + print(f"OTA Partition '{name}' updated successfully.") + self.part.set_boot() # Raise OSError(-5379) if image on part is not valid + bootname = Partition(Partition.BOOT).info()[4] + if name != bootname: + print(f"Warning: failed to set {name} as the next boot partition.") + print(f"Micropython will boot from '{bootname}' partition on next boot.") + print("Remember to call ota.rollback.cancel() after successful reboot.") + if self.reboot: + ota_reboot() + + def __enter__(self): + return self + + def __exit__(self, e_t, e_v, e_tr): + if e_t is None: # If exception is thrown, don't flush data or set bootable + self.close() + + # Load a firmware file from the provided io stream + # - f: an io stream (supporting the f.readinto() method) + # - sha: (optional) the sha256sum of the firmware file + # - length: (optional) the length (in bytes) of the firmware file + def from_stream(self, f: io.BufferedReader, sha: str = "", length: int = 0) -> int: + if sha or length: + self.writer.set_sha_length(sha, length) + gc.collect() + return self.writer.write_from_stream(f) + + # Write new firmware to the OTA partition from the given url + # - url: a filename or a http[s] url for the micropython.bin firmware. + # - sha: the sha256sum of the firmware file + # - length: the length (in bytes) of the firmware file + def from_firmware_file(self, url: str, sha: str = "", length: int = 0, **kw) -> int: + if self.verbose: + print(f"Opening firmware file {url}...") + with open_url(url, **kw) as f: + return self.from_stream(f, sha, length) + + # Load a firmware file, the location of which is read from a json file + # containing the url for the firmware file, the sha and length of the file. + # - url: the name of a file or url containing the json. + # - kw: extra keywords arguments that will be passed to `requests.get()` + def from_json(self, url: str, **kw) -> int: + if not url.endswith(".json"): + raise ValueError("Url does not end with '.json'") + if self.verbose: + print(f"Opening json file {url}...") + with open_url(url, **kw) as f: + from json import load + + data: dict = load(f) + try: + firmware: str = data["firmware"] + sha: str = data["sha"] + length: int = data["length"] + if not any(firmware.startswith(s) for s in ("https:", "http:", "/")): + # If firmware filename is relative, append to base of url of json file + baseurl, *_ = url.rsplit("/", 1) + firmware = f"{baseurl}/{firmware}" + return self.from_firmware_file(firmware, sha, length, **kw) + except KeyError as err: + print('OTA json must include "firmware", "sha" and "length" keys.') + raise err + + +# Convenience functions which use the OTA class to perform OTA updates. +def from_file( + url: str, sha="", length=0, verify=True, verbose=True, reboot=True, **kw +) -> None: + with OTA(verify, verbose, reboot) as ota_update: + ota_update.from_firmware_file(url, sha, length, **kw) + + +def from_json(url: str, verify=True, verbose=True, reboot=True, **kw) -> None: + with OTA(verify, verbose, reboot) as ota_update: + ota_update.from_json(url, **kw) diff --git a/internal_filesystem/main.py b/internal_filesystem/main.py index 395b0ac5..3fec7e66 100644 --- a/internal_filesystem/main.py +++ b/internal_filesystem/main.py @@ -356,3 +356,6 @@ def run_launcher(): execute_script_new_thread("/autostart.py", True, False) run_launcher() +# If we got this far without crashing, then no need to rollback the update +import ota.rollback +ota.rollback.cancel()