Rework IMU drivers

This commit is contained in:
Thomas Farstrike
2026-02-21 12:11:08 +01:00
parent aea339a143
commit 70915a78ca
10 changed files with 1026 additions and 1099 deletions
@@ -0,0 +1,29 @@
from mpos.imu.constants import (
TYPE_ACCELEROMETER,
TYPE_MAGNETIC_FIELD,
TYPE_GYROSCOPE,
TYPE_TEMPERATURE,
TYPE_IMU_TEMPERATURE,
TYPE_SOC_TEMPERATURE,
FACING_EARTH,
FACING_SKY,
GRAVITY,
IMU_CALIBRATION_FILENAME,
)
from mpos.imu.sensor import Sensor
from mpos.imu.manager import ImuManager
__all__ = [
"TYPE_ACCELEROMETER",
"TYPE_MAGNETIC_FIELD",
"TYPE_GYROSCOPE",
"TYPE_TEMPERATURE",
"TYPE_IMU_TEMPERATURE",
"TYPE_SOC_TEMPERATURE",
"FACING_EARTH",
"FACING_SKY",
"GRAVITY",
"IMU_CALIBRATION_FILENAME",
"Sensor",
"ImuManager",
]
@@ -0,0 +1,15 @@
TYPE_ACCELEROMETER = 1 # Units: m/s² (meters per second squared)
TYPE_MAGNETIC_FIELD = 2 # Units: μT (micro teslas)
TYPE_GYROSCOPE = 4 # Units: deg/s (degrees per second)
TYPE_TEMPERATURE = 13 # Units: °C (generic, returns first available - deprecated)
TYPE_IMU_TEMPERATURE = 14 # Units: °C (IMU chip temperature)
TYPE_SOC_TEMPERATURE = 15 # Units: °C (MCU/SoC internal temperature)
# mounted_position:
FACING_EARTH = 20 # underside of PCB, like fri3d_2024
FACING_SKY = 21 # top of PCB, like waveshare_esp32_s3_lcd_touch_2 (default)
# Gravity constant for unit conversions
GRAVITY = 9.80665 # m/s²
IMU_CALIBRATION_FILENAME = "imu_calibration.json"
@@ -0,0 +1,82 @@
import time
from mpos.imu.constants import GRAVITY, FACING_EARTH
class IMUDriverBase:
"""Base class for IMU drivers with shared calibration logic."""
def __init__(self):
self.accel_offset = [0.0, 0.0, 0.0]
self.gyro_offset = [0.0, 0.0, 0.0]
def read_acceleration(self):
"""Returns (x, y, z) in m/s²"""
raise NotImplementedError
def read_gyroscope(self):
"""Returns (x, y, z) in deg/s"""
raise NotImplementedError
def read_temperature(self):
"""Returns temperature in °C"""
raise NotImplementedError
def _raw_acceleration_mps2(self):
"""Returns raw (x, y, z) in m/s² for calibration sampling."""
raise NotImplementedError
def _raw_gyroscope_dps(self):
"""Returns raw (x, y, z) in deg/s for calibration sampling."""
raise NotImplementedError
def calibrate_accelerometer(self, samples):
"""Calibrate accel, return (x, y, z) offsets in m/s²"""
sum_x, sum_y, sum_z = 0.0, 0.0, 0.0
for _ in range(samples):
ax, ay, az = self._raw_acceleration_mps2()
sum_x += ax
sum_y += ay
sum_z += az
time.sleep_ms(10)
if FACING_EARTH == FACING_EARTH:
sum_z *= -1
self.accel_offset[0] = sum_x / samples
self.accel_offset[1] = sum_y / samples
self.accel_offset[2] = (sum_z / samples) - GRAVITY
return tuple(self.accel_offset)
def calibrate_gyroscope(self, samples):
"""Calibrate gyro, return (x, y, z) offsets in deg/s"""
sum_x, sum_y, sum_z = 0.0, 0.0, 0.0
for _ in range(samples):
gx, gy, gz = self._raw_gyroscope_dps()
sum_x += gx
sum_y += gy
sum_z += gz
time.sleep_ms(10)
self.gyro_offset[0] = sum_x / samples
self.gyro_offset[1] = sum_y / samples
self.gyro_offset[2] = sum_z / samples
return tuple(self.gyro_offset)
def get_calibration(self):
"""Return dict with 'accel_offsets' and 'gyro_offsets' keys"""
return {
"accel_offsets": self.accel_offset,
"gyro_offsets": self.gyro_offset,
}
def set_calibration(self, accel_offsets, gyro_offsets):
"""Set calibration offsets from saved values"""
if accel_offsets:
self.accel_offset = list(accel_offsets)
if gyro_offsets:
self.gyro_offset = list(gyro_offsets)
@@ -0,0 +1,140 @@
import os
from mpos.imu.drivers.base import IMUDriverBase
class IIODriver(IMUDriverBase):
"""
Read sensor data via Linux IIO sysfs.
Typical base path:
/sys/bus/iio/devices/iio:device0
"""
accel_path: str
def __init__(self):
super().__init__()
self.accel_path = self.find_iio_device_with_file("in_accel_x_raw")
print("path:", self.accel_path)
def _p(self, name: str):
return self.accel_path + "/" + name
def _exists(self, name):
try:
os.stat(name)
return True
except OSError:
return False
def _is_dir(self, path):
# MicroPython: stat tuple, mode is [0]
try:
st = os.stat(path)
mode = st[0]
# directory bit (POSIX): 0o040000
return (mode & 0o170000) == 0o040000
except OSError:
return False
def find_iio_device_with_file(self, filename, base_dir="/sys/bus/iio/devices/"):
"""
Returns full path to iio:deviceX that contains given filename,
e.g. "/sys/bus/iio/devices/iio:device0"
Returns None if not found.
"""
print("Is dir? ", self._is_dir(base_dir), base_dir)
try:
entries = os.listdir(base_dir)
except OSError:
print("Error listing dir")
return None
for entry in entries:
print("Entry:", entry)
if not entry.startswith("iio:device"):
continue
dev_path = base_dir + "/" + entry
if not self._is_dir(dev_path):
continue
if self._exists(dev_path + "/" + filename):
return dev_path
return None
def _read_text(self, name: str) -> str:
print("Read: ", name)
f = open(name, "r")
try:
return f.readline().strip()
finally:
f.close()
def _read_float(self, name: str) -> float:
return float(self._read_text(name))
def _read_int(self, name: str) -> int:
return int(self._read_text(name), 10)
def _read_raw_scaled(self, raw_name: str, scale_name: str) -> float:
raw = self._read_int(raw_name)
scale = self._read_float(scale_name)
return raw * scale
def read_temperature(self) -> float:
"""
Tries common IIO patterns:
- in_temp_input (already scaled, usually millidegree C)
- in_temp_raw + in_temp_scale
"""
if not self.accel_path:
return None
raw_path = self.accel_path + "/" + "in_temp_raw"
scale_path = self.accel_path + "/" + "in_temp_scale"
if not self._exists(raw_path) or not self._exists(scale_path):
return None
return self._read_raw_scaled(raw_path, scale_path)
def _raw_acceleration_mps2(self):
if not self.accel_path:
return (0.0, 0.0, 0.0)
scale_name = self.accel_path + "/" + "in_accel_scale"
ax = self._read_raw_scaled(self.accel_path + "/" + "in_accel_x_raw", scale_name)
ay = self._read_raw_scaled(self.accel_path + "/" + "in_accel_y_raw", scale_name)
az = self._read_raw_scaled(self.accel_path + "/" + "in_accel_z_raw", scale_name)
return (ax, ay, az)
def _raw_gyroscope_dps(self):
if not self.accel_path:
return (0.0, 0.0, 0.0)
scale_name = self.accel_path + "/" + "in_anglvel_scale"
gx = self._read_raw_scaled(self.accel_path + "/" + "in_anglvel_x_raw", scale_name)
gy = self._read_raw_scaled(self.accel_path + "/" + "in_anglvel_y_raw", scale_name)
gz = self._read_raw_scaled(self.accel_path + "/" + "in_anglvel_z_raw", scale_name)
return (gx, gy, gz)
def read_acceleration(self):
ax, ay, az = self._raw_acceleration_mps2()
return (
ax - self.accel_offset[0],
ay - self.accel_offset[1],
az - self.accel_offset[2],
)
def read_gyroscope(self):
gx, gy, gz = self._raw_gyroscope_dps()
return (
gx - self.gyro_offset[0],
gy - self.gyro_offset[1],
gz - self.gyro_offset[2],
)
@@ -0,0 +1,39 @@
from mpos.imu.constants import GRAVITY
from mpos.imu.drivers.base import IMUDriverBase
class MPU6886Driver(IMUDriverBase):
"""Wrapper for MPU6886 IMU (Waveshare board)."""
def __init__(self, i2c_bus, address):
super().__init__()
from drivers.imu_sensor.mpu6886 import MPU6886
self.sensor = MPU6886(i2c_bus, address=address)
def _raw_acceleration_mps2(self):
ax, ay, az = self.sensor.acceleration
return (ax * GRAVITY, ay * GRAVITY, az * GRAVITY)
def _raw_gyroscope_dps(self):
gx, gy, gz = self.sensor.gyro
return (gx, gy, gz)
def read_temperature(self):
return self.sensor.temperature
def read_acceleration(self):
ax, ay, az = self._raw_acceleration_mps2()
return (
ax - self.accel_offset[0],
ay - self.accel_offset[1],
az - self.accel_offset[2],
)
def read_gyroscope(self):
gx, gy, gz = self._raw_gyroscope_dps()
return (
gx - self.gyro_offset[0],
gy - self.gyro_offset[1],
gz - self.gyro_offset[2],
)
@@ -0,0 +1,46 @@
from mpos.imu.constants import GRAVITY
from mpos.imu.drivers.base import IMUDriverBase
class QMI8658Driver(IMUDriverBase):
"""Wrapper for QMI8658 IMU (Waveshare board)."""
def __init__(self, i2c_bus, address):
super().__init__()
from drivers.imu_sensor.qmi8658 import QMI8658
_ACCELSCALE_RANGE_8G = 0b10
_GYROSCALE_RANGE_256DPS = 0b100
self.sensor = QMI8658(
i2c_bus,
address=address,
accel_scale=_ACCELSCALE_RANGE_8G,
gyro_scale=_GYROSCALE_RANGE_256DPS,
)
def _raw_acceleration_mps2(self):
ax, ay, az = self.sensor.acceleration
return (ax * GRAVITY, ay * GRAVITY, az * GRAVITY)
def _raw_gyroscope_dps(self):
gx, gy, gz = self.sensor.gyro
return (gx, gy, gz)
def read_acceleration(self):
ax, ay, az = self._raw_acceleration_mps2()
return (
ax - self.accel_offset[0],
ay - self.accel_offset[1],
az - self.accel_offset[2],
)
def read_gyroscope(self):
gx, gy, gz = self._raw_gyroscope_dps()
return (
gx - self.gyro_offset[0],
gy - self.gyro_offset[1],
gz - self.gyro_offset[2],
)
def read_temperature(self):
return self.sensor.temperature
@@ -0,0 +1,46 @@
from mpos.imu.constants import GRAVITY
from mpos.imu.drivers.base import IMUDriverBase
class WsenISDSDriver(IMUDriverBase):
"""Wrapper for WSEN_ISDS IMU (Fri3d badge)."""
def __init__(self, i2c_bus, address):
super().__init__()
from drivers.imu_sensor.wsen_isds import Wsen_Isds
self.sensor = Wsen_Isds(
i2c_bus,
address=address,
acc_range="8g",
acc_data_rate="104Hz",
gyro_range="500dps",
gyro_data_rate="104Hz",
)
def _raw_acceleration_mps2(self):
ax, ay, az = self.sensor._read_raw_accelerations()
return ((ax / 1000) * GRAVITY, (ay / 1000) * GRAVITY, (az / 1000) * GRAVITY)
def _raw_gyroscope_dps(self):
gx, gy, gz = self.sensor.read_angular_velocities()
return (gx / 1000.0, gy / 1000.0, gz / 1000.0)
def read_acceleration(self):
ax, ay, az = self._raw_acceleration_mps2()
return (
ax - self.accel_offset[0],
ay - self.accel_offset[1],
az - self.accel_offset[2],
)
def read_gyroscope(self):
gx, gy, gz = self._raw_gyroscope_dps()
return (
gx - self.gyro_offset[0],
gy - self.gyro_offset[1],
gz - self.gyro_offset[2],
)
def read_temperature(self):
return self.sensor.temperature
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,25 @@
class Sensor:
"""Sensor metadata (lightweight data class, Android-inspired)."""
def __init__(self, name, sensor_type, vendor, version, max_range, resolution, power_ma):
"""Initialize sensor metadata.
Args:
name: Human-readable sensor name
sensor_type: Sensor type constant (TYPE_ACCELEROMETER, etc.)
vendor: Sensor vendor/manufacturer
version: Driver version
max_range: Maximum measurement range (with units)
resolution: Measurement resolution (with units)
power_ma: Power consumption in mA (or 0 if unknown)
"""
self.name = name
self.type = sensor_type
self.vendor = vendor
self.version = version
self.max_range = max_range
self.resolution = resolution
self.power = power_ma
def __repr__(self):
return f"Sensor({self.name}, type={self.type})"
File diff suppressed because it is too large Load Diff