You've already forked MicroPythonOS
mirror of
https://github.com/m5stack/MicroPythonOS.git
synced 2026-05-20 11:51:27 -07:00
Remove ota library
It was hardly used.
This commit is contained in:
@@ -1,7 +1,6 @@
|
||||
This /lib folder contains:
|
||||
- https://github.com/echo-lalia/qmi8658-micropython/blob/main/qmi8685.py but given the correct name "qmi8658.py"
|
||||
- traceback.mpy from https://github.com/micropython/micropython-lib
|
||||
- https://github.com/glenn20/micropython-esp32-ota/ installed with import mip; mip.install('github:glenn20/micropython-esp32-ota/mip/ota')
|
||||
- mip.install('github:jonnor/micropython-zipfile')
|
||||
- mip.install("shutil") for shutil.rmtree('/apps/com.example.files') # for rmtree()
|
||||
- mip.install("aiohttp") # easy websockets
|
||||
|
||||
@@ -108,8 +108,8 @@ mpos.TaskManager.create_task(asyncio_repl()) # only gets started when mpos.TaskM
|
||||
|
||||
async def ota_rollback_cancel():
|
||||
try:
|
||||
import ota.rollback
|
||||
ota.rollback.cancel()
|
||||
from esp32 import Partition
|
||||
Partition.mark_app_valid_cancel_rollback()
|
||||
except Exception as e:
|
||||
print("main.py: warning: could not mark this update as valid:", e)
|
||||
|
||||
|
||||
@@ -1,163 +0,0 @@
|
||||
# partition_writer module for MicroPython on ESP32
|
||||
# MIT license; Copyright (c) 2023 Glenn Moloney @glenn20
|
||||
|
||||
# Based on OTA class by Thorsten von Eicken (@tve):
|
||||
# https://github.com/tve/mqboard/blob/master/mqrepl/mqrepl.py
|
||||
|
||||
import hashlib
|
||||
import io
|
||||
|
||||
from micropython import const
|
||||
|
||||
IOCTL_BLOCK_COUNT: int = const(4) # type: ignore
|
||||
IOCTL_BLOCK_SIZE: int = const(5) # type: ignore
|
||||
IOCTL_BLOCK_ERASE: int = const(6) # type: ignore
|
||||
|
||||
|
||||
# An IOBase compatible class to wrap access to an os.AbstractBlockdev() device
|
||||
# such as a partition on the device flash. Writes must be aligned to block
|
||||
# boundaries.
|
||||
# https://docs.micropython.org/en/latest/library/os.html#block-device-interface
|
||||
# Extend IOBase so we can wrap this with io.BufferedWriter in BlockdevWriter
|
||||
class Blockdev(io.IOBase):
|
||||
def __init__(self, device):
|
||||
self.device = device
|
||||
self.blocksize = int(device.ioctl(IOCTL_BLOCK_SIZE, None))
|
||||
self.blockcount = int(device.ioctl(IOCTL_BLOCK_COUNT, None))
|
||||
self.pos = 0 # Current position (bytes from beginning) of device
|
||||
self.end = 0 # Current end of the data written to the device
|
||||
|
||||
# Data must be a multiple of blocksize unless it is the last write to the
|
||||
# device. The next write after a partial block will raise ValueError.
|
||||
def write(self, data: bytes | bytearray | memoryview) -> int:
|
||||
block, remainder = divmod(self.pos, self.blocksize)
|
||||
if remainder:
|
||||
raise ValueError(f"Block {block} write not aligned at block boundary.")
|
||||
data_len = len(data)
|
||||
nblocks, remainder = divmod(data_len, self.blocksize)
|
||||
mv = memoryview(data)
|
||||
if nblocks: # Write whole blocks
|
||||
self.device.writeblocks(block, mv[: nblocks * self.blocksize])
|
||||
block += nblocks
|
||||
if remainder: # Write left over data as a partial block
|
||||
self.device.ioctl(IOCTL_BLOCK_ERASE, block) # Erase block first
|
||||
self.device.writeblocks(block, mv[-remainder:], 0)
|
||||
self.pos += data_len
|
||||
self.end = self.pos # The "end" of the data written to the device
|
||||
return data_len
|
||||
|
||||
# Read data from the block device.
|
||||
def readinto(self, data: bytearray | memoryview):
|
||||
size = min(len(data), self.end - self.pos)
|
||||
block, remainder = divmod(self.pos, self.blocksize)
|
||||
self.device.readblocks(block, memoryview(data)[:size], remainder)
|
||||
self.pos += size
|
||||
return size
|
||||
|
||||
# Set the current file position for reading or writing
|
||||
def seek(self, offset: int, whence: int = 0):
|
||||
start = [0, self.pos, self.end]
|
||||
self.pos = start[whence] + offset
|
||||
|
||||
|
||||
# Calculate the SHA256 sum of a file (has a readinto() method)
|
||||
def sha_file(f, buffersize=4096) -> str:
|
||||
mv = memoryview(bytearray(buffersize))
|
||||
read_sha = hashlib.sha256()
|
||||
while (n := f.readinto(mv)) > 0:
|
||||
read_sha.update(mv[:n])
|
||||
return read_sha.digest().hex()
|
||||
|
||||
|
||||
# BlockdevWriter provides a convenient interface to writing images to any block
|
||||
# device which implements the micropython os.AbstractBlockDev interface (eg.
|
||||
# Partition on flash storage on ESP32).
|
||||
# https://docs.micropython.org/en/latest/library/os.html#block-device-interface
|
||||
# https://docs.micropython.org/en/latest/library/esp32.html#flash-partitions
|
||||
class BlockDevWriter:
|
||||
def __init__(
|
||||
self,
|
||||
device, # Block device to recieve the data (eg. esp32.Partition)
|
||||
verify: bool = True, # Should we read back and verify data after writing
|
||||
verbose: bool = True,
|
||||
):
|
||||
self.device = Blockdev(device)
|
||||
self.writer = io.BufferedWriter(
|
||||
self.device, self.device.blocksize # type: ignore
|
||||
)
|
||||
self._sha = hashlib.sha256()
|
||||
self.verify = verify
|
||||
self.verbose = verbose
|
||||
self.sha: str = ""
|
||||
self.length: int = 0
|
||||
blocksize, blockcount = self.device.blocksize, self.device.blockcount
|
||||
if self.verbose:
|
||||
print(f"Device capacity: {blockcount} x {blocksize} byte blocks.")
|
||||
|
||||
def set_sha_length(self, sha: str, length: int):
|
||||
self.sha = sha
|
||||
self.length = length
|
||||
blocksize, blockcount = self.device.blocksize, self.device.blockcount
|
||||
if length > blocksize * blockcount:
|
||||
raise ValueError(f"length ({length} bytes) is > size of partition.")
|
||||
if self.verbose and length:
|
||||
blocks, remainder = divmod(length, blocksize)
|
||||
print(f"Writing {blocks} blocks + {remainder} bytes.")
|
||||
|
||||
def print_progress(self):
|
||||
if self.verbose:
|
||||
block, remainder = divmod(self.device.pos, self.device.blocksize)
|
||||
print(f"\rBLOCK {block}", end="")
|
||||
if remainder:
|
||||
print(f" + {remainder} bytes")
|
||||
|
||||
# Append data to the block device
|
||||
def write(self, data: bytearray | bytes | memoryview) -> int:
|
||||
self._sha.update(data)
|
||||
n = self.writer.write(data)
|
||||
self.print_progress()
|
||||
return n
|
||||
|
||||
# Append data from f (a stream object) to the block device
|
||||
def write_from_stream(self, f: io.BufferedReader) -> int:
|
||||
mv = memoryview(bytearray(self.device.blocksize))
|
||||
tot = 0
|
||||
while (n := f.readinto(mv)) != 0:
|
||||
tot += self.write(mv[:n])
|
||||
return tot
|
||||
|
||||
# Flush remaining data to the block device and confirm all checksums
|
||||
# Raises:
|
||||
# ValueError("SHA mismatch...") if SHA of received data != expected sha
|
||||
# ValueError("SHA verify fail...") if verified SHA != written sha
|
||||
def close(self) -> None:
|
||||
self.writer.flush()
|
||||
self.print_progress()
|
||||
# Check the checksums (SHA256)
|
||||
nbytes: int = self.device.end
|
||||
if self.length and self.length != nbytes:
|
||||
raise ValueError(f"Received {nbytes} bytes (expect {self.length}).")
|
||||
write_sha = self._sha.digest().hex()
|
||||
if not self.sha:
|
||||
self.sha = write_sha
|
||||
if self.sha != write_sha:
|
||||
raise ValueError(f"SHA mismatch recv={write_sha} expect={self.sha}.")
|
||||
if self.verify:
|
||||
if self.verbose:
|
||||
print("Verifying SHA of the written data...", end="")
|
||||
self.device.seek(0) # Reset to start of partition
|
||||
read_sha = sha_file(self.device, self.device.blocksize)
|
||||
if read_sha != write_sha:
|
||||
raise ValueError(f"SHA verify failed write={write_sha} read={read_sha}")
|
||||
if self.verbose:
|
||||
print("Passed.")
|
||||
if self.verbose or not self.sha:
|
||||
print(f"SHA256={self.sha}")
|
||||
self.device.seek(0) # Reset to start of partition
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, e_t, e_v, e_tr):
|
||||
if e_t is None:
|
||||
self.close()
|
||||
@@ -1,27 +0,0 @@
|
||||
from esp32 import Partition
|
||||
|
||||
|
||||
# Mark this boot as successful: prevent rollback to last image on next reboot.
|
||||
# Raises OSError(-261) if bootloader is not OTA capable.
|
||||
def cancel() -> None:
|
||||
try:
|
||||
Partition.mark_app_valid_cancel_rollback()
|
||||
except OSError as e:
|
||||
if e.args[0] == -261:
|
||||
print(f"{__name__}.cancel(): The bootloader does not support OTA rollback.")
|
||||
else:
|
||||
raise e
|
||||
|
||||
|
||||
# Force a rollback on the next reboot to the previously booted ota partition
|
||||
def force() -> None:
|
||||
from .status import force_rollback
|
||||
|
||||
force_rollback()
|
||||
|
||||
|
||||
# Undo a previous force rollback: ie. boot off the current partition on next reboot
|
||||
def cancel_force() -> None:
|
||||
from .status import current_ota
|
||||
|
||||
current_ota.set_boot()
|
||||
@@ -1,164 +0,0 @@
|
||||
# esp32_ota module for MicroPython on ESP32
|
||||
# MIT license; Copyright (c) 2023 Glenn Moloney @glenn20
|
||||
|
||||
# Based on OTA class by Thorsten von Eicken (@tve):
|
||||
# https://github.com/tve/mqboard/blob/master/mqrepl/mqrepl.py
|
||||
|
||||
|
||||
import binascii
|
||||
import struct
|
||||
import sys
|
||||
import time
|
||||
|
||||
import machine
|
||||
from esp32 import Partition
|
||||
from flashbdev import bdev
|
||||
from micropython import const
|
||||
|
||||
OTA_UNSUPPORTED = const(-261)
|
||||
ESP_ERR_OTA_VALIDATE_FAILED = const(-5379)
|
||||
OTA_MIN: int = const(16) # type: ignore
|
||||
OTA_MAX: int = const(32) # type: ignore
|
||||
|
||||
OTA_SIZE = 0x20 # The size of an OTA record in bytes (32 bytes)
|
||||
OTA_BLOCKS = (0, 1) # The offsets of the OTA records in the otadata partition
|
||||
OTA_FMT = b"<L20sLL" # The format for reading/writing binary OTA records
|
||||
OTA_LABEL = b"\xff" * OTA_SIZE # The expected label field in the OTA record
|
||||
OTA_CRC_INIT = 0xFFFFFFFF # The initial value for the CRC32 checksum
|
||||
OTADATA_TYPE = (1, 0) # The type and subtype of the otadata partition
|
||||
|
||||
otastate = {
|
||||
0: "NEW",
|
||||
1: "PENDING",
|
||||
2: "VALID",
|
||||
3: "INVALID",
|
||||
4: "ABORTED",
|
||||
0xFFFFFFFF: "UNDEFINED",
|
||||
}
|
||||
|
||||
otadata_part = p[0] if (p := Partition.find(*OTADATA_TYPE)) else None
|
||||
current_ota = Partition(Partition.RUNNING) # Partition we booted from
|
||||
next_ota = None # Partition for the next OTA update (if device is OTA enabled)
|
||||
try:
|
||||
if otadata_part: # Avoid IDF error messages by checking for otadata partition
|
||||
next_ota = current_ota.get_next_update()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
# Return the partition we will boot from on next boot
|
||||
def boot_ota() -> Partition: # Partition we will boot from on next boot
|
||||
if next_ota: # Avoid IDF debug messages by checking for otadata partition
|
||||
try:
|
||||
return Partition(Partition.BOOT)
|
||||
except OSError: # OTA support is not available, return current partition
|
||||
pass
|
||||
return Partition(Partition.RUNNING)
|
||||
|
||||
|
||||
# Return True if the device is configured for OTA updates
|
||||
def ready() -> bool:
|
||||
return next_ota is not None
|
||||
|
||||
|
||||
def partition_table() -> list[tuple[int, int, int, int, str, bool]]:
|
||||
partitions = [p.info() for p in Partition.find(Partition.TYPE_APP)]
|
||||
partitions.extend([p.info() for p in Partition.find(Partition.TYPE_DATA)])
|
||||
partitions.sort(key=lambda i: i[2]) # Sort by address
|
||||
return partitions
|
||||
|
||||
|
||||
def partition_table_print() -> None:
|
||||
ptype = {Partition.TYPE_APP: "app", Partition.TYPE_DATA: "data"}
|
||||
subtype = [
|
||||
{0: "factory"} | {i: f"ota_{i-OTA_MIN}" for i in range(OTA_MIN, OTA_MAX)},
|
||||
{0: "ota", 1: "phy", 2: "nvs", 129: "fat"}, # DATA subtypes
|
||||
]
|
||||
print("Partition table:")
|
||||
print("# Name Type SubType Offset Size (bytes)")
|
||||
for p in partition_table():
|
||||
print(
|
||||
f" {p[4]:10s} {ptype[p[0]]:8s} {subtype[p[0]][p[1]]:8} "
|
||||
+ f"{p[2]:#10x} {p[3]:#10x} {p[3]:10,}"
|
||||
)
|
||||
|
||||
|
||||
# Return a list of OTA partitions sorted by partition subtype number
|
||||
def ota_partitions() -> list[Partition]:
|
||||
partitions: list[Partition] = [
|
||||
p
|
||||
for p in Partition.find(Partition.TYPE_APP)
|
||||
if OTA_MIN <= p.info()[1] < OTA_MAX
|
||||
]
|
||||
# Sort by the OTA partition subtype: ota_0 (16), ota_1 (17), ota_2 (18), ...
|
||||
partitions.sort(key=lambda p: p.info()[1])
|
||||
return partitions
|
||||
|
||||
|
||||
# Print the status of the otadata partition
|
||||
def otadata_check() -> None:
|
||||
if not otadata_part:
|
||||
return
|
||||
valid_seq = 1
|
||||
for i in (0, 1):
|
||||
otadata_part.readblocks(i, (b := bytearray(OTA_SIZE)))
|
||||
seq, _, state_num, crc = struct.unpack(OTA_FMT, b)
|
||||
state = otastate[state_num]
|
||||
is_valid = (
|
||||
state == "VALID"
|
||||
and binascii.crc32(struct.pack(b"<L", seq), OTA_CRC_INIT) == crc
|
||||
)
|
||||
if is_valid and seq > valid_seq:
|
||||
valid_seq = seq
|
||||
print(f"OTA record: state={state}, seq={seq}, crc={crc}, valid={is_valid}")
|
||||
print(
|
||||
f"OTA record is {state}."
|
||||
+ (" Will be updated on next boot." if state == "VALID" else "")
|
||||
)
|
||||
p = ota_partitions()
|
||||
print(f"Next boot is '{p[(valid_seq - 1) % len(p)].info()[4]}'.")
|
||||
|
||||
|
||||
# Print a detailed summary of the OTA status of the device
|
||||
def status() -> None:
|
||||
upyversion, pname = sys.version.split(" ")[2], current_ota.info()[4]
|
||||
print(f"Micropython {upyversion} has booted from partition '{pname}'.")
|
||||
print(f"Will boot from partition '{boot_ota().info()[4]}' on next reboot.")
|
||||
if not ota_partitions():
|
||||
print("There are no OTA partitions available.")
|
||||
elif not next_ota:
|
||||
print("No spare OTA partition is available for update.")
|
||||
else:
|
||||
print(f"The next OTA partition for update is '{next_ota.info()[4]}'.")
|
||||
print(f"The / filesystem is mounted from partition '{bdev.info()[4]}'.")
|
||||
partition_table_print()
|
||||
otadata_check()
|
||||
|
||||
|
||||
# The functions below are used by `ota.rollback` and are here to make
|
||||
# `ota.rollback` as lightweight as possible for the common use case:
|
||||
# calling `ota.rollback.cancel()` on every boot.
|
||||
|
||||
|
||||
# Reboot the device after the provided delay
|
||||
def ota_reboot(delay=10) -> None:
|
||||
for i in range(delay, 0, -1):
|
||||
print(f"\rRebooting in {i:2} seconds (ctrl-C to cancel)", end="")
|
||||
time.sleep(1)
|
||||
print()
|
||||
machine.reset() # Reboot into the new image
|
||||
|
||||
|
||||
# Micropython does not support forcing an OTA rollback so we do it by hand:
|
||||
# - find the previous ota partition, validate the image and set it bootable.
|
||||
# Raises OSError(-5379) if validation of the boot image fails.
|
||||
# Raises OSError(-261) if no OTA partitions are available.
|
||||
def force_rollback(reboot=False) -> None:
|
||||
partitions = ota_partitions()
|
||||
for i, p in enumerate(partitions):
|
||||
if p.info() == current_ota.info(): # Compare by partition offset
|
||||
partitions[i - 1].set_boot() # Set the previous partition to be bootable
|
||||
if reboot:
|
||||
ota_reboot()
|
||||
return
|
||||
raise OSError(OTA_UNSUPPORTED)
|
||||
@@ -1,152 +0,0 @@
|
||||
# esp32_ota module for MicroPython on ESP32
|
||||
# MIT license; Copyright (c) 2023 Glenn Moloney @glenn20
|
||||
|
||||
# Inspired by OTA class by Thorsten von Eicken (@tve):
|
||||
# https://github.com/tve/mqboard/blob/master/mqrepl/mqrepl.py
|
||||
|
||||
import gc
|
||||
import io
|
||||
|
||||
from esp32 import Partition
|
||||
|
||||
from .blockdev_writer import BlockDevWriter
|
||||
from .status import ota_reboot
|
||||
|
||||
|
||||
# Micropython sockets don't have context manager methods. This wrapper provides
|
||||
# those.
|
||||
class SocketWrapper:
|
||||
def __init__(self, f: io.BufferedReader):
|
||||
self.f = f
|
||||
|
||||
def __enter__(self) -> io.BufferedReader:
|
||||
return self.f
|
||||
|
||||
def __exit__(self, e_t, e_v, e_tr):
|
||||
self.f.close()
|
||||
|
||||
|
||||
# Open a file or a URL and return a File-like object for reading
|
||||
def open_url(url_or_filename: str, **kw) -> io.BufferedReader:
|
||||
if url_or_filename.split(":", 1)[0] in ("http", "https"):
|
||||
import requests
|
||||
|
||||
r = requests.get(url_or_filename, **kw)
|
||||
code: int = r.status_code
|
||||
if code != 200:
|
||||
r.close()
|
||||
raise ValueError(f"HTTP Error: {code}")
|
||||
return SocketWrapper(r.raw) # type: ignore
|
||||
else:
|
||||
return open(url_or_filename, "rb")
|
||||
|
||||
|
||||
# OTA manages a MicroPython firmware update over-the-air. It checks that there
|
||||
# are at least two "ota" "app" partitions in the partition table and writes new
|
||||
# firmware into the partition that is not currently running. When the update is
|
||||
# complete, it sets the new partition as the next one to boot. Set reboot=True
|
||||
# to force a reset/restart, or call machine.reset() explicitly. Remember to call
|
||||
# ota.rollback.cancel() after a successful reboot to the new image.
|
||||
class OTA:
|
||||
def __init__(self, verify=True, verbose=True, reboot=False, sha="", length=0):
|
||||
self.reboot = reboot
|
||||
self.verbose = verbose
|
||||
# Get the next free OTA partition
|
||||
# Raise OSError(ENOENT) if no OTA partition available
|
||||
self.part = Partition(Partition.RUNNING).get_next_update()
|
||||
if verbose:
|
||||
name: str = self.part.info()[4]
|
||||
print(f"Writing new micropython image to OTA partition '{name}'...")
|
||||
self.writer = BlockDevWriter(self.part, verify, verbose)
|
||||
if sha or length:
|
||||
self.writer.set_sha_length(sha, length)
|
||||
|
||||
# Append the data to the OTA partition
|
||||
def write(self, data: bytearray | bytes | memoryview) -> int:
|
||||
return self.writer.write(data)
|
||||
|
||||
# Flush any buffered data to the ota partition and set it as the boot
|
||||
# partition. If verify is True, will read back the written firmware data to
|
||||
# check the sha256 of the written data. If reboot is True, will reboot the
|
||||
# device after 10 seconds.
|
||||
def close(self) -> None:
|
||||
if self.writer is None:
|
||||
return
|
||||
self.writer.close()
|
||||
# Set as boot partition for next reboot
|
||||
name: str = self.part.info()[4]
|
||||
print(f"OTA Partition '{name}' updated successfully.")
|
||||
self.part.set_boot() # Raise OSError(-5379) if image on part is not valid
|
||||
bootname = Partition(Partition.BOOT).info()[4]
|
||||
if name != bootname:
|
||||
print(f"Warning: failed to set {name} as the next boot partition.")
|
||||
print(f"Micropython will boot from '{bootname}' partition on next boot.")
|
||||
print("Remember to call ota.rollback.cancel() after successful reboot.")
|
||||
if self.reboot:
|
||||
ota_reboot()
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, e_t, e_v, e_tr):
|
||||
if e_t is None: # If exception is thrown, don't flush data or set bootable
|
||||
self.close()
|
||||
|
||||
# Load a firmware file from the provided io stream
|
||||
# - f: an io stream (supporting the f.readinto() method)
|
||||
# - sha: (optional) the sha256sum of the firmware file
|
||||
# - length: (optional) the length (in bytes) of the firmware file
|
||||
def from_stream(self, f: io.BufferedReader, sha: str = "", length: int = 0) -> int:
|
||||
if sha or length:
|
||||
self.writer.set_sha_length(sha, length)
|
||||
gc.collect()
|
||||
return self.writer.write_from_stream(f)
|
||||
|
||||
# Write new firmware to the OTA partition from the given url
|
||||
# - url: a filename or a http[s] url for the micropython.bin firmware.
|
||||
# - sha: the sha256sum of the firmware file
|
||||
# - length: the length (in bytes) of the firmware file
|
||||
def from_firmware_file(self, url: str, sha: str = "", length: int = 0, **kw) -> int:
|
||||
if self.verbose:
|
||||
print(f"Opening firmware file {url}...")
|
||||
with open_url(url, **kw) as f:
|
||||
return self.from_stream(f, sha, length)
|
||||
|
||||
# Load a firmware file, the location of which is read from a json file
|
||||
# containing the url for the firmware file, the sha and length of the file.
|
||||
# - url: the name of a file or url containing the json.
|
||||
# - kw: extra keywords arguments that will be passed to `requests.get()`
|
||||
def from_json(self, url: str, **kw) -> int:
|
||||
if not url.endswith(".json"):
|
||||
raise ValueError("Url does not end with '.json'")
|
||||
if self.verbose:
|
||||
print(f"Opening json file {url}...")
|
||||
with open_url(url, **kw) as f:
|
||||
from json import load
|
||||
|
||||
data: dict = load(f)
|
||||
try:
|
||||
firmware: str = data["firmware"]
|
||||
sha: str = data["sha"]
|
||||
length: int = data["length"]
|
||||
if not any(firmware.startswith(s) for s in ("https:", "http:", "/")):
|
||||
# If firmware filename is relative, append to base of url of json file
|
||||
baseurl, *_ = url.rsplit("/", 1)
|
||||
firmware = f"{baseurl}/{firmware}"
|
||||
return self.from_firmware_file(firmware, sha, length, **kw)
|
||||
except KeyError as err:
|
||||
print('OTA json must include "firmware", "sha" and "length" keys.')
|
||||
raise err
|
||||
|
||||
|
||||
# Convenience functions which use the OTA class to perform OTA updates.
|
||||
def from_file(
|
||||
url: str, sha="", length=0, verify=True, verbose=True, reboot=True, **kw
|
||||
) -> None:
|
||||
with OTA(verify, verbose, reboot) as ota_update:
|
||||
ota_update.from_firmware_file(url, sha, length, **kw)
|
||||
|
||||
|
||||
def from_json(url: str, verify=True, verbose=True, reboot=True, **kw) -> None:
|
||||
with OTA(verify, verbose, reboot) as ota_update:
|
||||
ota_update.from_json(url, **kw)
|
||||
Reference in New Issue
Block a user