You've already forked MicroPythonOS
mirror of
https://github.com/m5stack/MicroPythonOS.git
synced 2026-05-20 11:51:27 -07:00
Merge pull request #45 from pavelmachek/m_3_imu
imu: Allow access to iio on Linux
This commit is contained in:
@@ -122,12 +122,9 @@ AudioManager(i2s_pins=i2s_pins)
|
||||
# LightsManager will not be initialized (functions will return False)
|
||||
|
||||
# === SENSOR HARDWARE ===
|
||||
# Note: Desktop builds have no sensor hardware
|
||||
from mpos import SensorManager
|
||||
|
||||
# Initialize with no I2C bus - will detect MCU temp if available
|
||||
# (On Linux desktop, this will fail gracefully but set _initialized flag)
|
||||
SensorManager.init(None)
|
||||
SensorManager.init_iio()
|
||||
|
||||
# === CAMERA HARDWARE ===
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ Copyright (c) 2024 MicroPythonOS contributors
|
||||
"""
|
||||
|
||||
import time
|
||||
import os
|
||||
try:
|
||||
import _thread
|
||||
_lock = _thread.allocate_lock()
|
||||
@@ -28,6 +29,7 @@ except ImportError:
|
||||
|
||||
# Sensor type constants (matching Android SensorManager)
|
||||
TYPE_ACCELEROMETER = 1 # Units: m/s² (meters per second squared)
|
||||
TYPE_MAGNETIC_FIELD = 2 # Units: μT (micro teslas)
|
||||
TYPE_GYROSCOPE = 4 # Units: deg/s (degrees per second)
|
||||
TYPE_TEMPERATURE = 13 # Units: °C (generic, returns first available - deprecated)
|
||||
TYPE_IMU_TEMPERATURE = 14 # Units: °C (IMU chip temperature)
|
||||
@@ -101,6 +103,7 @@ class SensorManager:
|
||||
|
||||
# Class-level constants
|
||||
TYPE_ACCELEROMETER = TYPE_ACCELEROMETER
|
||||
TYPE_MAGNETIC_FIELD = TYPE_MAGNETIC_FIELD
|
||||
TYPE_GYROSCOPE = TYPE_GYROSCOPE
|
||||
TYPE_TEMPERATURE = TYPE_TEMPERATURE
|
||||
TYPE_IMU_TEMPERATURE = TYPE_IMU_TEMPERATURE
|
||||
@@ -120,7 +123,7 @@ class SensorManager:
|
||||
if cls._instance is None:
|
||||
cls._instance = cls()
|
||||
return cls._instance
|
||||
|
||||
|
||||
def init(self, i2c_bus, address=0x6B, mounted_position=FACING_SKY):
|
||||
"""Initialize SensorManager. MCU temperature initializes immediately, IMU initializes on first use.
|
||||
|
||||
@@ -146,6 +149,43 @@ class SensorManager:
|
||||
|
||||
self._initialized = True
|
||||
return True
|
||||
|
||||
def init_iio(self):
|
||||
self._imu_driver = _IIODriver()
|
||||
self._sensor_list = [
|
||||
Sensor(
|
||||
name="Accelerometer",
|
||||
sensor_type=TYPE_ACCELEROMETER,
|
||||
vendor="Linux IIO",
|
||||
version=1,
|
||||
max_range="?",
|
||||
resolution="?",
|
||||
power_ma=10
|
||||
),
|
||||
Sensor(
|
||||
name="Gyroscope",
|
||||
sensor_type=TYPE_GYROSCOPE,
|
||||
vendor="Linux IIO",
|
||||
version=1,
|
||||
max_range="?",
|
||||
resolution="?",
|
||||
power_ma=10
|
||||
),
|
||||
Sensor(
|
||||
name="Temperature",
|
||||
sensor_type=TYPE_IMU_TEMPERATURE,
|
||||
vendor="Linux IIO",
|
||||
version=1,
|
||||
max_range="?",
|
||||
resolution="?",
|
||||
power_ma=10
|
||||
)
|
||||
]
|
||||
|
||||
self._load_calibration()
|
||||
|
||||
self._initialized = True
|
||||
return True
|
||||
|
||||
def _ensure_imu_initialized(self):
|
||||
"""Perform IMU initialization on first use (lazy initialization).
|
||||
@@ -245,7 +285,35 @@ class SensorManager:
|
||||
if sensor.type == sensor_type:
|
||||
return sensor
|
||||
return None
|
||||
|
||||
|
||||
def read_sensor_once(self, sensor):
|
||||
if sensor.type == TYPE_ACCELEROMETER:
|
||||
if self._imu_driver:
|
||||
ax, ay, az = self._imu_driver.read_acceleration()
|
||||
if self._mounted_position == FACING_EARTH:
|
||||
az *= -1
|
||||
return (ax, ay, az)
|
||||
elif sensor.type == TYPE_GYROSCOPE:
|
||||
if self._imu_driver:
|
||||
return self._imu_driver.read_gyroscope()
|
||||
elif sensor.type == TYPE_IMU_TEMPERATURE:
|
||||
if self._imu_driver:
|
||||
return self._imu_driver.read_temperature()
|
||||
elif sensor.type == TYPE_SOC_TEMPERATURE:
|
||||
if self._has_mcu_temperature:
|
||||
import esp32
|
||||
return esp32.mcu_temperature()
|
||||
elif sensor.type == TYPE_TEMPERATURE:
|
||||
# Generic temperature - return first available (backward compatibility)
|
||||
if self._imu_driver:
|
||||
temp = self._imu_driver.read_temperature()
|
||||
if temp is not None:
|
||||
return temp
|
||||
if self._has_mcu_temperature:
|
||||
import esp32
|
||||
return esp32.mcu_temperature()
|
||||
return None
|
||||
|
||||
def read_sensor(self, sensor):
|
||||
"""Read sensor data synchronously.
|
||||
|
||||
@@ -276,32 +344,7 @@ class SensorManager:
|
||||
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
if sensor.type == TYPE_ACCELEROMETER:
|
||||
if self._imu_driver:
|
||||
ax, ay, az = self._imu_driver.read_acceleration()
|
||||
if self._mounted_position == FACING_EARTH:
|
||||
az *= -1
|
||||
return (ax, ay, az)
|
||||
elif sensor.type == TYPE_GYROSCOPE:
|
||||
if self._imu_driver:
|
||||
return self._imu_driver.read_gyroscope()
|
||||
elif sensor.type == TYPE_IMU_TEMPERATURE:
|
||||
if self._imu_driver:
|
||||
return self._imu_driver.read_temperature()
|
||||
elif sensor.type == TYPE_SOC_TEMPERATURE:
|
||||
if self._has_mcu_temperature:
|
||||
import esp32
|
||||
return esp32.mcu_temperature()
|
||||
elif sensor.type == TYPE_TEMPERATURE:
|
||||
# Generic temperature - return first available (backward compatibility)
|
||||
if self._imu_driver:
|
||||
temp = self._imu_driver.read_temperature()
|
||||
if temp is not None:
|
||||
return temp
|
||||
if self._has_mcu_temperature:
|
||||
import esp32
|
||||
return esp32.mcu_temperature()
|
||||
return None
|
||||
return self.read_sensor_once(sensor)
|
||||
except Exception as e:
|
||||
error_msg = str(e)
|
||||
# Retry if sensor data not ready, otherwise fail immediately
|
||||
@@ -310,6 +353,7 @@ class SensorManager:
|
||||
time.sleep_ms(retry_delay_ms)
|
||||
continue
|
||||
else:
|
||||
print("Exception reading sensor:", error_msg)
|
||||
return None
|
||||
|
||||
return None
|
||||
@@ -767,6 +811,138 @@ class _IMUDriver:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class _IIODriver(_IMUDriver):
|
||||
"""
|
||||
Read sensor data via Linux IIO sysfs.
|
||||
|
||||
Typical base path:
|
||||
/sys/bus/iio/devices/iio:device0
|
||||
"""
|
||||
accel_path: str
|
||||
|
||||
def __init__(self):
|
||||
self.accel_path = self.find_iio_device_with_file("in_accel_x_raw")
|
||||
print("path:", self.accel_path)
|
||||
|
||||
def _p(self, name: str):
|
||||
return self.accel_path + "/" + name
|
||||
|
||||
def _exists(self, name):
|
||||
try:
|
||||
os.stat(name)
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
def _is_dir(self, path):
|
||||
# MicroPython: stat tuple, mode is [0]
|
||||
try:
|
||||
st = os.stat(path)
|
||||
mode = st[0]
|
||||
# directory bit (POSIX): 0o040000
|
||||
return (mode & 0o170000) == 0o040000
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
def find_iio_device_with_file(self, filename, base_dir="/sys/bus/iio/devices/"):
|
||||
"""
|
||||
Returns full path to iio:deviceX that contains given filename,
|
||||
e.g. "/sys/bus/iio/devices/iio:device0"
|
||||
|
||||
Returns None if not found.
|
||||
"""
|
||||
|
||||
print("Is dir? ", self._is_dir(base_dir), base_dir)
|
||||
try:
|
||||
entries = os.listdir(base_dir)
|
||||
except OSError:
|
||||
print("Error listing dir")
|
||||
return None
|
||||
|
||||
for e in entries:
|
||||
print("Entry:", e)
|
||||
if not e.startswith("iio:device"):
|
||||
continue
|
||||
|
||||
print("Entry:", e)
|
||||
|
||||
dev_path = base_dir + "/" + e
|
||||
if not self._is_dir(dev_path):
|
||||
continue
|
||||
|
||||
if self._exists(dev_path + "/" + filename):
|
||||
return dev_path
|
||||
|
||||
return None
|
||||
|
||||
def _read_text(self, name: str) -> str:
|
||||
p = name
|
||||
print("Read: ", p)
|
||||
f = open(p, "r")
|
||||
try:
|
||||
return f.readline().strip()
|
||||
finally:
|
||||
f.close()
|
||||
|
||||
def _read_float(self, name: str) -> float:
|
||||
return float(self._read_text(name))
|
||||
|
||||
def _read_int(self, name: str) -> int:
|
||||
return int(self._read_text(name), 10)
|
||||
|
||||
def _read_raw_scaled(self, raw_name: str, scale_name: str) -> float:
|
||||
raw = self._read_int(raw_name)
|
||||
scale = self._read_float(scale_name)
|
||||
return raw * scale
|
||||
|
||||
# ----------------------------
|
||||
# Public API (replacing I2C)
|
||||
# ----------------------------
|
||||
|
||||
def read_temperature(self) -> float:
|
||||
"""
|
||||
Tries common IIO patterns:
|
||||
- in_temp_input (already scaled, usually millidegree C)
|
||||
- in_temp_raw + in_temp_scale
|
||||
"""
|
||||
if False: # os.path.exists(self._p("in_temp_input")):
|
||||
v = self._read_float(self.accel_path + "/" + "in_temp_input")
|
||||
# Many drivers expose millidegree Celsius here.
|
||||
if abs(v) > 200: # heuristic: 25000 means 25°C
|
||||
return v / 1000.0
|
||||
return v
|
||||
|
||||
# Fallback: raw + scale
|
||||
return self._read_raw_scaled(self.accel_path + "/" + "in_temp_raw", self.accel_path + "/" + "in_temp_scale")
|
||||
|
||||
def read_acceleration(self) -> tuple[float, float, float]:
|
||||
"""
|
||||
Returns acceleration in m/s^2 if the kernel driver uses standard IIO scale.
|
||||
Common names:
|
||||
in_accel_{x,y,z}_raw + in_accel_scale
|
||||
"""
|
||||
scale_name = self.accel_path + "/" + "in_accel_scale"
|
||||
|
||||
ax = self._read_raw_scaled(self.accel_path + "/" + "in_accel_x_raw", scale_name)
|
||||
ay = self._read_raw_scaled(self.accel_path + "/" + "in_accel_y_raw", scale_name)
|
||||
az = self._read_raw_scaled(self.accel_path + "/" + "in_accel_z_raw", scale_name)
|
||||
|
||||
return (ax, ay, az)
|
||||
|
||||
def read_gyroscope(self) -> tuple[float, float, float]:
|
||||
"""
|
||||
Returns angular velocity in rad/s if the kernel driver uses standard IIO scale.
|
||||
Common names:
|
||||
in_anglvel_{x,y,z}_raw + in_anglvel_scale
|
||||
"""
|
||||
scale_name = self.accel_path + "/" + "in_anglvel_scale"
|
||||
|
||||
gx = self._read_raw_scaled(self.accel_path + "/" + "in_anglvel_x_raw", scale_name)
|
||||
gy = self._read_raw_scaled(self.accel_path + "/" + "in_anglvel_y_raw", scale_name)
|
||||
gz = self._read_raw_scaled(self.accel_path + "/" + "in_anglvel_z_raw", scale_name)
|
||||
|
||||
return (gx, gy, gz)
|
||||
|
||||
class _QMI8658Driver(_IMUDriver):
|
||||
"""Wrapper for QMI8658 IMU (Waveshare board)."""
|
||||
|
||||
@@ -1054,8 +1230,8 @@ class _WsenISDSDriver(_IMUDriver):
|
||||
|
||||
_original_methods = {}
|
||||
_methods_to_delegate = [
|
||||
'init', 'is_available', 'get_sensor_list', 'get_default_sensor',
|
||||
'read_sensor', 'calibrate_sensor', 'check_calibration_quality',
|
||||
'init', 'init_iio', 'is_available', 'get_sensor_list', 'get_default_sensor',
|
||||
'read_sensor', 'read_sensor_once', 'calibrate_sensor', 'check_calibration_quality',
|
||||
'check_stationarity'
|
||||
]
|
||||
|
||||
|
||||
Reference in New Issue
Block a user