Add README.md and draft_code

This commit is contained in:
Thomas Farstrike
2025-04-30 09:22:06 +02:00
parent e4372a984c
commit cf5bbfa958
11 changed files with 1063 additions and 538 deletions
+54
View File
@@ -0,0 +1,54 @@
import binascii
import os
import sys
try:
input_file = sys.argv[1]
except IndexError:
print('No file or directory given using current working directory')
input_file = os.getcwd()
def run(in_file):
output_file = os.path.splitext(in_file)[0] + '.py'
ext = os.path.splitext(in_file)[1][1:]
with open(in_file, 'rb') as f:
data = f.read()
data = binascii.hexlify(data)
data = [' ' + str(data[i: min(i + 74, len(data))])[1:] for i in range(0, len(data), 74)]
data = '\n'.join(data)
output = f'''\
import binascii
_{ext} = bytearray(binascii.unhexlify(
{data}
))
{ext} = memoryview(_{ext})
'''
with open(output_file, 'w') as f:
f.write(output)
def process_directory(directory):
for root, _, files in os.walk(directory):
for file in files:
file_ext = os.path.splitext(file)[1][1:]
if file_ext not in ('bmp', 'jpg', 'gif', 'png', 'bin', 'MF'):
continue
thisfile = os.path.join(root, file)
print('found file:', thisfile)
run(thisfile)
if os.path.isdir(input_file):
process_directory(input_file)
else:
file_ext = os.path.splitext(input_file)[1][1:]
if file_ext not in ('bmp', 'jpg', 'gif', 'png', 'bin'):
raise RuntimeError('supported image files are bmp, jpg, gif and png')
print('found file:', input_file)
run(input_file)
print('DONE!!')
+2
View File
@@ -0,0 +1,2 @@
print("This script will be included in the build.")
print("You can then run it with: import include_in_build")
+114
View File
@@ -0,0 +1,114 @@
#import mip
#mip.install('github:miguelgrinberg/microdot/src/microdot/microdot.py')
# http://192.168.1.122/upload
# http://192.168.1.122/files//
from microdot import Microdot, Response
import os
# HTML template for the upload form
UPLOAD_FORM = '''
<!DOCTYPE html>
<html>
<head><title>File Manager</title></head>
<body>
<h1>File Manager</h1>
<p><a href="/files">View Files</a></p>
<h2>Upload File</h2>
<form method="POST" action="/upload" enctype="multipart/form-data">
<input type="file" name="file">
<input type="submit" value="Upload">
</form>
</body>
</html>
'''
app = Microdot()
@app.route('/')
async def index(request):
return '<a href="/files//">Files</a>'
@app.route('/files/<path:path>')
async def list_files(req, path=''):
try:
# Sanitize path to prevent directory traversal
path = path.strip('/')
if '..' in path:
return Response('Invalid path', status_code=400)
# Get directory contents
full_path = '/' + path
files = os.listdir(full_path) if path else os.listdir('/')
html = '<h1>File Manager</h1><ul>'
for f in files:
link_path = f'{path}/{f}' if path else f
html += f'<li><a href="/files/{link_path}">{f}</a> | <a href="/download/{link_path}">Download</a></li>'
html += '</ul>'
return Response(html, headers={'Content-Type': 'text/html'})
except OSError as e:
return Response(f'Error: {e}', status_code=500)
@app.route('/download/<path:path>')
async def download_file(req, path):
try:
full_path = '/' + path.strip('/')
with open(full_path, 'rb') as f:
content = f.read() # Read in chunks for large files
return Response(content, headers={
'Content-Type': 'application/octet-stream',
'Content-Disposition': f'attachment; filename="{path.split("/")[-1]}"'
})
except OSError as e:
return Response(f'Error: {e}', status_code=404)
@app.route('/upload', methods=['GET'])
async def upload_form(req):
return Response(UPLOAD_FORM, headers={'Content-Type': 'text/html'})
@app.route('/upload', methods=['POST'])
async def upload_file(req):
try:
# Check if form data contains a file
if 'file' not in req.form or not req.files['file']['filename']:
return Response('No file selected', status_code=400)
# Get file details
filename = req.files['file']['filename']
file_content = req.files['file']['body']
# Sanitize filename to prevent path traversal
filename = filename.split('/')[-1].split('\\')[-1]
if not filename:
return Response('Invalid filename', status_code=400)
# Save file to filesystem
file_path = f'/{filename}'
with open(file_path, 'wb') as f:
f.write(file_content) # Write in one go for small files
# Free memory
gc.collect()
# Redirect to file listing
return Response(status_code=302, headers={'Location': '/files'})
except OSError as e:
return Response(f'Error saving file: {e}', status_code=500)
except MemoryError:
return Response('File too large for available memory', status_code=507)
def startit():
app.run(port=80)
# http://192.168.1.115:5000
import _thread
_thread.start_new_thread(startit, ())
+142
View File
@@ -0,0 +1,142 @@
import struct
import time
from machine import I2C
# Sensor constants
_QMI8685_PARTID = const(0x05)
_REG_PARTID = const(0x00)
_REG_REVISION = const(0x01)
_REG_CTRL1 = const(0x02) # Serial interface and sensor enable
_REG_CTRL2 = const(0x03) # Accelerometer settings
_REG_CTRL3 = const(0x04) # Gyroscope settings
_REG_CTRL4 = const(0x05) # Magnetomer settings (support not implemented in this driver yet)
_REG_CTRL5 = const(0x06) # Sensor data processing settings
_REG_CTRL6 = const(0x07) # Attitude Engine ODR and Motion on Demand
_REG_CTRL7 = const(0x08) # Enable Sensors and Configure Data Reads
_REG_TEMP = const(0x33) # Temperature sensor.
_REG_AX_L = const(0x35) # Read accelerometer
_REG_AX_H = const(0x36)
_REG_AY_L = const(0x37)
_REG_AY_H = const(0x38)
_REG_AZ_L = const(0x39)
_REG_AZ_H = const(0x3A)
_REG_GX_L = const(0x3B) # read gyro
_REG_GX_H = const(0x3C)
_REG_GY_L = const(0x3D)
_REG_GY_H = const(0x3E)
_REG_GZ_L = const(0x3F)
_REG_GZ_H = const(0x40)
_QMI8658_I2CADDR_DEFAULT = const(0X6B)
_ACCELSCALE_RANGE_2G = const(0b00)
_ACCELSCALE_RANGE_4G = const(0b01)
_ACCELSCALE_RANGE_8G = const(0b10)
_ACCELSCALE_RANGE_16G = const(0b11)
_GYROSCALE_RANGE_16DPS = const(0b000)
_GYROSCALE_RANGE_32DPS = const(0b001)
_GYROSCALE_RANGE_64DPS = const(0b010)
_GYROSCALE_RANGE_128DPS = const(0b011)
_GYROSCALE_RANGE_256DPS = const(0b100)
_GYROSCALE_RANGE_512DPS = const(0b101)
_GYROSCALE_RANGE_1024DPS = const(0b110)
_GYROSCALE_RANGE_2048DPS = const(0b111)
_ODR_8000HZ = const(0b0000)
_ODR_4000HZ = const(0b0001)
_ODR_2000HZ = const(0b0010)
_ODR_1000HZ = const(0b0011)
_ODR_500HZ = const(0b0100)
_ODR_250HZ = const(0b0101)
_ODR_125HZ = const(0b0110)
_ODR_62_5HZ = const(0b0111)
class QMI8658:
global _QMI8658_I2CADDR_DEFAULT
def __init__(self,i2c_bus: I2C,address: int = _QMI8658_I2CADDR_DEFAULT,accel_scale: int = _ACCELSCALE_RANGE_8G,gyro_scale: int = _GYROSCALE_RANGE_256DPS):
self.i2c = i2c_bus
self.address = address
# Verify sensor part ID
if self._read_u8(_REG_PARTID) != _QMI8685_PARTID:
raise AttributeError("Cannot find a QMI8658")
# Setup initial configuration
self._configure_sensor(accel_scale, gyro_scale)
# Configure scales/divisors for the driver
self.acc_scale_divisor = {
_ACCELSCALE_RANGE_2G: 1 << 14,
_ACCELSCALE_RANGE_4G: 1 << 13,
_ACCELSCALE_RANGE_8G: 1 << 12,
_ACCELSCALE_RANGE_16G: 1 << 11,
}[accel_scale]
self.gyro_scale_divisor = {
_GYROSCALE_RANGE_16DPS: 2048,
_GYROSCALE_RANGE_32DPS: 1024,
_GYROSCALE_RANGE_64DPS: 512,
_GYROSCALE_RANGE_128DPS: 256,
_GYROSCALE_RANGE_256DPS: 128,
_GYROSCALE_RANGE_512DPS: 64,
_GYROSCALE_RANGE_1024DPS: 32,
_GYROSCALE_RANGE_2048DPS: 16,
}[gyro_scale]
def _configure_sensor(self, accel_scale: int, gyro_scale: int):
# Initialize accelerometer and gyroscope settings
self._write_u8(_REG_CTRL1, 0x60) # Set SPI auto increment and big endian (Ctrl 1)
self._write_u8(_REG_CTRL2, (accel_scale << 4) | _ODR_1000HZ) # Accel Config
self._write_u8(_REG_CTRL3, (gyro_scale << 4) | _ODR_1000HZ) # Gyro Config
self._write_u8(_REG_CTRL5, 0x01) # Low-pass filter enable
self._write_u8(_REG_CTRL7, 0x03) # Enable accel and gyro
time.sleep_ms(100)
# Helper functions for register operations
def _read_u8(self, reg:int) -> int:
return self.i2c.readfrom_mem(self.address, reg, 1)[0]
def _read_xyz(self, reg:int) -> tuple[int, int, int]:
data = self.i2c.readfrom_mem(self.address, reg, 6)
return struct.unpack('<hhh', data)
def _write_u8(self, reg: int, value: int):
self.i2c.writeto_mem(self.address, reg, bytes([value]))
@property
def temperature(self) -> float:
"""Get the device temperature."""
temp_raw = self._read_u8(_REG_TEMP)
return temp_raw / 256
@property
def acceleration(self) -> tuple[float, float, float]:
"""Get current acceleration reading."""
raw_accel = self._read_xyz(_REG_AX_L)
return tuple(val / self.acc_scale_divisor for val in raw_accel)
@property
def gyro(self) -> tuple[float, float, float]:
"""Get current gyroscope reading."""
raw_gyro = self._read_xyz(_REG_GX_L)
return tuple(val / self.gyro_scale_divisor for val in raw_gyro)
import machine
sensor = QMI8658(I2C(0, sda=machine.Pin(48), scl=machine.Pin(47)))
while True:
print(f"""
QMI8685
{sensor.temperature=}
{sensor.acceleration=}
{sensor.gyro=}
""")
time.sleep(1)
File diff suppressed because it is too large Load Diff
+133
View File
@@ -0,0 +1,133 @@
import lvgl as lv
import uasyncio as asyncio
import utime
import gc
# Create a subwindow for the child script (half the 320x240 display)
screen = lv.screen_active()
subwindow = lv.obj(screen)
subwindow.set_size(160, 240) # Half width, full height
subwindow.align(lv.ALIGN.LEFT_MID, 0, 0) # Left side
subwindow.set_style_bg_color(lv.color_hex(0xDDDDDD), lv.PART.MAIN)
# Create a label for parent updates
parent_label = lv.label(screen)
parent_label.set_text("Parent: 0")
parent_label.set_style_text_font(lv.font_montserrat_12, 0)
parent_label.align(lv.ALIGN.TOP_RIGHT, -10, 10)
# Create a parent button
parent_button = lv.button(screen)
parent_button.set_size(80, 40)
parent_button.align(lv.ALIGN.BOTTOM_RIGHT, -10, -50)
parent_button_label = lv.label(parent_button)
parent_button_label.set_text("Parent Btn")
parent_button_label.set_style_text_font(lv.font_montserrat_12, 0)
# Create a parent slider
parent_slider = lv.slider(screen)
parent_slider.set_size(100, 10)
parent_slider.set_range(0, 100)
parent_slider.align(lv.ALIGN.BOTTOM_RIGHT, -10, -10)
# Parent button callback
def parent_button_cb(e):
print("Parent button clicked")
parent_button.add_event_cb(parent_button_cb, lv.EVENT.CLICKED, None)
# Parent slider callback
def parent_slider_cb(e):
value = parent_slider.get_value()
print("Parent slider value:", value)
parent_slider.add_event_cb(parent_slider_cb, lv.EVENT.VALUE_CHANGED, None)
# Function to execute the child script as a coroutine
async def execute_script(script_source, lvgl_obj):
try:
script_globals = {
'lv': lv,
'subwindow': lvgl_obj,
'asyncio': asyncio,
'utime': utime
}
print("Child script: Compiling")
code = compile(script_source, "<string>", "exec")
exec(code, script_globals)
update_child = script_globals.get('update_child')
if update_child:
print("Child script: Starting update_child")
await update_child()
else:
print("Child script error: No update_child function defined")
except Exception as e:
print("Child script error:", e)
# Child script buffer: updates label, adds button and slider
script_buffer = """
import asyncio
async def update_child():
print("Child coroutine: Creating UI")
# Label
label = lv.label(subwindow)
label.set_text("Child: 0")
label.set_style_text_font(lv.font_montserrat_12, 0)
label.align(lv.ALIGN.TOP_MID, 0, 10)
# Button
button = lv.button(subwindow)
button.set_size(80, 40)
button.align(lv.ALIGN.BOTTOM_MID, 0, -50)
button_label = lv.label(button)
button_label.set_text("Child Btn")
button_label.set_style_text_font(lv.font_montserrat_12, 0)
# Slider
slider = lv.slider(subwindow)
slider.set_size(100, 10)
slider.set_range(0, 100)
slider.align(lv.ALIGN.BOTTOM_MID, 0, -10)
# Button callback
def button_cb(e):
print("Child button clicked")
button.add_event_cb(button_cb, lv.EVENT.CLICKED, None)
# Slider callback
def slider_cb(e):
value = slider.get_value()
print("Child slider value:", value)
slider.add_event_cb(slider_cb, lv.EVENT.VALUE_CHANGED, None)
# Update loop
count = 0
while True:
count += 1
print("Child coroutine: Updating label to", count)
label.set_text(f"Child: {count}")
await asyncio.sleep_ms(2000) # Update every 2s
"""
# Parent coroutine: updates parent label every 1 second
async def update_parent():
count = 0
while True:
count += 1
print("Parent coroutine: Updating label to", count)
parent_label.set_text(f"Parent: {count}")
gc.collect()
print("Parent coroutine: Free memory:", gc.mem_free())
await asyncio.sleep_ms(1000) # Update every 1s
# Main async function to run all tasks
async def main():
print("Main: Starting tasks")
asyncio.create_task(update_parent())
asyncio.create_task(execute_script(script_buffer, subwindow))
while True:
await asyncio.sleep_ms(100)
# Run the event loop
gc.collect()
print("Free memory before loop:", gc.mem_free())
try:
asyncio.run(main())
except Exception as e:
print("Main error:", e)
+77
View File
@@ -0,0 +1,77 @@
import _thread
# Function to execute the child script as a coroutine
def execute_script(script_source, lvgl_obj):
try:
script_globals = {
'lv': lv,
'subwindow': lvgl_obj,
}
print("Child script: Compiling")
code = compile(script_source, "<string>", "exec")
exec(code, script_globals)
app_main = script_globals.get('app_main')
if app_main:
print("Child script: Starting app_main")
app_main()
print("Script finished!")
else:
print("Child script error: No app_main function defined")
except Exception as e:
print("Child script error:", e)
# Child script buffer: updates label, adds button and slider
script_buffer = """
import time
def app_main():
print("Child coroutine: Creating UI")
# Label
label = lv.label(subwindow)
label.set_text("Child: 0")
label.set_style_text_font(lv.font_montserrat_12, 0)
label.align(lv.ALIGN.TOP_MID, 0, 10)
# Button
button = lv.button(subwindow)
button.set_size(80, 40)
button.align(lv.ALIGN.CENTER, 0, 0)
button_label = lv.label(button)
button_label.set_text("Quit")
button_label.set_style_text_font(lv.font_montserrat_12, 0)
# Slider
slider = lv.slider(subwindow)
slider.set_range(0, 100)
slider.align(lv.ALIGN.BOTTOM_MID, 0, -30)
# Quit flag
should_continue = True
# Button callback
def button_cb(e):
nonlocal should_continue
print("Quit button clicked, exiting child")
should_continue = False
button.add_event_cb(button_cb, lv.EVENT.CLICKED, None)
# Slider callback
def slider_cb(e):
value = slider.get_value()
print("Child slider value:", value)
slider.add_event_cb(slider_cb, lv.EVENT.VALUE_CHANGED, None)
# Update loop
count = 0
while should_continue:
count += 1
print("Child coroutine: Updating label to", count)
label.set_text(f"Child: {count}")
time.sleep_ms(1000)
print("Child coroutine: Exiting")
"""
# Start the event loop in a background thread
gc.collect()
print("Free memory before loop:", gc.mem_free())
try:
_thread.stack_size(8192)
_thread.start_new_thread(execute_script, (script_buffer, subwindow))
print("Event loop started in background thread")
except Exception as e:
print("Error starting event loop thread:", e)
+2
View File
@@ -0,0 +1,2 @@
This doesn't work because the build with ffmpeg in it fails,
because it needs that STDIO MEM allocator in lib/lv_conf.h and that has a compilation issue.
+3
View File
@@ -0,0 +1,3 @@
# https://sample-videos.com/
ffmpeg -i SampleVideo_640x360_1mb.mp4 -c:v mjpeg -q:v 7 -vf "fps=15,scale=320:180:flags=lanczos" -c:a pcm_u8 video_320x180.avi