You've already forked MicroPythonOS
mirror of
https://github.com/m5stack/MicroPythonOS.git
synced 2026-05-20 11:51:27 -07:00
@@ -6,7 +6,7 @@
|
||||
"icon_url": "https://apps.micropythonos.com/apps/com.micropythonos.scan_bluetooth/icons/com.micropythonos.scan_bluetooth_0.0.1_64x64.png",
|
||||
"download_url": "https://apps.micropythonos.com/apps/com.micropythonos.scan_bluetooth/mpks/com.micropythonos.scan_bluetooth_0.0.1.mpk",
|
||||
"fullname": "com.micropythonos.scan_bluetooth",
|
||||
"version": "0.0.1",
|
||||
"version": "0.1.0",
|
||||
"category": "development",
|
||||
"activities": [
|
||||
{
|
||||
|
||||
+105
-83
@@ -3,27 +3,32 @@ Initial author: https://github.com/jedie
|
||||
https://docs.micropython.org/en/latest/library/bluetooth.html
|
||||
"""
|
||||
|
||||
import time
|
||||
|
||||
try:
|
||||
import bluetooth
|
||||
except ImportError: # Linux test runner may not provide bluetooth module
|
||||
bluetooth = None
|
||||
|
||||
import sys
|
||||
|
||||
import lvgl as lv
|
||||
from micropython import const
|
||||
from mpos import Activity
|
||||
from mpos import Activity, TaskManager
|
||||
|
||||
# Scan for 5 seconds,
|
||||
SCAN_DURATION_MS = const(5000) # Duration of each BLE scan in milliseconds
|
||||
# with very low interval/window (to maximize detection rate):
|
||||
INTERVAL_US = const(30000)
|
||||
WINDOW_US = const(30000)
|
||||
|
||||
SCAN_DURATION = const(1000) # Duration of each BLE scan in milliseconds
|
||||
_IRQ_SCAN_RESULT = const(5)
|
||||
|
||||
_IRQ_SCAN_DONE = const(6)
|
||||
|
||||
# BLE Advertising Data Types (Standardized by Bluetooth SIG)
|
||||
_ADV_TYPE_NAME = const(0x09)
|
||||
_ADV_TYPE_SHORT_NAME = const(8)
|
||||
_ADV_TYPE_NAME = const(9)
|
||||
|
||||
|
||||
def decode_field(payload: bytes, adv_type: int) -> list:
|
||||
results = []
|
||||
def decode_name(payload: bytes) -> str | None:
|
||||
i = 0
|
||||
payload_len = len(payload)
|
||||
while i < payload_len:
|
||||
@@ -31,40 +36,12 @@ def decode_field(payload: bytes, adv_type: int) -> list:
|
||||
if length == 0 or i + length >= payload_len:
|
||||
break
|
||||
field_type = payload[i + 1]
|
||||
if field_type == adv_type:
|
||||
results.append(payload[i + 2 : i + length + 1])
|
||||
if field_type in (_ADV_TYPE_SHORT_NAME, _ADV_TYPE_NAME):
|
||||
if new_name := payload[i + 2 : i + length + 1]:
|
||||
return str(new_name, "utf-8")
|
||||
else:
|
||||
print(f"Unsupported: {field_type=} with {length=}")
|
||||
i += length + 1
|
||||
return results
|
||||
|
||||
|
||||
class BluetoothScanner:
|
||||
def __init__(self, device_callback):
|
||||
if bluetooth is None:
|
||||
raise RuntimeError("Bluetooth module not available")
|
||||
self.device_callback = device_callback
|
||||
self.ble = bluetooth.BLE()
|
||||
self.ble.irq(self.ble_irq_handler)
|
||||
|
||||
def __enter__(self):
|
||||
print("Activating BLE")
|
||||
self.ble.active(True)
|
||||
return self
|
||||
|
||||
def ble_irq_handler(self, event: int, data: tuple) -> None:
|
||||
if event == _IRQ_SCAN_RESULT:
|
||||
addr_type, addr, adv_type, rssi, adv_data = data
|
||||
addr = ":".join(f"{b:02x}" for b in addr)
|
||||
names = decode_field(adv_data, _ADV_TYPE_NAME)
|
||||
name = str(names[0], "utf-8") if names else "Unknown"
|
||||
self.device_callback(addr, rssi, name)
|
||||
|
||||
def scan(self, duration_ms: int):
|
||||
print(f"BLE scanning for {duration_ms}ms...")
|
||||
self.ble.gap_scan(duration_ms, 20000, 10000)
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
print("Deactivating BLE")
|
||||
self.ble.active(False)
|
||||
|
||||
|
||||
def set_dynamic_column_widths(table, font=None, padding=8):
|
||||
@@ -85,22 +62,31 @@ def set_cell_value(table, *, row: int, values: tuple):
|
||||
|
||||
|
||||
class ScanBluetooth(Activity):
|
||||
refresh_timer = None
|
||||
|
||||
def onCreate(self):
|
||||
screen = lv.obj()
|
||||
screen.set_flex_flow(lv.FLEX_FLOW.COLUMN)
|
||||
screen.set_style_pad_all(0, 0)
|
||||
screen.set_size(lv.pct(100), lv.pct(100))
|
||||
main_content = lv.obj()
|
||||
main_content.set_flex_flow(lv.FLEX_FLOW.COLUMN)
|
||||
main_content.set_style_pad_all(0, 0)
|
||||
main_content.set_size(lv.pct(100), lv.pct(100))
|
||||
|
||||
info_column = lv.obj(main_content)
|
||||
info_column.set_flex_flow(lv.FLEX_FLOW.COLUMN)
|
||||
info_column.set_style_pad_all(1, 1)
|
||||
info_column.set_size(lv.pct(100), lv.SIZE_CONTENT)
|
||||
|
||||
self.info_label = lv.label(info_column)
|
||||
self.info_label.set_style_text_font(lv.font_montserrat_14, 0)
|
||||
|
||||
if bluetooth is None:
|
||||
label = lv.label(screen)
|
||||
label.set_text("Bluetooth not available on this platform")
|
||||
label.center()
|
||||
self.setContentView(screen)
|
||||
self.info("Bluetooth not available on this platform")
|
||||
self.setContentView(main_content)
|
||||
return
|
||||
|
||||
self.table = lv.table(screen)
|
||||
tabel_column = lv.obj(main_content)
|
||||
tabel_column.set_flex_flow(lv.FLEX_FLOW.COLUMN)
|
||||
tabel_column.set_style_pad_all(0, 0)
|
||||
tabel_column.set_size(lv.pct(100), lv.SIZE_CONTENT)
|
||||
|
||||
self.table = lv.table(tabel_column)
|
||||
set_cell_value(
|
||||
self.table,
|
||||
row=0,
|
||||
@@ -108,52 +94,88 @@ class ScanBluetooth(Activity):
|
||||
)
|
||||
set_dynamic_column_widths(self.table)
|
||||
|
||||
self.scan_count = 0
|
||||
self.mac2column = {}
|
||||
self.mac2counts = {}
|
||||
self.mac2name = {}
|
||||
|
||||
self.scanner_cm = BluetoothScanner(device_callback=self.scan_callback)
|
||||
self.scanner = self.scanner_cm.__enter__() # Activate BLE
|
||||
self.ble = bluetooth.BLE()
|
||||
|
||||
self.setContentView(screen)
|
||||
self.setContentView(main_content)
|
||||
|
||||
def scan_callback(self, addr, rssi, name):
|
||||
if not (column_index := self.mac2column.get(addr)):
|
||||
column_index = len(self.mac2column) + 1
|
||||
self.mac2column[addr] = column_index
|
||||
self.mac2counts[addr] = 1
|
||||
else:
|
||||
self.mac2counts[addr] += 1
|
||||
def info(self, text):
|
||||
print(text)
|
||||
self.info_label.set_text(text)
|
||||
|
||||
set_cell_value(
|
||||
self.table,
|
||||
row=column_index,
|
||||
values=(
|
||||
str(column_index),
|
||||
addr,
|
||||
f"{rssi} dBm",
|
||||
str(self.mac2counts[addr]),
|
||||
name,
|
||||
),
|
||||
)
|
||||
async def ble_scan(self):
|
||||
"""Check sensor every second"""
|
||||
while self.scanning:
|
||||
print(f"async scan for {SCAN_DURATION_MS}ms...")
|
||||
self.ble.gap_scan(SCAN_DURATION_MS, INTERVAL_US, WINDOW_US, True)
|
||||
await TaskManager.sleep_ms(SCAN_DURATION_MS + 100)
|
||||
|
||||
def onResume(self, screen):
|
||||
super().onResume(screen)
|
||||
if bluetooth is None:
|
||||
return
|
||||
|
||||
def update(timer):
|
||||
self.scanner.scan(SCAN_DURATION)
|
||||
set_dynamic_column_widths(self.table)
|
||||
time.sleep_ms(SCAN_DURATION + 100) # Wait ?
|
||||
print(f"Scan complete: {len(self.mac2column)} unique devices")
|
||||
self.info("Activating Bluetooth...")
|
||||
self.ble.irq(self.ble_irq_handler)
|
||||
self.ble.active(True)
|
||||
|
||||
self.refresh_timer = lv.timer_create(update, SCAN_DURATION + 1000, None)
|
||||
self.scanning = True
|
||||
TaskManager.create_task(self.ble_scan())
|
||||
|
||||
def onPause(self, screen):
|
||||
super().onPause(screen)
|
||||
if bluetooth is None:
|
||||
return
|
||||
self.scanner.__exit__(None, None, None) # Deactivate BLE
|
||||
if self.refresh_timer:
|
||||
self.refresh_timer.delete()
|
||||
self.refresh_timer = None
|
||||
|
||||
self.scanning = False
|
||||
|
||||
self.info("Stop scanning...")
|
||||
self.ble.gap_scan(None)
|
||||
self.info("Deactivating BLE...")
|
||||
self.ble.active(False)
|
||||
self.info("BLE deactivated")
|
||||
|
||||
def ble_irq_handler(self, event: int, data: tuple) -> None:
|
||||
try:
|
||||
if event == _IRQ_SCAN_RESULT:
|
||||
addr_type, addr, adv_type, rssi, adv_data = data
|
||||
addr = ":".join(f"{b:02x}" for b in addr)
|
||||
print(f"{addr=} {rssi=} {len(adv_data)=}")
|
||||
if name := decode_name(adv_data):
|
||||
self.mac2name[addr] = name
|
||||
else:
|
||||
name = self.mac2name.get(addr, "Unknown")
|
||||
|
||||
if not (column_index := self.mac2column.get(addr)):
|
||||
column_index = len(self.mac2column) + 1
|
||||
self.mac2column[addr] = column_index
|
||||
self.mac2counts[addr] = 1
|
||||
else:
|
||||
self.mac2counts[addr] += 1
|
||||
|
||||
set_cell_value(
|
||||
self.table,
|
||||
row=column_index,
|
||||
values=(
|
||||
str(column_index),
|
||||
addr,
|
||||
f"{rssi} dBm",
|
||||
str(self.mac2counts[addr]),
|
||||
name,
|
||||
),
|
||||
)
|
||||
elif event == _IRQ_SCAN_DONE:
|
||||
set_dynamic_column_widths(self.table)
|
||||
self.scan_count += 1
|
||||
self.info(
|
||||
f"{len(self.mac2column)} unique devices (Scan {self.scan_count})"
|
||||
)
|
||||
else:
|
||||
print(f"Ignored BLE {event=}")
|
||||
except Exception as e:
|
||||
sys.print_exception(e)
|
||||
print(f"Error in BLE IRQ handler {event=}: {e}")
|
||||
|
||||
Reference in New Issue
Block a user