diff --git a/internal_filesystem/lib/mpos/imu/__init__.py b/internal_filesystem/lib/mpos/imu/__init__.py new file mode 100644 index 00000000..49ff04f9 --- /dev/null +++ b/internal_filesystem/lib/mpos/imu/__init__.py @@ -0,0 +1,29 @@ +from mpos.imu.constants import ( + TYPE_ACCELEROMETER, + TYPE_MAGNETIC_FIELD, + TYPE_GYROSCOPE, + TYPE_TEMPERATURE, + TYPE_IMU_TEMPERATURE, + TYPE_SOC_TEMPERATURE, + FACING_EARTH, + FACING_SKY, + GRAVITY, + IMU_CALIBRATION_FILENAME, +) +from mpos.imu.sensor import Sensor +from mpos.imu.manager import ImuManager + +__all__ = [ + "TYPE_ACCELEROMETER", + "TYPE_MAGNETIC_FIELD", + "TYPE_GYROSCOPE", + "TYPE_TEMPERATURE", + "TYPE_IMU_TEMPERATURE", + "TYPE_SOC_TEMPERATURE", + "FACING_EARTH", + "FACING_SKY", + "GRAVITY", + "IMU_CALIBRATION_FILENAME", + "Sensor", + "ImuManager", +] diff --git a/internal_filesystem/lib/mpos/imu/constants.py b/internal_filesystem/lib/mpos/imu/constants.py new file mode 100644 index 00000000..e4e145ea --- /dev/null +++ b/internal_filesystem/lib/mpos/imu/constants.py @@ -0,0 +1,15 @@ +TYPE_ACCELEROMETER = 1 # Units: m/s² (meters per second squared) +TYPE_MAGNETIC_FIELD = 2 # Units: μT (micro teslas) +TYPE_GYROSCOPE = 4 # Units: deg/s (degrees per second) +TYPE_TEMPERATURE = 13 # Units: °C (generic, returns first available - deprecated) +TYPE_IMU_TEMPERATURE = 14 # Units: °C (IMU chip temperature) +TYPE_SOC_TEMPERATURE = 15 # Units: °C (MCU/SoC internal temperature) + +# mounted_position: +FACING_EARTH = 20 # underside of PCB, like fri3d_2024 +FACING_SKY = 21 # top of PCB, like waveshare_esp32_s3_lcd_touch_2 (default) + +# Gravity constant for unit conversions +GRAVITY = 9.80665 # m/s² + +IMU_CALIBRATION_FILENAME = "imu_calibration.json" diff --git a/internal_filesystem/lib/mpos/imu/drivers/base.py b/internal_filesystem/lib/mpos/imu/drivers/base.py new file mode 100644 index 00000000..10d55e92 --- /dev/null +++ b/internal_filesystem/lib/mpos/imu/drivers/base.py @@ -0,0 +1,82 @@ +import time + +from mpos.imu.constants import GRAVITY, FACING_EARTH + + +class IMUDriverBase: + """Base class for IMU drivers with shared calibration logic.""" + + def __init__(self): + self.accel_offset = [0.0, 0.0, 0.0] + self.gyro_offset = [0.0, 0.0, 0.0] + + def read_acceleration(self): + """Returns (x, y, z) in m/s²""" + raise NotImplementedError + + def read_gyroscope(self): + """Returns (x, y, z) in deg/s""" + raise NotImplementedError + + def read_temperature(self): + """Returns temperature in °C""" + raise NotImplementedError + + def _raw_acceleration_mps2(self): + """Returns raw (x, y, z) in m/s² for calibration sampling.""" + raise NotImplementedError + + def _raw_gyroscope_dps(self): + """Returns raw (x, y, z) in deg/s for calibration sampling.""" + raise NotImplementedError + + def calibrate_accelerometer(self, samples): + """Calibrate accel, return (x, y, z) offsets in m/s²""" + sum_x, sum_y, sum_z = 0.0, 0.0, 0.0 + + for _ in range(samples): + ax, ay, az = self._raw_acceleration_mps2() + sum_x += ax + sum_y += ay + sum_z += az + time.sleep_ms(10) + + if FACING_EARTH == FACING_EARTH: + sum_z *= -1 + + self.accel_offset[0] = sum_x / samples + self.accel_offset[1] = sum_y / samples + self.accel_offset[2] = (sum_z / samples) - GRAVITY + + return tuple(self.accel_offset) + + def calibrate_gyroscope(self, samples): + """Calibrate gyro, return (x, y, z) offsets in deg/s""" + sum_x, sum_y, sum_z = 0.0, 0.0, 0.0 + + for _ in range(samples): + gx, gy, gz = self._raw_gyroscope_dps() + sum_x += gx + sum_y += gy + sum_z += gz + time.sleep_ms(10) + + self.gyro_offset[0] = sum_x / samples + self.gyro_offset[1] = sum_y / samples + self.gyro_offset[2] = sum_z / samples + + return tuple(self.gyro_offset) + + def get_calibration(self): + """Return dict with 'accel_offsets' and 'gyro_offsets' keys""" + return { + "accel_offsets": self.accel_offset, + "gyro_offsets": self.gyro_offset, + } + + def set_calibration(self, accel_offsets, gyro_offsets): + """Set calibration offsets from saved values""" + if accel_offsets: + self.accel_offset = list(accel_offsets) + if gyro_offsets: + self.gyro_offset = list(gyro_offsets) diff --git a/internal_filesystem/lib/mpos/imu/drivers/iio.py b/internal_filesystem/lib/mpos/imu/drivers/iio.py new file mode 100644 index 00000000..444b4141 --- /dev/null +++ b/internal_filesystem/lib/mpos/imu/drivers/iio.py @@ -0,0 +1,140 @@ +import os + +from mpos.imu.drivers.base import IMUDriverBase + + +class IIODriver(IMUDriverBase): + """ + Read sensor data via Linux IIO sysfs. + + Typical base path: + /sys/bus/iio/devices/iio:device0 + """ + + accel_path: str + + def __init__(self): + super().__init__() + self.accel_path = self.find_iio_device_with_file("in_accel_x_raw") + print("path:", self.accel_path) + + def _p(self, name: str): + return self.accel_path + "/" + name + + def _exists(self, name): + try: + os.stat(name) + return True + except OSError: + return False + + def _is_dir(self, path): + # MicroPython: stat tuple, mode is [0] + try: + st = os.stat(path) + mode = st[0] + # directory bit (POSIX): 0o040000 + return (mode & 0o170000) == 0o040000 + except OSError: + return False + + def find_iio_device_with_file(self, filename, base_dir="/sys/bus/iio/devices/"): + """ + Returns full path to iio:deviceX that contains given filename, + e.g. "/sys/bus/iio/devices/iio:device0" + + Returns None if not found. + """ + + print("Is dir? ", self._is_dir(base_dir), base_dir) + try: + entries = os.listdir(base_dir) + except OSError: + print("Error listing dir") + return None + + for entry in entries: + print("Entry:", entry) + if not entry.startswith("iio:device"): + continue + + dev_path = base_dir + "/" + entry + if not self._is_dir(dev_path): + continue + + if self._exists(dev_path + "/" + filename): + return dev_path + + return None + + def _read_text(self, name: str) -> str: + print("Read: ", name) + f = open(name, "r") + try: + return f.readline().strip() + finally: + f.close() + + def _read_float(self, name: str) -> float: + return float(self._read_text(name)) + + def _read_int(self, name: str) -> int: + return int(self._read_text(name), 10) + + def _read_raw_scaled(self, raw_name: str, scale_name: str) -> float: + raw = self._read_int(raw_name) + scale = self._read_float(scale_name) + return raw * scale + + def read_temperature(self) -> float: + """ + Tries common IIO patterns: + - in_temp_input (already scaled, usually millidegree C) + - in_temp_raw + in_temp_scale + """ + if not self.accel_path: + return None + + raw_path = self.accel_path + "/" + "in_temp_raw" + scale_path = self.accel_path + "/" + "in_temp_scale" + if not self._exists(raw_path) or not self._exists(scale_path): + return None + return self._read_raw_scaled(raw_path, scale_path) + + def _raw_acceleration_mps2(self): + if not self.accel_path: + return (0.0, 0.0, 0.0) + scale_name = self.accel_path + "/" + "in_accel_scale" + + ax = self._read_raw_scaled(self.accel_path + "/" + "in_accel_x_raw", scale_name) + ay = self._read_raw_scaled(self.accel_path + "/" + "in_accel_y_raw", scale_name) + az = self._read_raw_scaled(self.accel_path + "/" + "in_accel_z_raw", scale_name) + + return (ax, ay, az) + + def _raw_gyroscope_dps(self): + if not self.accel_path: + return (0.0, 0.0, 0.0) + scale_name = self.accel_path + "/" + "in_anglvel_scale" + + gx = self._read_raw_scaled(self.accel_path + "/" + "in_anglvel_x_raw", scale_name) + gy = self._read_raw_scaled(self.accel_path + "/" + "in_anglvel_y_raw", scale_name) + gz = self._read_raw_scaled(self.accel_path + "/" + "in_anglvel_z_raw", scale_name) + + return (gx, gy, gz) + + def read_acceleration(self): + ax, ay, az = self._raw_acceleration_mps2() + return ( + ax - self.accel_offset[0], + ay - self.accel_offset[1], + az - self.accel_offset[2], + ) + + def read_gyroscope(self): + gx, gy, gz = self._raw_gyroscope_dps() + return ( + gx - self.gyro_offset[0], + gy - self.gyro_offset[1], + gz - self.gyro_offset[2], + ) diff --git a/internal_filesystem/lib/mpos/imu/drivers/mpu6886.py b/internal_filesystem/lib/mpos/imu/drivers/mpu6886.py new file mode 100644 index 00000000..9a3ef588 --- /dev/null +++ b/internal_filesystem/lib/mpos/imu/drivers/mpu6886.py @@ -0,0 +1,39 @@ +from mpos.imu.constants import GRAVITY +from mpos.imu.drivers.base import IMUDriverBase + + +class MPU6886Driver(IMUDriverBase): + """Wrapper for MPU6886 IMU (Waveshare board).""" + + def __init__(self, i2c_bus, address): + super().__init__() + from drivers.imu_sensor.mpu6886 import MPU6886 + + self.sensor = MPU6886(i2c_bus, address=address) + + def _raw_acceleration_mps2(self): + ax, ay, az = self.sensor.acceleration + return (ax * GRAVITY, ay * GRAVITY, az * GRAVITY) + + def _raw_gyroscope_dps(self): + gx, gy, gz = self.sensor.gyro + return (gx, gy, gz) + + def read_temperature(self): + return self.sensor.temperature + + def read_acceleration(self): + ax, ay, az = self._raw_acceleration_mps2() + return ( + ax - self.accel_offset[0], + ay - self.accel_offset[1], + az - self.accel_offset[2], + ) + + def read_gyroscope(self): + gx, gy, gz = self._raw_gyroscope_dps() + return ( + gx - self.gyro_offset[0], + gy - self.gyro_offset[1], + gz - self.gyro_offset[2], + ) diff --git a/internal_filesystem/lib/mpos/imu/drivers/qmi8658.py b/internal_filesystem/lib/mpos/imu/drivers/qmi8658.py new file mode 100644 index 00000000..cb061b23 --- /dev/null +++ b/internal_filesystem/lib/mpos/imu/drivers/qmi8658.py @@ -0,0 +1,46 @@ +from mpos.imu.constants import GRAVITY +from mpos.imu.drivers.base import IMUDriverBase + + +class QMI8658Driver(IMUDriverBase): + """Wrapper for QMI8658 IMU (Waveshare board).""" + + def __init__(self, i2c_bus, address): + super().__init__() + from drivers.imu_sensor.qmi8658 import QMI8658 + + _ACCELSCALE_RANGE_8G = 0b10 + _GYROSCALE_RANGE_256DPS = 0b100 + self.sensor = QMI8658( + i2c_bus, + address=address, + accel_scale=_ACCELSCALE_RANGE_8G, + gyro_scale=_GYROSCALE_RANGE_256DPS, + ) + + def _raw_acceleration_mps2(self): + ax, ay, az = self.sensor.acceleration + return (ax * GRAVITY, ay * GRAVITY, az * GRAVITY) + + def _raw_gyroscope_dps(self): + gx, gy, gz = self.sensor.gyro + return (gx, gy, gz) + + def read_acceleration(self): + ax, ay, az = self._raw_acceleration_mps2() + return ( + ax - self.accel_offset[0], + ay - self.accel_offset[1], + az - self.accel_offset[2], + ) + + def read_gyroscope(self): + gx, gy, gz = self._raw_gyroscope_dps() + return ( + gx - self.gyro_offset[0], + gy - self.gyro_offset[1], + gz - self.gyro_offset[2], + ) + + def read_temperature(self): + return self.sensor.temperature diff --git a/internal_filesystem/lib/mpos/imu/drivers/wsen_isds.py b/internal_filesystem/lib/mpos/imu/drivers/wsen_isds.py new file mode 100644 index 00000000..6e8c4262 --- /dev/null +++ b/internal_filesystem/lib/mpos/imu/drivers/wsen_isds.py @@ -0,0 +1,46 @@ +from mpos.imu.constants import GRAVITY +from mpos.imu.drivers.base import IMUDriverBase + + +class WsenISDSDriver(IMUDriverBase): + """Wrapper for WSEN_ISDS IMU (Fri3d badge).""" + + def __init__(self, i2c_bus, address): + super().__init__() + from drivers.imu_sensor.wsen_isds import Wsen_Isds + + self.sensor = Wsen_Isds( + i2c_bus, + address=address, + acc_range="8g", + acc_data_rate="104Hz", + gyro_range="500dps", + gyro_data_rate="104Hz", + ) + + def _raw_acceleration_mps2(self): + ax, ay, az = self.sensor._read_raw_accelerations() + return ((ax / 1000) * GRAVITY, (ay / 1000) * GRAVITY, (az / 1000) * GRAVITY) + + def _raw_gyroscope_dps(self): + gx, gy, gz = self.sensor.read_angular_velocities() + return (gx / 1000.0, gy / 1000.0, gz / 1000.0) + + def read_acceleration(self): + ax, ay, az = self._raw_acceleration_mps2() + return ( + ax - self.accel_offset[0], + ay - self.accel_offset[1], + az - self.accel_offset[2], + ) + + def read_gyroscope(self): + gx, gy, gz = self._raw_gyroscope_dps() + return ( + gx - self.gyro_offset[0], + gy - self.gyro_offset[1], + gz - self.gyro_offset[2], + ) + + def read_temperature(self): + return self.sensor.temperature diff --git a/internal_filesystem/lib/mpos/imu/manager.py b/internal_filesystem/lib/mpos/imu/manager.py new file mode 100644 index 00000000..7944ec9f --- /dev/null +++ b/internal_filesystem/lib/mpos/imu/manager.py @@ -0,0 +1,550 @@ +import time + +from mpos.imu.constants import ( + TYPE_ACCELEROMETER, + TYPE_GYROSCOPE, + TYPE_IMU_TEMPERATURE, + TYPE_SOC_TEMPERATURE, + TYPE_TEMPERATURE, + FACING_EARTH, + FACING_SKY, + GRAVITY, + IMU_CALIBRATION_FILENAME, +) +from mpos.imu.sensor import Sensor +from mpos.imu.drivers.iio import IIODriver +from mpos.imu.drivers.qmi8658 import QMI8658Driver +from mpos.imu.drivers.wsen_isds import WsenISDSDriver +from mpos.imu.drivers.mpu6886 import MPU6886Driver + + +class ImuManager: + """Internal IMU manager (for SensorManager delegation).""" + + def __init__(self): + self._initialized = False + self._imu_driver = None + self._sensor_list = [] + self._i2c_bus = None + self._i2c_address = None + self._mounted_position = FACING_SKY + self._has_mcu_temperature = False + + def init(self, i2c_bus, address=0x6B, mounted_position=FACING_SKY): + self._i2c_bus = i2c_bus + self._i2c_address = address + self._mounted_position = mounted_position + + try: + import esp32 + + _ = esp32.mcu_temperature() + self._has_mcu_temperature = True + self._register_mcu_temperature_sensor() + except: + pass + + self._initialized = True + return True + + def init_iio(self): + self._imu_driver = IIODriver() + self._sensor_list = [ + Sensor( + name="Accelerometer", + sensor_type=TYPE_ACCELEROMETER, + vendor="Linux IIO", + version=1, + max_range="?", + resolution="?", + power_ma=10, + ), + Sensor( + name="Gyroscope", + sensor_type=TYPE_GYROSCOPE, + vendor="Linux IIO", + version=1, + max_range="?", + resolution="?", + power_ma=10, + ), + Sensor( + name="Temperature", + sensor_type=TYPE_IMU_TEMPERATURE, + vendor="Linux IIO", + version=1, + max_range="?", + resolution="?", + power_ma=10, + ), + ] + + self._load_calibration() + + self._initialized = True + return True + + def _ensure_imu_initialized(self): + if not self._initialized or self._imu_driver is not None: + return self._imu_driver is not None + + if self._i2c_bus: + try: + print("Try QMI8658 first (Waveshare board)") + chip_id = self._i2c_bus.readfrom_mem(self._i2c_address, 0x00, 1)[0] + print(f"{chip_id=:#04x}") + if chip_id == 0x05: + self._imu_driver = QMI8658Driver(self._i2c_bus, self._i2c_address) + self._register_qmi8658_sensors() + self._load_calibration() + print("Use QMI8658, ok") + return True + except Exception as exc: + print("No QMI8658:", exc) + + try: + print("Try WSEN_ISDS (fri3d_2024) or LSM6DSO (fri3d_2026)") + chip_id = self._i2c_bus.readfrom_mem(self._i2c_address, 0x0F, 1)[0] + print(f"{chip_id=:#04x}") + if chip_id == 0x6A or chip_id == 0x6C: + self._imu_driver = WsenISDSDriver(self._i2c_bus, self._i2c_address) + self._register_wsen_isds_sensors() + self._load_calibration() + print("Use WSEN_ISDS/LSM6DSO, ok") + return True + except Exception as exc: + print("No WSEN_ISDS or LSM6DSO:", exc) + + try: + print("Try MPU6886 (M5Stack FIRE)") + chip_id = self._i2c_bus.readfrom_mem(self._i2c_address, 0x75, 1)[0] + print(f"{chip_id=:#04x}") + if chip_id == 0x19: + self._imu_driver = MPU6886Driver(self._i2c_bus, self._i2c_address) + self._register_mpu6886_sensors() + self._load_calibration() + print("Use MPU6886, ok") + return True + except Exception as exc: + print("No MPU6886:", exc) + + return False + + def is_available(self): + return self._initialized + + def get_sensor_list(self): + self._ensure_imu_initialized() + return self._sensor_list.copy() if self._sensor_list else [] + + def get_default_sensor(self, sensor_type): + if self._initialized and sensor_type in (TYPE_ACCELEROMETER, TYPE_GYROSCOPE): + self._ensure_imu_initialized() + + for sensor in self._sensor_list: + if sensor.type == sensor_type: + return sensor + return None + + def read_sensor_once(self, sensor): + if sensor.type == TYPE_ACCELEROMETER: + if self._imu_driver: + ax, ay, az = self._imu_driver.read_acceleration() + if self._mounted_position == FACING_EARTH: + az *= -1 + return (ax, ay, az) + elif sensor.type == TYPE_GYROSCOPE: + if self._imu_driver: + return self._imu_driver.read_gyroscope() + elif sensor.type == TYPE_IMU_TEMPERATURE: + if self._imu_driver: + return self._imu_driver.read_temperature() + elif sensor.type == TYPE_SOC_TEMPERATURE: + if self._has_mcu_temperature: + import esp32 + + return esp32.mcu_temperature() + elif sensor.type == TYPE_TEMPERATURE: + if self._imu_driver: + temp = self._imu_driver.read_temperature() + if temp is not None: + return temp + if self._has_mcu_temperature: + import esp32 + + return esp32.mcu_temperature() + return None + + def read_sensor(self, sensor): + if sensor is None: + return None + + if sensor.type in (TYPE_ACCELEROMETER, TYPE_GYROSCOPE): + self._ensure_imu_initialized() + + max_retries = 3 + retry_delay_ms = 20 + + for attempt in range(max_retries): + try: + return self.read_sensor_once(sensor) + except Exception as exc: + import sys + + sys.print_exception(exc) + error_msg = str(exc) + if "data not ready" in error_msg and attempt < max_retries - 1: + time.sleep_ms(retry_delay_ms) + continue + print("Exception reading sensor:", error_msg) + return None + + return None + + def calibrate_sensor(self, sensor, samples=100): + self._ensure_imu_initialized() + if not self.is_available() or sensor is None: + return None + + if sensor.type == TYPE_ACCELEROMETER: + offsets = self._imu_driver.calibrate_accelerometer(samples) + elif sensor.type == TYPE_GYROSCOPE: + offsets = self._imu_driver.calibrate_gyroscope(samples) + else: + return None + + if offsets: + self._save_calibration() + + return offsets + + def check_calibration_quality(self, samples=50): + self._ensure_imu_initialized() + if not self.is_available(): + return None + + try: + accel = self.get_default_sensor(TYPE_ACCELEROMETER) + gyro = self.get_default_sensor(TYPE_GYROSCOPE) + + accel_samples = [[], [], []] + gyro_samples = [[], [], []] + + for _ in range(samples): + if accel: + data = self.read_sensor(accel) + if data: + ax, ay, az = data + accel_samples[0].append(ax) + accel_samples[1].append(ay) + accel_samples[2].append(az) + if gyro: + data = self.read_sensor(gyro) + if data: + gx, gy, gz = data + gyro_samples[0].append(gx) + gyro_samples[1].append(gy) + gyro_samples[2].append(gz) + time.sleep_ms(10) + + accel_stats = [_calc_mean_variance(s) for s in accel_samples] + gyro_stats = [_calc_mean_variance(s) for s in gyro_samples] + + accel_mean = tuple(s[0] for s in accel_stats) + accel_variance = tuple(s[1] for s in accel_stats) + gyro_mean = tuple(s[0] for s in gyro_stats) + gyro_variance = tuple(s[1] for s in gyro_stats) + + issues = [] + scores = [] + + if accel: + accel_max_variance = max(accel_variance) + variance_score = max(0.0, 1.0 - (accel_max_variance / 1.0)) + scores.append(variance_score) + if accel_max_variance > 0.5: + issues.append( + f"High accelerometer variance: {accel_max_variance:.3f} m/s²" + ) + + ax, ay, az = accel_mean + xy_error = (abs(ax) + abs(ay)) / 2.0 + z_error = abs(az - GRAVITY) + expected_score = max(0.0, 1.0 - ((xy_error + z_error) / 5.0)) + scores.append(expected_score) + if xy_error > 1.0: + issues.append( + f"Accel X/Y not near zero: X={ax:.2f}, Y={ay:.2f} m/s²" + ) + if z_error > 1.0: + issues.append(f"Accel Z not near 9.8: Z={az:.2f} m/s²") + + if gyro: + gyro_max_variance = max(gyro_variance) + variance_score = max(0.0, 1.0 - (gyro_max_variance / 10.0)) + scores.append(variance_score) + if gyro_max_variance > 5.0: + issues.append( + f"High gyroscope variance: {gyro_max_variance:.3f} deg/s" + ) + + gx, gy, gz = gyro_mean + error = (abs(gx) + abs(gy) + abs(gz)) / 3.0 + expected_score = max(0.0, 1.0 - (error / 10.0)) + scores.append(expected_score) + if error > 2.0: + issues.append( + f"Gyro not near zero: X={gx:.2f}, Y={gy:.2f}, Z={gz:.2f} deg/s" + ) + + quality_score = sum(scores) / len(scores) if scores else 0.0 + + if quality_score >= 0.8: + quality_rating = "Good" + elif quality_score >= 0.5: + quality_rating = "Fair" + else: + quality_rating = "Poor" + + return { + "accel_mean": accel_mean, + "accel_variance": accel_variance, + "gyro_mean": gyro_mean, + "gyro_variance": gyro_variance, + "quality_score": quality_score, + "quality_rating": quality_rating, + "issues": issues, + } + + except Exception as exc: + print(f"[SensorManager] Error checking calibration quality: {exc}") + return None + + def check_stationarity( + self, + samples=30, + variance_threshold_accel=0.5, + variance_threshold_gyro=5.0, + ): + self._ensure_imu_initialized() + if not self.is_available(): + return None + + try: + accel = self.get_default_sensor(TYPE_ACCELEROMETER) + gyro = self.get_default_sensor(TYPE_GYROSCOPE) + + accel_samples = [[], [], []] + gyro_samples = [[], [], []] + + for _ in range(samples): + if accel: + data = self.read_sensor(accel) + if data: + ax, ay, az = data + accel_samples[0].append(ax) + accel_samples[1].append(ay) + accel_samples[2].append(az) + if gyro: + data = self.read_sensor(gyro) + if data: + gx, gy, gz = data + gyro_samples[0].append(gx) + gyro_samples[1].append(gy) + gyro_samples[2].append(gz) + time.sleep_ms(10) + + accel_var = [_calc_variance(s) for s in accel_samples] + gyro_var = [_calc_variance(s) for s in gyro_samples] + + max_accel_var = max(accel_var) if accel_var else 0.0 + max_gyro_var = max(gyro_var) if gyro_var else 0.0 + + accel_stationary = max_accel_var < variance_threshold_accel + gyro_stationary = max_gyro_var < variance_threshold_gyro + is_stationary = accel_stationary and gyro_stationary + + if is_stationary: + message = "Device is stationary - ready to calibrate" + else: + problems = [] + if not accel_stationary: + problems.append( + f"movement detected (accel variance: {max_accel_var:.3f})" + ) + if not gyro_stationary: + problems.append( + f"rotation detected (gyro variance: {max_gyro_var:.3f})" + ) + message = f"Device NOT stationary: {', '.join(problems)}" + + return { + "is_stationary": is_stationary, + "accel_variance": max_accel_var, + "gyro_variance": max_gyro_var, + "message": message, + } + + except Exception as exc: + print(f"[SensorManager] Error checking stationarity: {exc}") + return None + + def _register_qmi8658_sensors(self): + self._sensor_list = [ + Sensor( + name="QMI8658 Accelerometer", + sensor_type=TYPE_ACCELEROMETER, + vendor="QST Corporation", + version=1, + max_range="±8G (78.4 m/s²)", + resolution="0.0024 m/s²", + power_ma=0.2, + ), + Sensor( + name="QMI8658 Gyroscope", + sensor_type=TYPE_GYROSCOPE, + vendor="QST Corporation", + version=1, + max_range="±256 deg/s", + resolution="0.002 deg/s", + power_ma=0.7, + ), + Sensor( + name="QMI8658 Temperature", + sensor_type=TYPE_IMU_TEMPERATURE, + vendor="QST Corporation", + version=1, + max_range="-40°C to +85°C", + resolution="0.004°C", + power_ma=0, + ), + ] + + def _register_mpu6886_sensors(self): + self._sensor_list = [ + Sensor( + name="MPU6886 Accelerometer", + sensor_type=TYPE_ACCELEROMETER, + vendor="InvenSense", + version=1, + max_range="±16g", + resolution="0.0024 m/s²", + power_ma=0.2, + ), + Sensor( + name="MPU6886 Gyroscope", + sensor_type=TYPE_GYROSCOPE, + vendor="InvenSense", + version=1, + max_range="±256 deg/s", + resolution="0.002 deg/s", + power_ma=0.7, + ), + Sensor( + name="MPU6886 Temperature", + sensor_type=TYPE_IMU_TEMPERATURE, + vendor="InvenSense", + version=1, + max_range="-40°C to +85°C", + resolution="0.05°C", + power_ma=0, + ), + ] + + def _register_wsen_isds_sensors(self): + self._sensor_list = [ + Sensor( + name="WSEN_ISDS Accelerometer", + sensor_type=TYPE_ACCELEROMETER, + vendor="Würth Elektronik", + version=1, + max_range="±8G (78.4 m/s²)", + resolution="0.0024 m/s²", + power_ma=0.2, + ), + Sensor( + name="WSEN_ISDS Gyroscope", + sensor_type=TYPE_GYROSCOPE, + vendor="Würth Elektronik", + version=1, + max_range="±500 deg/s", + resolution="0.0175 deg/s", + power_ma=0.65, + ), + Sensor( + name="WSEN_ISDS Temperature", + sensor_type=TYPE_IMU_TEMPERATURE, + vendor="Würth Elektronik", + version=1, + max_range="-40°C to +85°C", + resolution="0.004°C", + power_ma=0, + ), + ] + + def _register_mcu_temperature_sensor(self): + self._sensor_list.append( + Sensor( + name="ESP32 MCU Temperature", + sensor_type=TYPE_SOC_TEMPERATURE, + vendor="Espressif", + version=1, + max_range="-40°C to +125°C", + resolution="0.5°C", + power_ma=0, + ) + ) + + def _load_calibration(self): + if not self._imu_driver: + return + + try: + from mpos.config import SharedPreferences + + prefs_new = SharedPreferences( + "com.micropythonos.settings", filename=IMU_CALIBRATION_FILENAME + ) + accel_offsets = prefs_new.get_list("accel_offsets") + gyro_offsets = prefs_new.get_list("gyro_offsets") + + if accel_offsets or gyro_offsets: + self._imu_driver.set_calibration(accel_offsets, gyro_offsets) + except: + pass + + def _save_calibration(self): + if not self._imu_driver: + return + + try: + from mpos.config import SharedPreferences + + prefs = SharedPreferences( + "com.micropythonos.settings", filename=IMU_CALIBRATION_FILENAME + ) + editor = prefs.edit() + + cal = self._imu_driver.get_calibration() + editor.put_list("accel_offsets", list(cal["accel_offsets"])) + editor.put_list("gyro_offsets", list(cal["gyro_offsets"])) + editor.commit() + except: + pass + + +def _calc_mean_variance(samples_list): + if not samples_list: + return 0.0, 0.0 + n = len(samples_list) + mean = sum(samples_list) / n + variance = sum((x - mean) ** 2 for x in samples_list) / n + return mean, variance + + +def _calc_variance(samples_list): + if not samples_list: + return 0.0 + n = len(samples_list) + mean = sum(samples_list) / n + return sum((x - mean) ** 2 for x in samples_list) / n diff --git a/internal_filesystem/lib/mpos/imu/sensor.py b/internal_filesystem/lib/mpos/imu/sensor.py new file mode 100644 index 00000000..dc819f7d --- /dev/null +++ b/internal_filesystem/lib/mpos/imu/sensor.py @@ -0,0 +1,25 @@ +class Sensor: + """Sensor metadata (lightweight data class, Android-inspired).""" + + def __init__(self, name, sensor_type, vendor, version, max_range, resolution, power_ma): + """Initialize sensor metadata. + + Args: + name: Human-readable sensor name + sensor_type: Sensor type constant (TYPE_ACCELEROMETER, etc.) + vendor: Sensor vendor/manufacturer + version: Driver version + max_range: Maximum measurement range (with units) + resolution: Measurement resolution (with units) + power_ma: Power consumption in mA (or 0 if unknown) + """ + self.name = name + self.type = sensor_type + self.vendor = vendor + self.version = version + self.max_range = max_range + self.resolution = resolution + self.power = power_ma + + def __repr__(self): + return f"Sensor({self.name}, type={self.type})" diff --git a/internal_filesystem/lib/mpos/sensor_manager.py b/internal_filesystem/lib/mpos/sensor_manager.py index 401198f7..6b96bce7 100644 --- a/internal_filesystem/lib/mpos/sensor_manager.py +++ b/internal_filesystem/lib/mpos/sensor_manager.py @@ -18,58 +18,25 @@ MIT License Copyright (c) 2024 MicroPythonOS contributors """ -import time -import os try: import _thread + _lock = _thread.allocate_lock() except ImportError: _lock = None - -# Sensor type constants (matching Android SensorManager) -TYPE_ACCELEROMETER = 1 # Units: m/s² (meters per second squared) -TYPE_MAGNETIC_FIELD = 2 # Units: μT (micro teslas) -TYPE_GYROSCOPE = 4 # Units: deg/s (degrees per second) -TYPE_TEMPERATURE = 13 # Units: °C (generic, returns first available - deprecated) -TYPE_IMU_TEMPERATURE = 14 # Units: °C (IMU chip temperature) -TYPE_SOC_TEMPERATURE = 15 # Units: °C (MCU/SoC internal temperature) - -# mounted_position: -FACING_EARTH = 20 # underside of PCB, like fri3d_2024 -FACING_SKY = 21 # top of PCB, like waveshare_esp32_s3_lcd_touch_2 (default) - -# Gravity constant for unit conversions -_GRAVITY = 9.80665 # m/s² - -IMU_CALIBRATION_FILENAME = "imu_calibration.json" - - -class Sensor: - """Sensor metadata (lightweight data class, Android-inspired).""" - - def __init__(self, name, sensor_type, vendor, version, max_range, resolution, power_ma): - """Initialize sensor metadata. - - Args: - name: Human-readable sensor name - sensor_type: Sensor type constant (TYPE_ACCELEROMETER, etc.) - vendor: Sensor vendor/manufacturer - version: Driver version - max_range: Maximum measurement range (with units) - resolution: Measurement resolution (with units) - power_ma: Power consumption in mA (or 0 if unknown) - """ - self.name = name - self.type = sensor_type - self.vendor = vendor - self.version = version - self.max_range = max_range - self.resolution = resolution - self.power = power_ma - - def __repr__(self): - return f"Sensor({self.name}, type={self.type})" +from mpos.imu.constants import ( + TYPE_ACCELEROMETER, + TYPE_MAGNETIC_FIELD, + TYPE_GYROSCOPE, + TYPE_TEMPERATURE, + TYPE_IMU_TEMPERATURE, + TYPE_SOC_TEMPERATURE, + FACING_EARTH, + FACING_SKY, +) +from mpos.imu.manager import ImuManager +from mpos.imu.sensor import Sensor class SensorManager: @@ -91,15 +58,10 @@ class SensorManager: """ _instance = None - + # Class-level state variables (for testing and singleton pattern) _initialized = False - _imu_driver = None - _sensor_list = [] - _i2c_bus = None - _i2c_address = None - _mounted_position = FACING_SKY - _has_mcu_temperature = False + _imu_manager = None # Class-level constants TYPE_ACCELEROMETER = TYPE_ACCELEROMETER @@ -134,115 +96,22 @@ class SensorManager: Returns: bool: True if initialized successfully """ - self._i2c_bus = i2c_bus - self._i2c_address = address - self._mounted_position = mounted_position - - # Initialize MCU temperature sensor immediately (fast, no I2C needed) - try: - import esp32 - _ = esp32.mcu_temperature() - self._has_mcu_temperature = True - self._register_mcu_temperature_sensor() - except: - pass - - self._initialized = True - return True + self._ensure_imu_manager() + self._initialized = self._imu_manager.init( + i2c_bus, + address=address, + mounted_position=mounted_position, + ) + return self._initialized def init_iio(self): - self._imu_driver = _IIODriver() - self._sensor_list = [ - Sensor( - name="Accelerometer", - sensor_type=TYPE_ACCELEROMETER, - vendor="Linux IIO", - version=1, - max_range="?", - resolution="?", - power_ma=10 - ), - Sensor( - name="Gyroscope", - sensor_type=TYPE_GYROSCOPE, - vendor="Linux IIO", - version=1, - max_range="?", - resolution="?", - power_ma=10 - ), - Sensor( - name="Temperature", - sensor_type=TYPE_IMU_TEMPERATURE, - vendor="Linux IIO", - version=1, - max_range="?", - resolution="?", - power_ma=10 - ) - ] + self._ensure_imu_manager() + self._initialized = self._imu_manager.init_iio() + return self._initialized - self._load_calibration() - - self._initialized = True - return True - - def _ensure_imu_initialized(self): - """Perform IMU initialization on first use (lazy initialization). - - Tries to detect QMI8658 (chip ID 0x05) or WSEN_ISDS (WHO_AM_I 0x6A). - Loads calibration from SharedPreferences if available. - - Returns: - bool: True if IMU detected and initialized successfully - """ - if not self._initialized or self._imu_driver is not None: - return self._imu_driver is not None - - if self._i2c_bus: - try: - print("Try QMI8658 first (Waveshare board)") - # PARTID register: - chip_id = self._i2c_bus.readfrom_mem(self._i2c_address, 0x00, 1)[0] - print(f"{chip_id=:#04x}") - if chip_id == 0x05: # QMI8685_PARTID - self._imu_driver = _QMI8658Driver(self._i2c_bus, self._i2c_address) - self._register_qmi8658_sensors() - self._load_calibration() - print("Use QMI8658, ok") - return True - except Exception as e: - print("No QMI8658:", e) - - try: - print("Try WSEN_ISDS (fri3d_2024) or LSM6DSO (fri3d_2026)") - # WHO_AM_I register - could also use Wsen_Isds.get_chip_id(): - chip_id = self._i2c_bus.readfrom_mem(self._i2c_address, 0x0F, 1)[0] - print(f"{chip_id=:#04x}") - # WSEN_ISDS WHO_AM_I 0x6A (Fri3d 2024) or 0x6C (Fri3d 2026): - if chip_id == 0x6A or chip_id == 0x6C: - self._imu_driver = _WsenISDSDriver(self._i2c_bus, self._i2c_address) - self._register_wsen_isds_sensors() - self._load_calibration() - print("Use WSEN_ISDS/LSM6DSO, ok") - return True - except Exception as e: - print("No WSEN_ISDS or LSM6DSO:", e) - - try: - print("Try MPU6886 (M5Stack FIRE)") - chip_id = self._i2c_bus.readfrom_mem(self._i2c_address, 0x75, 1)[0] - print(f"{chip_id=:#04x}") - if chip_id == 0x19: - self._imu_driver = _MPU6886Driver(self._i2c_bus, self._i2c_address) - self._register_mpu6886_sensors() - self._load_calibration() - print("Use MPU6886, ok") - return True - except Exception as e: - print("No MPU6886:", e) - - return False + def _ensure_imu_manager(self): + if self._imu_manager is None: + self._imu_manager = ImuManager() def is_available(self): """Check if sensors are available. @@ -254,7 +123,7 @@ class SensorManager: bool: True if SensorManager is initialized (may only have MCU temp, not IMU) """ return self._initialized - + def get_sensor_list(self): """Get list of all available sensors. @@ -263,9 +132,10 @@ class SensorManager: Returns: list: List of Sensor objects """ - self._ensure_imu_initialized() - return self._sensor_list.copy() if self._sensor_list else [] - + if not self._imu_manager: + return [] + return self._imu_manager.get_sensor_list() + def get_default_sensor(self, sensor_type): """Get default sensor of given type. @@ -277,42 +147,14 @@ class SensorManager: Returns: Sensor object or None if not available """ - # Only initialize IMU if SensorManager has been initialized and requesting IMU sensor types - if self._initialized and sensor_type in (TYPE_ACCELEROMETER, TYPE_GYROSCOPE): - self._ensure_imu_initialized() - - for sensor in self._sensor_list: - if sensor.type == sensor_type: - return sensor - return None + if not self._imu_manager: + return None + return self._imu_manager.get_default_sensor(sensor_type) def read_sensor_once(self, sensor): - if sensor.type == TYPE_ACCELEROMETER: - if self._imu_driver: - ax, ay, az = self._imu_driver.read_acceleration() - if self._mounted_position == FACING_EARTH: - az *= -1 - return (ax, ay, az) - elif sensor.type == TYPE_GYROSCOPE: - if self._imu_driver: - return self._imu_driver.read_gyroscope() - elif sensor.type == TYPE_IMU_TEMPERATURE: - if self._imu_driver: - return self._imu_driver.read_temperature() - elif sensor.type == TYPE_SOC_TEMPERATURE: - if self._has_mcu_temperature: - import esp32 - return esp32.mcu_temperature() - elif sensor.type == TYPE_TEMPERATURE: - # Generic temperature - return first available (backward compatibility) - if self._imu_driver: - temp = self._imu_driver.read_temperature() - if temp is not None: - return temp - if self._has_mcu_temperature: - import esp32 - return esp32.mcu_temperature() - return None + if not self._imu_manager: + return None + return self._imu_manager.read_sensor_once(sensor) def read_sensor(self, sensor): """Read sensor data synchronously. @@ -330,35 +172,11 @@ class SensorManager: if sensor is None: return None - # Only initialize IMU if reading IMU sensor - if sensor.type in (TYPE_ACCELEROMETER, TYPE_GYROSCOPE): - self._ensure_imu_initialized() - if _lock: _lock.acquire() try: - # Retry logic for "sensor data not ready" (WSEN_ISDS needs time after init) - max_retries = 3 - retry_delay_ms = 20 # Wait 20ms between retries - - for attempt in range(max_retries): - try: - return self.read_sensor_once(sensor) - except Exception as e: - import sys - sys.print_exception(e) - error_msg = str(e) - # Retry if sensor data not ready, otherwise fail immediately - if "data not ready" in error_msg and attempt < max_retries - 1: - import time - time.sleep_ms(retry_delay_ms) - continue - else: - print("Exception reading sensor:", error_msg) - return None - - return None + return self._imu_manager.read_sensor(sensor) if self._imu_manager else None finally: if _lock: _lock.release() @@ -376,34 +194,24 @@ class SensorManager: Returns: tuple: Calibration offsets (x, y, z) or None if failed """ - self._ensure_imu_initialized() - if not self.is_available() or sensor is None: + if not self._imu_manager: return None if _lock: _lock.acquire() try: - if sensor.type == TYPE_ACCELEROMETER: - offsets = self._imu_driver.calibrate_accelerometer(samples) - elif sensor.type == TYPE_GYROSCOPE: - offsets = self._imu_driver.calibrate_gyroscope(samples) - else: - return None - - if offsets: - self._save_calibration() - - return offsets + return self._imu_manager.calibrate_sensor(sensor, samples=samples) except Exception as e: import sys + sys.print_exception(e) print(f"[SensorManager] Calibration error: {e}") return None finally: if _lock: _lock.release() - + def check_calibration_quality(self, samples=50): """Check quality of current calibration. @@ -423,113 +231,13 @@ class SensorManager: - issues: list of strings describing problems None if IMU not available """ - self._ensure_imu_initialized() - if not self.is_available(): + if not self._imu_manager: return None + return self._imu_manager.check_calibration_quality(samples=samples) - # Don't acquire lock here - let read_sensor() handle it per-read - # (avoids deadlock since read_sensor also acquires the lock) - try: - accel = self.get_default_sensor(TYPE_ACCELEROMETER) - gyro = self.get_default_sensor(TYPE_GYROSCOPE) - - # Collect samples - accel_samples = [[], [], []] # x, y, z lists - gyro_samples = [[], [], []] - - for _ in range(samples): - if accel: - data = self.read_sensor(accel) - if data: - ax, ay, az = data - accel_samples[0].append(ax) - accel_samples[1].append(ay) - accel_samples[2].append(az) - if gyro: - data = self.read_sensor(gyro) - if data: - gx, gy, gz = data - gyro_samples[0].append(gx) - gyro_samples[1].append(gy) - gyro_samples[2].append(gz) - time.sleep_ms(10) - - # Calculate statistics using helper - accel_stats = [_calc_mean_variance(s) for s in accel_samples] - gyro_stats = [_calc_mean_variance(s) for s in gyro_samples] - - accel_mean = tuple(s[0] for s in accel_stats) - accel_variance = tuple(s[1] for s in accel_stats) - gyro_mean = tuple(s[0] for s in gyro_stats) - gyro_variance = tuple(s[1] for s in gyro_stats) - - # Calculate quality score (0.0 - 1.0) - issues = [] - scores = [] - - # Check accelerometer - if accel: - # Variance check (lower is better) - accel_max_variance = max(accel_variance) - variance_score = max(0.0, 1.0 - (accel_max_variance / 1.0)) # 1.0 m/s² variance threshold - scores.append(variance_score) - if accel_max_variance > 0.5: - issues.append(f"High accelerometer variance: {accel_max_variance:.3f} m/s²") - - # Expected values check (X≈0, Y≈0, Z≈9.8) - ax, ay, az = accel_mean - xy_error = (abs(ax) + abs(ay)) / 2.0 - z_error = abs(az - _GRAVITY) - expected_score = max(0.0, 1.0 - ((xy_error + z_error) / 5.0)) # 5.0 m/s² error threshold - scores.append(expected_score) - if xy_error > 1.0: - issues.append(f"Accel X/Y not near zero: X={ax:.2f}, Y={ay:.2f} m/s²") - if z_error > 1.0: - issues.append(f"Accel Z not near 9.8: Z={az:.2f} m/s²") - - # Check gyroscope - if gyro: - # Variance check - gyro_max_variance = max(gyro_variance) - variance_score = max(0.0, 1.0 - (gyro_max_variance / 10.0)) # 10 deg/s variance threshold - scores.append(variance_score) - if gyro_max_variance > 5.0: - issues.append(f"High gyroscope variance: {gyro_max_variance:.3f} deg/s") - - # Expected values check (all ≈0) - gx, gy, gz = gyro_mean - error = (abs(gx) + abs(gy) + abs(gz)) / 3.0 - expected_score = max(0.0, 1.0 - (error / 10.0)) # 10 deg/s error threshold - scores.append(expected_score) - if error > 2.0: - issues.append(f"Gyro not near zero: X={gx:.2f}, Y={gy:.2f}, Z={gz:.2f} deg/s") - - # Overall quality score - quality_score = sum(scores) / len(scores) if scores else 0.0 - - # Rating - if quality_score >= 0.8: - quality_rating = "Good" - elif quality_score >= 0.5: - quality_rating = "Fair" - else: - quality_rating = "Poor" - - return { - 'accel_mean': accel_mean, - 'accel_variance': accel_variance, - 'gyro_mean': gyro_mean, - 'gyro_variance': gyro_variance, - 'quality_score': quality_score, - 'quality_rating': quality_rating, - 'issues': issues - } - - except Exception as e: - print(f"[SensorManager] Error checking calibration quality: {e}") - return None - - def check_stationarity(self, samples=30, variance_threshold_accel=0.5, variance_threshold_gyro=5.0): + def check_stationarity( + self, samples=30, variance_threshold_accel=0.5, variance_threshold_gyro=5.0 + ): """Check if device is stationary (required for calibration). Args: @@ -545,766 +253,13 @@ class SensorManager: - message: string describing result None if IMU not available """ - self._ensure_imu_initialized() - if not self.is_available(): + if not self._imu_manager: return None - - # Don't acquire lock here - let read_sensor() handle it per-read - # (avoids deadlock since read_sensor also acquires the lock) - try: - accel = self.get_default_sensor(TYPE_ACCELEROMETER) - gyro = self.get_default_sensor(TYPE_GYROSCOPE) - - # Collect samples - accel_samples = [[], [], []] - gyro_samples = [[], [], []] - - for _ in range(samples): - if accel: - data = self.read_sensor(accel) - if data: - ax, ay, az = data - accel_samples[0].append(ax) - accel_samples[1].append(ay) - accel_samples[2].append(az) - if gyro: - data = self.read_sensor(gyro) - if data: - gx, gy, gz = data - gyro_samples[0].append(gx) - gyro_samples[1].append(gy) - gyro_samples[2].append(gz) - time.sleep_ms(10) - - # Calculate variance using helper - accel_var = [_calc_variance(s) for s in accel_samples] - gyro_var = [_calc_variance(s) for s in gyro_samples] - - max_accel_var = max(accel_var) if accel_var else 0.0 - max_gyro_var = max(gyro_var) if gyro_var else 0.0 - - # Check thresholds - accel_stationary = max_accel_var < variance_threshold_accel - gyro_stationary = max_gyro_var < variance_threshold_gyro - is_stationary = accel_stationary and gyro_stationary - - # Generate message - if is_stationary: - message = "Device is stationary - ready to calibrate" - else: - problems = [] - if not accel_stationary: - problems.append(f"movement detected (accel variance: {max_accel_var:.3f})") - if not gyro_stationary: - problems.append(f"rotation detected (gyro variance: {max_gyro_var:.3f})") - message = f"Device NOT stationary: {', '.join(problems)}" - - return { - 'is_stationary': is_stationary, - 'accel_variance': max_accel_var, - 'gyro_variance': max_gyro_var, - 'message': message - } - - except Exception as e: - print(f"[SensorManager] Error checking stationarity: {e}") - return None - - def _register_qmi8658_sensors(self): - """Register QMI8658 sensors in sensor list.""" - self._sensor_list = [ - Sensor( - name="QMI8658 Accelerometer", - sensor_type=TYPE_ACCELEROMETER, - vendor="QST Corporation", - version=1, - max_range="±8G (78.4 m/s²)", - resolution="0.0024 m/s²", - power_ma=0.2 - ), - Sensor( - name="QMI8658 Gyroscope", - sensor_type=TYPE_GYROSCOPE, - vendor="QST Corporation", - version=1, - max_range="±256 deg/s", - resolution="0.002 deg/s", - power_ma=0.7 - ), - Sensor( - name="QMI8658 Temperature", - sensor_type=TYPE_IMU_TEMPERATURE, - vendor="QST Corporation", - version=1, - max_range="-40°C to +85°C", - resolution="0.004°C", - power_ma=0 - ) - ] - - def _register_mpu6886_sensors(self): - """Register MPU6886 sensors in sensor list.""" - self._sensor_list = [ - Sensor( - name="MPU6886 Accelerometer", - sensor_type=TYPE_ACCELEROMETER, - vendor="InvenSense", - version=1, - max_range="±16g", - resolution="0.0024 m/s²", - power_ma=0.2, - ), - Sensor( - name="MPU6886 Gyroscope", - sensor_type=TYPE_GYROSCOPE, - vendor="InvenSense", - version=1, - max_range="±256 deg/s", - resolution="0.002 deg/s", - power_ma=0.7, - ), - Sensor( - name="MPU6886 Temperature", - sensor_type=TYPE_IMU_TEMPERATURE, - vendor="InvenSense", - version=1, - max_range="-40°C to +85°C", - resolution="0.05°C", - power_ma=0, - ), - ] - - def _register_wsen_isds_sensors(self): - """Register WSEN_ISDS sensors in sensor list.""" - self._sensor_list = [ - Sensor( - name="WSEN_ISDS Accelerometer", - sensor_type=TYPE_ACCELEROMETER, - vendor="Würth Elektronik", - version=1, - max_range="±8G (78.4 m/s²)", - resolution="0.0024 m/s²", - power_ma=0.2 - ), - Sensor( - name="WSEN_ISDS Gyroscope", - sensor_type=TYPE_GYROSCOPE, - vendor="Würth Elektronik", - version=1, - max_range="±500 deg/s", - resolution="0.0175 deg/s", - power_ma=0.65 - ), - Sensor( - name="WSEN_ISDS Temperature", - sensor_type=TYPE_IMU_TEMPERATURE, - vendor="Würth Elektronik", - version=1, - max_range="-40°C to +85°C", - resolution="0.004°C", - power_ma=0 - ) - ] - - def _register_mcu_temperature_sensor(self): - """Register MCU internal temperature sensor in sensor list.""" - self._sensor_list.append( - Sensor( - name="ESP32 MCU Temperature", - sensor_type=TYPE_SOC_TEMPERATURE, - vendor="Espressif", - version=1, - max_range="-40°C to +125°C", - resolution="0.5°C", - power_ma=0 - ) + return self._imu_manager.check_stationarity( + samples=samples, + variance_threshold_accel=variance_threshold_accel, + variance_threshold_gyro=variance_threshold_gyro, ) - - def _load_calibration(self): - """Load calibration from SharedPreferences (with migration support).""" - if not self._imu_driver: - return - - try: - from mpos.config import SharedPreferences - - # Try NEW location first - prefs_new = SharedPreferences("com.micropythonos.settings", filename=IMU_CALIBRATION_FILENAME) - accel_offsets = prefs_new.get_list("accel_offsets") - gyro_offsets = prefs_new.get_list("gyro_offsets") - - if accel_offsets or gyro_offsets: - self._imu_driver.set_calibration(accel_offsets, gyro_offsets) - except: - pass - - def _save_calibration(self): - """Save calibration to SharedPreferences.""" - if not self._imu_driver: - return - - try: - from mpos.config import SharedPreferences - prefs = SharedPreferences("com.micropythonos.settings", filename=IMU_CALIBRATION_FILENAME) - editor = prefs.edit() - - cal = self._imu_driver.get_calibration() - editor.put_list("accel_offsets", list(cal['accel_offsets'])) - editor.put_list("gyro_offsets", list(cal['gyro_offsets'])) - editor.commit() - except: - pass - - -# ============================================================================ -# Helper functions for calibration quality checking -# ============================================================================ - -def _calc_mean_variance(samples_list): - """Calculate mean and variance for a list of samples.""" - if not samples_list: - return 0.0, 0.0 - n = len(samples_list) - mean = sum(samples_list) / n - variance = sum((x - mean) ** 2 for x in samples_list) / n - return mean, variance - - -def _calc_variance(samples_list): - """Calculate variance for a list of samples.""" - if not samples_list: - return 0.0 - n = len(samples_list) - mean = sum(samples_list) / n - return sum((x - mean) ** 2 for x in samples_list) / n - - -# ============================================================================ -# Internal driver abstraction layer -# ============================================================================ - -class _IMUDriver: - """Base class for IMU drivers (internal use only).""" - - def read_acceleration(self): - """Returns (x, y, z) in m/s²""" - raise NotImplementedError - - def read_gyroscope(self): - """Returns (x, y, z) in deg/s""" - raise NotImplementedError - - def read_temperature(self): - """Returns temperature in °C""" - raise NotImplementedError - - def calibrate_accelerometer(self, samples): - """Calibrate accel, return (x, y, z) offsets in m/s²""" - raise NotImplementedError - - def calibrate_gyroscope(self, samples): - """Calibrate gyro, return (x, y, z) offsets in deg/s""" - raise NotImplementedError - - def get_calibration(self): - """Return dict with 'accel_offsets' and 'gyro_offsets' keys""" - raise NotImplementedError - - def set_calibration(self, accel_offsets, gyro_offsets): - """Set calibration offsets from saved values""" - raise NotImplementedError - - -class _IIODriver(_IMUDriver): - def __init__(self): - self.accel_path = self.find_iio_device_with_file("in_accel_x_raw") - print("path:", self.accel_path) - self.accel_offset = [0.0, 0.0, 0.0] - self.gyro_offset = [0.0, 0.0, 0.0] - - def calibrate_accelerometer(self, samples): - """Calibrate accelerometer (device must be stationary).""" - sum_x, sum_y, sum_z = 0.0, 0.0, 0.0 - - for _ in range(samples): - ax, ay, az = self.read_acceleration() - sum_x += ax - sum_y += ay - sum_z += az - time.sleep_ms(10) - - if FACING_EARTH == FACING_EARTH: - sum_z *= -1 - - # Average offsets (assuming Z-axis should read +9.8 m/s²) - self.accel_offset[0] = sum_x / samples - self.accel_offset[1] = sum_y / samples - self.accel_offset[2] = (sum_z / samples) - _GRAVITY - - return tuple(self.accel_offset) - - def calibrate_gyroscope(self, samples): - """Calibrate gyroscope (device must be stationary).""" - sum_x, sum_y, sum_z = 0.0, 0.0, 0.0 - - for _ in range(samples): - gx, gy, gz = self.read_gyroscope() - sum_x += gx - sum_y += gy - sum_z += gz - time.sleep_ms(10) - - # Average offsets (should be 0 when stationary) - self.gyro_offset[0] = sum_x / samples - self.gyro_offset[1] = sum_y / samples - self.gyro_offset[2] = sum_z / samples - - return tuple(self.gyro_offset) - - def get_calibration(self): - """Get current calibration.""" - return { - 'accel_offsets': self.accel_offset, - 'gyro_offsets': self.gyro_offset - } - - def set_calibration(self, accel_offsets, gyro_offsets): - """Set calibration from saved values.""" - if accel_offsets: - self.accel_offset = list(accel_offsets) - if gyro_offsets: - self.gyro_offset = list(gyro_offsets) - """ - Read sensor data via Linux IIO sysfs. - - Typical base path: - /sys/bus/iio/devices/iio:device0 - """ - accel_path: str - - def __init__(self): - self.accel_path = self.find_iio_device_with_file("in_accel_x_raw") - print("path:", self.accel_path) - self.accel_offset = [0.0, 0.0, 0.0] - self.gyro_offset = [0.0, 0.0, 0.0] - - def _p(self, name: str): - return self.accel_path + "/" + name - - def _exists(self, name): - try: - os.stat(name) - return True - except OSError: - return False - - def _is_dir(self, path): - # MicroPython: stat tuple, mode is [0] - try: - st = os.stat(path) - mode = st[0] - # directory bit (POSIX): 0o040000 - return (mode & 0o170000) == 0o040000 - except OSError: - return False - - def find_iio_device_with_file(self, filename, base_dir="/sys/bus/iio/devices/"): - """ - Returns full path to iio:deviceX that contains given filename, - e.g. "/sys/bus/iio/devices/iio:device0" - - Returns None if not found. - """ - - print("Is dir? ", self._is_dir(base_dir), base_dir) - try: - entries = os.listdir(base_dir) - except OSError: - print("Error listing dir") - return None - - for e in entries: - print("Entry:", e) - if not e.startswith("iio:device"): - continue - - print("Entry:", e) - - dev_path = base_dir + "/" + e - if not self._is_dir(dev_path): - continue - - if self._exists(dev_path + "/" + filename): - return dev_path - - return None - - def _read_text(self, name: str) -> str: - p = name - print("Read: ", p) - f = open(p, "r") - try: - return f.readline().strip() - finally: - f.close() - - def _read_float(self, name: str) -> float: - return float(self._read_text(name)) - - def _read_int(self, name: str) -> int: - return int(self._read_text(name), 10) - - def _read_raw_scaled(self, raw_name: str, scale_name: str) -> float: - raw = self._read_int(raw_name) - scale = self._read_float(scale_name) - return raw * scale - - # ---------------------------- - # Public API (replacing I2C) - # ---------------------------- - - def read_temperature(self) -> float: - """ - Tries common IIO patterns: - - in_temp_input (already scaled, usually millidegree C) - - in_temp_raw + in_temp_scale - """ - if not self.accel_path: - return None - - if False: # os.path.exists(self._p("in_temp_input")): - v = self._read_float(self.accel_path + "/" + "in_temp_input") - # Many drivers expose millidegree Celsius here. - if abs(v) > 200: # heuristic: 25000 means 25°C - return v / 1000.0 - return v - - # Fallback: raw + scale - raw_path = self.accel_path + "/" + "in_temp_raw" - scale_path = self.accel_path + "/" + "in_temp_scale" - if not self._exists(raw_path) or not self._exists(scale_path): - return None - return self._read_raw_scaled(raw_path, scale_path) - - def read_acceleration(self) -> tuple[float, float, float]: - """ - Returns acceleration in m/s^2 if the kernel driver uses standard IIO scale. - Common names: - in_accel_{x,y,z}_raw + in_accel_scale - """ - if not self.accel_path: - return (0.0, 0.0, 0.0) - scale_name = self.accel_path + "/" + "in_accel_scale" - - ax = self._read_raw_scaled(self.accel_path + "/" + "in_accel_x_raw", scale_name) - ay = self._read_raw_scaled(self.accel_path + "/" + "in_accel_y_raw", scale_name) - az = self._read_raw_scaled(self.accel_path + "/" + "in_accel_z_raw", scale_name) - - return ( - ax - self.accel_offset[0], - ay - self.accel_offset[1], - az - self.accel_offset[2] - ) - - def read_gyroscope(self) -> tuple[float, float, float]: - """ - Returns angular velocity in rad/s if the kernel driver uses standard IIO scale. - Common names: - in_anglvel_{x,y,z}_raw + in_anglvel_scale - """ - if not self.accel_path: - return (0.0, 0.0, 0.0) - scale_name = self.accel_path + "/" + "in_anglvel_scale" - - gx = self._read_raw_scaled(self.accel_path + "/" + "in_anglvel_x_raw", scale_name) - gy = self._read_raw_scaled(self.accel_path + "/" + "in_anglvel_y_raw", scale_name) - gz = self._read_raw_scaled(self.accel_path + "/" + "in_anglvel_z_raw", scale_name) - - return ( - gx - self.gyro_offset[0], - gy - self.gyro_offset[1], - gz - self.gyro_offset[2] - ) - -class _QMI8658Driver(_IMUDriver): - """Wrapper for QMI8658 IMU (Waveshare board).""" - - def __init__(self, i2c_bus, address): - from drivers.imu_sensor.qmi8658 import QMI8658 - # QMI8658 scale constants (can't import const() values) - _ACCELSCALE_RANGE_8G = 0b10 - _GYROSCALE_RANGE_256DPS = 0b100 - self.sensor = QMI8658( - i2c_bus, - address=address, - accel_scale=_ACCELSCALE_RANGE_8G, - gyro_scale=_GYROSCALE_RANGE_256DPS - ) - # Software calibration offsets (QMI8658 has no built-in calibration) - self.accel_offset = [0.0, 0.0, 0.0] - self.gyro_offset = [0.0, 0.0, 0.0] - - def read_acceleration(self): - """Read acceleration in m/s² (converts from G).""" - ax, ay, az = self.sensor.acceleration - # Convert G to m/s² and apply calibration - return ( - (ax * _GRAVITY) - self.accel_offset[0], - (ay * _GRAVITY) - self.accel_offset[1], - (az * _GRAVITY) - self.accel_offset[2] - ) - - def read_gyroscope(self): - """Read gyroscope in deg/s (already in correct units).""" - gx, gy, gz = self.sensor.gyro - # Apply calibration - return ( - gx - self.gyro_offset[0], - gy - self.gyro_offset[1], - gz - self.gyro_offset[2] - ) - - def read_temperature(self): - """Read temperature in °C.""" - return self.sensor.temperature - - def calibrate_accelerometer(self, samples): - """Calibrate accelerometer (device must be stationary).""" - sum_x, sum_y, sum_z = 0.0, 0.0, 0.0 - - for _ in range(samples): - ax, ay, az = self.sensor.acceleration - sum_x += ax * _GRAVITY - sum_y += ay * _GRAVITY - sum_z += az * _GRAVITY - time.sleep_ms(10) - - if FACING_EARTH == FACING_EARTH: - sum_z *= -1 - - # Average offsets (assuming Z-axis should read +9.8 m/s²) - self.accel_offset[0] = sum_x / samples - self.accel_offset[1] = sum_y / samples - self.accel_offset[2] = (sum_z / samples) - _GRAVITY - - return tuple(self.accel_offset) - - def calibrate_gyroscope(self, samples): - """Calibrate gyroscope (device must be stationary).""" - sum_x, sum_y, sum_z = 0.0, 0.0, 0.0 - - for _ in range(samples): - gx, gy, gz = self.sensor.gyro - sum_x += gx - sum_y += gy - sum_z += gz - time.sleep_ms(10) - - # Average offsets (should be 0 when stationary) - self.gyro_offset[0] = sum_x / samples - self.gyro_offset[1] = sum_y / samples - self.gyro_offset[2] = sum_z / samples - - return tuple(self.gyro_offset) - - def get_calibration(self): - """Get current calibration.""" - return { - 'accel_offsets': self.accel_offset, - 'gyro_offsets': self.gyro_offset - } - - def set_calibration(self, accel_offsets, gyro_offsets): - """Set calibration from saved values.""" - if accel_offsets: - self.accel_offset = list(accel_offsets) - if gyro_offsets: - self.gyro_offset = list(gyro_offsets) - - -class _MPU6886Driver(_IMUDriver): - """Wrapper for MPU6886 IMU (Waveshare board).""" - - def __init__(self, i2c_bus, address): - from drivers.imu_sensor.mpu6886 import MPU6886 - - self.sensor = MPU6886(i2c_bus, address=address) - # Software calibration offsets (MPU6886 has no built-in calibration) - self.accel_offset = [0.0, 0.0, 0.0] - self.gyro_offset = [0.0, 0.0, 0.0] - - def read_temperature(self): - """Read temperature in °C.""" - return self.sensor.temperature - - def read_acceleration(self): - """Read acceleration in m/s² (converts from G).""" - ax, ay, az = self.sensor.acceleration - # Convert G to m/s² and apply calibration - return ( - (ax * _GRAVITY) - self.accel_offset[0], - (ay * _GRAVITY) - self.accel_offset[1], - (az * _GRAVITY) - self.accel_offset[2], - ) - - def read_gyroscope(self): - """Read gyroscope in deg/s (already in correct units).""" - gx, gy, gz = self.sensor.gyro - # Apply calibration - return ( - gx - self.gyro_offset[0], - gy - self.gyro_offset[1], - gz - self.gyro_offset[2], - ) - - def calibrate_accelerometer(self, samples): - """Calibrate accelerometer (device must be stationary).""" - sum_x, sum_y, sum_z = 0.0, 0.0, 0.0 - - for _ in range(samples): - ax, ay, az = self.sensor.acceleration - sum_x += ax * _GRAVITY - sum_y += ay * _GRAVITY - sum_z += az * _GRAVITY - time.sleep_ms(10) - - if FACING_EARTH == FACING_EARTH: - sum_z *= -1 - - # Average offsets (assuming Z-axis should read +9.8 m/s²) - self.accel_offset[0] = sum_x / samples - self.accel_offset[1] = sum_y / samples - self.accel_offset[2] = (sum_z / samples) - _GRAVITY - - return tuple(self.accel_offset) - - def calibrate_gyroscope(self, samples): - """Calibrate gyroscope (device must be stationary).""" - sum_x, sum_y, sum_z = 0.0, 0.0, 0.0 - - for _ in range(samples): - gx, gy, gz = self.sensor.gyro - sum_x += gx - sum_y += gy - sum_z += gz - time.sleep_ms(10) - - # Average offsets (should be 0 when stationary) - self.gyro_offset[0] = sum_x / samples - self.gyro_offset[1] = sum_y / samples - self.gyro_offset[2] = sum_z / samples - - return tuple(self.gyro_offset) - - def get_calibration(self): - """Get current calibration.""" - return {"accel_offsets": self.accel_offset, "gyro_offsets": self.gyro_offset} - - def set_calibration(self, accel_offsets, gyro_offsets): - """Set calibration from saved values.""" - if accel_offsets: - self.accel_offset = list(accel_offsets) - if gyro_offsets: - self.gyro_offset = list(gyro_offsets) - - -class _WsenISDSDriver(_IMUDriver): - """Wrapper for WSEN_ISDS IMU (Fri3d badge).""" - - def __init__(self, i2c_bus, address): - from drivers.imu_sensor.wsen_isds import Wsen_Isds - self.sensor = Wsen_Isds( - i2c_bus, - address=address, - acc_range="8g", - acc_data_rate="104Hz", - gyro_range="500dps", - gyro_data_rate="104Hz" - ) - # Software calibration offsets - self.accel_offset = [0.0, 0.0, 0.0] - self.gyro_offset = [0.0, 0.0, 0.0] - - def read_acceleration(self): - """Read acceleration in m/s² (converts from mg).""" - ax, ay, az = self.sensor._read_raw_accelerations() - - # Convert G to m/s² and apply calibration - return ( - ((ax / 1000) * _GRAVITY) - self.accel_offset[0], - ((ay / 1000) * _GRAVITY) - self.accel_offset[1], - ((az / 1000) * _GRAVITY) - self.accel_offset[2] - ) - - def read_gyroscope(self): - """Read gyroscope in deg/s (converts from mdps).""" - gx, gy, gz = self.sensor.read_angular_velocities() - # Convert mdps to deg/s and apply calibration - return ( - gx / 1000.0 - self.gyro_offset[0], - gy / 1000.0 - self.gyro_offset[1], - gz / 1000.0 - self.gyro_offset[2] - ) - - def read_temperature(self): - return self.sensor.temperature - - def calibrate_accelerometer(self, samples): - """Calibrate accelerometer (device must be stationary).""" - sum_x, sum_y, sum_z = 0.0, 0.0, 0.0 - - for _ in range(samples): - ax, ay, az = self.sensor._read_raw_accelerations() - sum_x += (ax / 1000.0) * _GRAVITY - sum_y += (ay / 1000.0) * _GRAVITY - sum_z += (az / 1000.0) * _GRAVITY - time.sleep_ms(10) - - print(f"sumz: {sum_z}") - z_offset = 0 - if FACING_EARTH == FACING_EARTH: - sum_z *= -1 - print(f"sumz: {sum_z}") - - # Average offsets (assuming Z-axis should read +9.8 m/s²) - self.accel_offset[0] = sum_x / samples - self.accel_offset[1] = sum_y / samples - self.accel_offset[2] = (sum_z / samples) - _GRAVITY - print(f"offsets: {self.accel_offset}") - - return tuple(self.accel_offset) - - def calibrate_gyroscope(self, samples): - """Calibrate gyroscope (device must be stationary).""" - sum_x, sum_y, sum_z = 0.0, 0.0, 0.0 - - for _ in range(samples): - gx, gy, gz = self.sensor.read_angular_velocities() - sum_x += gx / 1000.0 - sum_y += gy / 1000.0 - sum_z += gz / 1000.0 - time.sleep_ms(10) - - # Average offsets (should be 0 when stationary) - self.gyro_offset[0] = sum_x / samples - self.gyro_offset[1] = sum_y / samples - self.gyro_offset[2] = sum_z / samples - - return tuple(self.gyro_offset) - - def get_calibration(self): - """Get current calibration.""" - return { - 'accel_offsets': self.accel_offset, - 'gyro_offsets': self.gyro_offset - } - - def set_calibration(self, accel_offsets, gyro_offsets): - """Set calibration from saved values.""" - if accel_offsets: - self.accel_offset = list(accel_offsets) - if gyro_offsets: - self.gyro_offset = list(gyro_offsets) # ============================================================================