diff --git a/internal_filesystem/lib/README.md b/internal_filesystem/lib/README.md index a5d0eafc..b78ec741 100644 --- a/internal_filesystem/lib/README.md +++ b/internal_filesystem/lib/README.md @@ -1,7 +1,6 @@ This /lib folder contains: - https://github.com/echo-lalia/qmi8658-micropython/blob/main/qmi8685.py but given the correct name "qmi8658.py" - traceback.mpy from https://github.com/micropython/micropython-lib -- https://github.com/glenn20/micropython-esp32-ota/ installed with import mip; mip.install('github:glenn20/micropython-esp32-ota/mip/ota') - mip.install('github:jonnor/micropython-zipfile') - mip.install("shutil") for shutil.rmtree('/apps/com.example.files') # for rmtree() - mip.install("aiohttp") # easy websockets diff --git a/internal_filesystem/lib/mpos/main.py b/internal_filesystem/lib/mpos/main.py index 7cf69b51..a826e555 100644 --- a/internal_filesystem/lib/mpos/main.py +++ b/internal_filesystem/lib/mpos/main.py @@ -108,8 +108,8 @@ mpos.TaskManager.create_task(asyncio_repl()) # only gets started when mpos.TaskM async def ota_rollback_cancel(): try: - import ota.rollback - ota.rollback.cancel() + from esp32 import Partition + Partition.mark_app_valid_cancel_rollback() except Exception as e: print("main.py: warning: could not mark this update as valid:", e) diff --git a/internal_filesystem/lib/ota/blockdev_writer.py b/internal_filesystem/lib/ota/blockdev_writer.py deleted file mode 100644 index e0f98ce5..00000000 --- a/internal_filesystem/lib/ota/blockdev_writer.py +++ /dev/null @@ -1,163 +0,0 @@ -# partition_writer module for MicroPython on ESP32 -# MIT license; Copyright (c) 2023 Glenn Moloney @glenn20 - -# Based on OTA class by Thorsten von Eicken (@tve): -# https://github.com/tve/mqboard/blob/master/mqrepl/mqrepl.py - -import hashlib -import io - -from micropython import const - -IOCTL_BLOCK_COUNT: int = const(4) # type: ignore -IOCTL_BLOCK_SIZE: int = const(5) # type: ignore -IOCTL_BLOCK_ERASE: int = const(6) # type: ignore - - -# An IOBase compatible class to wrap access to an os.AbstractBlockdev() device -# such as a partition on the device flash. Writes must be aligned to block -# boundaries. -# https://docs.micropython.org/en/latest/library/os.html#block-device-interface -# Extend IOBase so we can wrap this with io.BufferedWriter in BlockdevWriter -class Blockdev(io.IOBase): - def __init__(self, device): - self.device = device - self.blocksize = int(device.ioctl(IOCTL_BLOCK_SIZE, None)) - self.blockcount = int(device.ioctl(IOCTL_BLOCK_COUNT, None)) - self.pos = 0 # Current position (bytes from beginning) of device - self.end = 0 # Current end of the data written to the device - - # Data must be a multiple of blocksize unless it is the last write to the - # device. The next write after a partial block will raise ValueError. - def write(self, data: bytes | bytearray | memoryview) -> int: - block, remainder = divmod(self.pos, self.blocksize) - if remainder: - raise ValueError(f"Block {block} write not aligned at block boundary.") - data_len = len(data) - nblocks, remainder = divmod(data_len, self.blocksize) - mv = memoryview(data) - if nblocks: # Write whole blocks - self.device.writeblocks(block, mv[: nblocks * self.blocksize]) - block += nblocks - if remainder: # Write left over data as a partial block - self.device.ioctl(IOCTL_BLOCK_ERASE, block) # Erase block first - self.device.writeblocks(block, mv[-remainder:], 0) - self.pos += data_len - self.end = self.pos # The "end" of the data written to the device - return data_len - - # Read data from the block device. - def readinto(self, data: bytearray | memoryview): - size = min(len(data), self.end - self.pos) - block, remainder = divmod(self.pos, self.blocksize) - self.device.readblocks(block, memoryview(data)[:size], remainder) - self.pos += size - return size - - # Set the current file position for reading or writing - def seek(self, offset: int, whence: int = 0): - start = [0, self.pos, self.end] - self.pos = start[whence] + offset - - -# Calculate the SHA256 sum of a file (has a readinto() method) -def sha_file(f, buffersize=4096) -> str: - mv = memoryview(bytearray(buffersize)) - read_sha = hashlib.sha256() - while (n := f.readinto(mv)) > 0: - read_sha.update(mv[:n]) - return read_sha.digest().hex() - - -# BlockdevWriter provides a convenient interface to writing images to any block -# device which implements the micropython os.AbstractBlockDev interface (eg. -# Partition on flash storage on ESP32). -# https://docs.micropython.org/en/latest/library/os.html#block-device-interface -# https://docs.micropython.org/en/latest/library/esp32.html#flash-partitions -class BlockDevWriter: - def __init__( - self, - device, # Block device to recieve the data (eg. esp32.Partition) - verify: bool = True, # Should we read back and verify data after writing - verbose: bool = True, - ): - self.device = Blockdev(device) - self.writer = io.BufferedWriter( - self.device, self.device.blocksize # type: ignore - ) - self._sha = hashlib.sha256() - self.verify = verify - self.verbose = verbose - self.sha: str = "" - self.length: int = 0 - blocksize, blockcount = self.device.blocksize, self.device.blockcount - if self.verbose: - print(f"Device capacity: {blockcount} x {blocksize} byte blocks.") - - def set_sha_length(self, sha: str, length: int): - self.sha = sha - self.length = length - blocksize, blockcount = self.device.blocksize, self.device.blockcount - if length > blocksize * blockcount: - raise ValueError(f"length ({length} bytes) is > size of partition.") - if self.verbose and length: - blocks, remainder = divmod(length, blocksize) - print(f"Writing {blocks} blocks + {remainder} bytes.") - - def print_progress(self): - if self.verbose: - block, remainder = divmod(self.device.pos, self.device.blocksize) - print(f"\rBLOCK {block}", end="") - if remainder: - print(f" + {remainder} bytes") - - # Append data to the block device - def write(self, data: bytearray | bytes | memoryview) -> int: - self._sha.update(data) - n = self.writer.write(data) - self.print_progress() - return n - - # Append data from f (a stream object) to the block device - def write_from_stream(self, f: io.BufferedReader) -> int: - mv = memoryview(bytearray(self.device.blocksize)) - tot = 0 - while (n := f.readinto(mv)) != 0: - tot += self.write(mv[:n]) - return tot - - # Flush remaining data to the block device and confirm all checksums - # Raises: - # ValueError("SHA mismatch...") if SHA of received data != expected sha - # ValueError("SHA verify fail...") if verified SHA != written sha - def close(self) -> None: - self.writer.flush() - self.print_progress() - # Check the checksums (SHA256) - nbytes: int = self.device.end - if self.length and self.length != nbytes: - raise ValueError(f"Received {nbytes} bytes (expect {self.length}).") - write_sha = self._sha.digest().hex() - if not self.sha: - self.sha = write_sha - if self.sha != write_sha: - raise ValueError(f"SHA mismatch recv={write_sha} expect={self.sha}.") - if self.verify: - if self.verbose: - print("Verifying SHA of the written data...", end="") - self.device.seek(0) # Reset to start of partition - read_sha = sha_file(self.device, self.device.blocksize) - if read_sha != write_sha: - raise ValueError(f"SHA verify failed write={write_sha} read={read_sha}") - if self.verbose: - print("Passed.") - if self.verbose or not self.sha: - print(f"SHA256={self.sha}") - self.device.seek(0) # Reset to start of partition - - def __enter__(self): - return self - - def __exit__(self, e_t, e_v, e_tr): - if e_t is None: - self.close() diff --git a/internal_filesystem/lib/ota/rollback.py b/internal_filesystem/lib/ota/rollback.py deleted file mode 100644 index fc8667d9..00000000 --- a/internal_filesystem/lib/ota/rollback.py +++ /dev/null @@ -1,27 +0,0 @@ -from esp32 import Partition - - -# Mark this boot as successful: prevent rollback to last image on next reboot. -# Raises OSError(-261) if bootloader is not OTA capable. -def cancel() -> None: - try: - Partition.mark_app_valid_cancel_rollback() - except OSError as e: - if e.args[0] == -261: - print(f"{__name__}.cancel(): The bootloader does not support OTA rollback.") - else: - raise e - - -# Force a rollback on the next reboot to the previously booted ota partition -def force() -> None: - from .status import force_rollback - - force_rollback() - - -# Undo a previous force rollback: ie. boot off the current partition on next reboot -def cancel_force() -> None: - from .status import current_ota - - current_ota.set_boot() diff --git a/internal_filesystem/lib/ota/status.py b/internal_filesystem/lib/ota/status.py deleted file mode 100644 index 3c204db9..00000000 --- a/internal_filesystem/lib/ota/status.py +++ /dev/null @@ -1,164 +0,0 @@ -# esp32_ota module for MicroPython on ESP32 -# MIT license; Copyright (c) 2023 Glenn Moloney @glenn20 - -# Based on OTA class by Thorsten von Eicken (@tve): -# https://github.com/tve/mqboard/blob/master/mqrepl/mqrepl.py - - -import binascii -import struct -import sys -import time - -import machine -from esp32 import Partition -from flashbdev import bdev -from micropython import const - -OTA_UNSUPPORTED = const(-261) -ESP_ERR_OTA_VALIDATE_FAILED = const(-5379) -OTA_MIN: int = const(16) # type: ignore -OTA_MAX: int = const(32) # type: ignore - -OTA_SIZE = 0x20 # The size of an OTA record in bytes (32 bytes) -OTA_BLOCKS = (0, 1) # The offsets of the OTA records in the otadata partition -OTA_FMT = b" Partition: # Partition we will boot from on next boot - if next_ota: # Avoid IDF debug messages by checking for otadata partition - try: - return Partition(Partition.BOOT) - except OSError: # OTA support is not available, return current partition - pass - return Partition(Partition.RUNNING) - - -# Return True if the device is configured for OTA updates -def ready() -> bool: - return next_ota is not None - - -def partition_table() -> list[tuple[int, int, int, int, str, bool]]: - partitions = [p.info() for p in Partition.find(Partition.TYPE_APP)] - partitions.extend([p.info() for p in Partition.find(Partition.TYPE_DATA)]) - partitions.sort(key=lambda i: i[2]) # Sort by address - return partitions - - -def partition_table_print() -> None: - ptype = {Partition.TYPE_APP: "app", Partition.TYPE_DATA: "data"} - subtype = [ - {0: "factory"} | {i: f"ota_{i-OTA_MIN}" for i in range(OTA_MIN, OTA_MAX)}, - {0: "ota", 1: "phy", 2: "nvs", 129: "fat"}, # DATA subtypes - ] - print("Partition table:") - print("# Name Type SubType Offset Size (bytes)") - for p in partition_table(): - print( - f" {p[4]:10s} {ptype[p[0]]:8s} {subtype[p[0]][p[1]]:8} " - + f"{p[2]:#10x} {p[3]:#10x} {p[3]:10,}" - ) - - -# Return a list of OTA partitions sorted by partition subtype number -def ota_partitions() -> list[Partition]: - partitions: list[Partition] = [ - p - for p in Partition.find(Partition.TYPE_APP) - if OTA_MIN <= p.info()[1] < OTA_MAX - ] - # Sort by the OTA partition subtype: ota_0 (16), ota_1 (17), ota_2 (18), ... - partitions.sort(key=lambda p: p.info()[1]) - return partitions - - -# Print the status of the otadata partition -def otadata_check() -> None: - if not otadata_part: - return - valid_seq = 1 - for i in (0, 1): - otadata_part.readblocks(i, (b := bytearray(OTA_SIZE))) - seq, _, state_num, crc = struct.unpack(OTA_FMT, b) - state = otastate[state_num] - is_valid = ( - state == "VALID" - and binascii.crc32(struct.pack(b" valid_seq: - valid_seq = seq - print(f"OTA record: state={state}, seq={seq}, crc={crc}, valid={is_valid}") - print( - f"OTA record is {state}." - + (" Will be updated on next boot." if state == "VALID" else "") - ) - p = ota_partitions() - print(f"Next boot is '{p[(valid_seq - 1) % len(p)].info()[4]}'.") - - -# Print a detailed summary of the OTA status of the device -def status() -> None: - upyversion, pname = sys.version.split(" ")[2], current_ota.info()[4] - print(f"Micropython {upyversion} has booted from partition '{pname}'.") - print(f"Will boot from partition '{boot_ota().info()[4]}' on next reboot.") - if not ota_partitions(): - print("There are no OTA partitions available.") - elif not next_ota: - print("No spare OTA partition is available for update.") - else: - print(f"The next OTA partition for update is '{next_ota.info()[4]}'.") - print(f"The / filesystem is mounted from partition '{bdev.info()[4]}'.") - partition_table_print() - otadata_check() - - -# The functions below are used by `ota.rollback` and are here to make -# `ota.rollback` as lightweight as possible for the common use case: -# calling `ota.rollback.cancel()` on every boot. - - -# Reboot the device after the provided delay -def ota_reboot(delay=10) -> None: - for i in range(delay, 0, -1): - print(f"\rRebooting in {i:2} seconds (ctrl-C to cancel)", end="") - time.sleep(1) - print() - machine.reset() # Reboot into the new image - - -# Micropython does not support forcing an OTA rollback so we do it by hand: -# - find the previous ota partition, validate the image and set it bootable. -# Raises OSError(-5379) if validation of the boot image fails. -# Raises OSError(-261) if no OTA partitions are available. -def force_rollback(reboot=False) -> None: - partitions = ota_partitions() - for i, p in enumerate(partitions): - if p.info() == current_ota.info(): # Compare by partition offset - partitions[i - 1].set_boot() # Set the previous partition to be bootable - if reboot: - ota_reboot() - return - raise OSError(OTA_UNSUPPORTED) diff --git a/internal_filesystem/lib/ota/update.py b/internal_filesystem/lib/ota/update.py deleted file mode 100644 index fbd760ae..00000000 --- a/internal_filesystem/lib/ota/update.py +++ /dev/null @@ -1,152 +0,0 @@ -# esp32_ota module for MicroPython on ESP32 -# MIT license; Copyright (c) 2023 Glenn Moloney @glenn20 - -# Inspired by OTA class by Thorsten von Eicken (@tve): -# https://github.com/tve/mqboard/blob/master/mqrepl/mqrepl.py - -import gc -import io - -from esp32 import Partition - -from .blockdev_writer import BlockDevWriter -from .status import ota_reboot - - -# Micropython sockets don't have context manager methods. This wrapper provides -# those. -class SocketWrapper: - def __init__(self, f: io.BufferedReader): - self.f = f - - def __enter__(self) -> io.BufferedReader: - return self.f - - def __exit__(self, e_t, e_v, e_tr): - self.f.close() - - -# Open a file or a URL and return a File-like object for reading -def open_url(url_or_filename: str, **kw) -> io.BufferedReader: - if url_or_filename.split(":", 1)[0] in ("http", "https"): - import requests - - r = requests.get(url_or_filename, **kw) - code: int = r.status_code - if code != 200: - r.close() - raise ValueError(f"HTTP Error: {code}") - return SocketWrapper(r.raw) # type: ignore - else: - return open(url_or_filename, "rb") - - -# OTA manages a MicroPython firmware update over-the-air. It checks that there -# are at least two "ota" "app" partitions in the partition table and writes new -# firmware into the partition that is not currently running. When the update is -# complete, it sets the new partition as the next one to boot. Set reboot=True -# to force a reset/restart, or call machine.reset() explicitly. Remember to call -# ota.rollback.cancel() after a successful reboot to the new image. -class OTA: - def __init__(self, verify=True, verbose=True, reboot=False, sha="", length=0): - self.reboot = reboot - self.verbose = verbose - # Get the next free OTA partition - # Raise OSError(ENOENT) if no OTA partition available - self.part = Partition(Partition.RUNNING).get_next_update() - if verbose: - name: str = self.part.info()[4] - print(f"Writing new micropython image to OTA partition '{name}'...") - self.writer = BlockDevWriter(self.part, verify, verbose) - if sha or length: - self.writer.set_sha_length(sha, length) - - # Append the data to the OTA partition - def write(self, data: bytearray | bytes | memoryview) -> int: - return self.writer.write(data) - - # Flush any buffered data to the ota partition and set it as the boot - # partition. If verify is True, will read back the written firmware data to - # check the sha256 of the written data. If reboot is True, will reboot the - # device after 10 seconds. - def close(self) -> None: - if self.writer is None: - return - self.writer.close() - # Set as boot partition for next reboot - name: str = self.part.info()[4] - print(f"OTA Partition '{name}' updated successfully.") - self.part.set_boot() # Raise OSError(-5379) if image on part is not valid - bootname = Partition(Partition.BOOT).info()[4] - if name != bootname: - print(f"Warning: failed to set {name} as the next boot partition.") - print(f"Micropython will boot from '{bootname}' partition on next boot.") - print("Remember to call ota.rollback.cancel() after successful reboot.") - if self.reboot: - ota_reboot() - - def __enter__(self): - return self - - def __exit__(self, e_t, e_v, e_tr): - if e_t is None: # If exception is thrown, don't flush data or set bootable - self.close() - - # Load a firmware file from the provided io stream - # - f: an io stream (supporting the f.readinto() method) - # - sha: (optional) the sha256sum of the firmware file - # - length: (optional) the length (in bytes) of the firmware file - def from_stream(self, f: io.BufferedReader, sha: str = "", length: int = 0) -> int: - if sha or length: - self.writer.set_sha_length(sha, length) - gc.collect() - return self.writer.write_from_stream(f) - - # Write new firmware to the OTA partition from the given url - # - url: a filename or a http[s] url for the micropython.bin firmware. - # - sha: the sha256sum of the firmware file - # - length: the length (in bytes) of the firmware file - def from_firmware_file(self, url: str, sha: str = "", length: int = 0, **kw) -> int: - if self.verbose: - print(f"Opening firmware file {url}...") - with open_url(url, **kw) as f: - return self.from_stream(f, sha, length) - - # Load a firmware file, the location of which is read from a json file - # containing the url for the firmware file, the sha and length of the file. - # - url: the name of a file or url containing the json. - # - kw: extra keywords arguments that will be passed to `requests.get()` - def from_json(self, url: str, **kw) -> int: - if not url.endswith(".json"): - raise ValueError("Url does not end with '.json'") - if self.verbose: - print(f"Opening json file {url}...") - with open_url(url, **kw) as f: - from json import load - - data: dict = load(f) - try: - firmware: str = data["firmware"] - sha: str = data["sha"] - length: int = data["length"] - if not any(firmware.startswith(s) for s in ("https:", "http:", "/")): - # If firmware filename is relative, append to base of url of json file - baseurl, *_ = url.rsplit("/", 1) - firmware = f"{baseurl}/{firmware}" - return self.from_firmware_file(firmware, sha, length, **kw) - except KeyError as err: - print('OTA json must include "firmware", "sha" and "length" keys.') - raise err - - -# Convenience functions which use the OTA class to perform OTA updates. -def from_file( - url: str, sha="", length=0, verify=True, verbose=True, reboot=True, **kw -) -> None: - with OTA(verify, verbose, reboot) as ota_update: - ota_update.from_firmware_file(url, sha, length, **kw) - - -def from_json(url: str, verify=True, verbose=True, reboot=True, **kw) -> None: - with OTA(verify, verbose, reboot) as ota_update: - ota_update.from_json(url, **kw)