API: add AudioFlinger for audio playback (i2s DAC and buzzer)

API: add LightsManager for multicolor LEDs
This commit is contained in:
Thomas Farstrike
2025-12-03 22:32:36 +01:00
parent 82f55e0698
commit f37ca70a89
20 changed files with 2016 additions and 140 deletions
+206
View File
@@ -643,6 +643,212 @@ def defocus_handler(self, obj):
- `mpos.clipboard`: System clipboard access
- `mpos.battery_voltage`: Battery level reading (ESP32 only)
## Audio System (AudioFlinger)
MicroPythonOS provides a centralized audio service called **AudioFlinger** (Android-inspired) that manages audio playback across different hardware outputs.
### Supported Audio Devices
- **I2S**: Digital audio output for WAV file playback (Fri3d badge, Waveshare board)
- **Buzzer**: PWM-based tone/ringtone playback (Fri3d badge only)
- **Both**: Simultaneous I2S and buzzer support
- **Null**: No audio (desktop/Linux)
### Basic Usage
**Playing WAV files**:
```python
import mpos.audio.audioflinger as AudioFlinger
# Play music file
success = AudioFlinger.play_wav(
"M:/sdcard/music/song.wav",
stream_type=AudioFlinger.STREAM_MUSIC,
volume=80,
on_complete=lambda msg: print(msg)
)
if not success:
print("Audio playback rejected (higher priority stream active)")
```
**Playing RTTTL ringtones**:
```python
# Play notification sound via buzzer
rtttl = "Nokia:d=4,o=5,b=225:8e6,8d6,8f#,8g#,8c#6,8b,d,8p,8b,8a,8c#,8e"
AudioFlinger.play_rtttl(
rtttl,
stream_type=AudioFlinger.STREAM_NOTIFICATION
)
```
**Volume control**:
```python
AudioFlinger.set_volume(70) # 0-100
volume = AudioFlinger.get_volume()
```
**Stopping playback**:
```python
AudioFlinger.stop()
```
### Audio Focus Priority
AudioFlinger implements priority-based audio focus (Android-inspired):
- **STREAM_ALARM** (priority 2): Highest priority
- **STREAM_NOTIFICATION** (priority 1): Medium priority
- **STREAM_MUSIC** (priority 0): Lowest priority
Higher priority streams automatically interrupt lower priority streams. Equal or lower priority streams are rejected while a stream is playing.
### Hardware Support Matrix
| Board | I2S | Buzzer | LEDs |
|-------|-----|--------|------|
| Fri3d 2024 Badge | ✓ (GPIO 2, 47, 16) | ✓ (GPIO 46) | ✓ (5 RGB, GPIO 12) |
| Waveshare ESP32-S3 | ✓ (GPIO 2, 47, 16) | ✗ | ✗ |
| Linux/macOS | ✗ | ✗ | ✗ |
### Configuration
Audio device preference is configured in Settings app under "Advanced Settings":
- **Auto-detect**: Use available hardware (default)
- **I2S (Digital Audio)**: Digital audio only
- **Buzzer (PWM Tones)**: Tones/ringtones only
- **Both I2S and Buzzer**: Use both devices
- **Disabled**: No audio
**Note**: Changing the audio device requires a restart to take effect.
### Implementation Details
- **Location**: `lib/mpos/audio/audioflinger.py`
- **Pattern**: Module-level singleton (similar to `battery_voltage.py`)
- **Thread-safe**: Uses locks for concurrent access
- **Background playback**: Runs in separate thread
- **WAV support**: 8/16/24/32-bit PCM, mono/stereo, auto-upsampling to ≥22050 Hz
- **RTTTL parser**: Full Ring Tone Text Transfer Language support with exponential volume curve
## LED Control (LightsManager)
MicroPythonOS provides a simple LED control service for NeoPixel RGB LEDs (Fri3d badge only).
### Basic Usage
**Check availability**:
```python
import mpos.lights as LightsManager
if LightsManager.is_available():
print(f"LEDs available: {LightsManager.get_led_count()}")
```
**Control individual LEDs**:
```python
# Set LED 0 to red (buffered)
LightsManager.set_led(0, 255, 0, 0)
# Set LED 1 to green
LightsManager.set_led(1, 0, 255, 0)
# Update hardware
LightsManager.write()
```
**Control all LEDs**:
```python
# Set all LEDs to blue
LightsManager.set_all(0, 0, 255)
LightsManager.write()
# Clear all LEDs (black)
LightsManager.clear()
LightsManager.write()
```
**Notification colors**:
```python
# Convenience method for common colors
LightsManager.set_notification_color("red")
LightsManager.set_notification_color("green")
# Available: red, green, blue, yellow, orange, purple, white
```
### Custom Animations
LightsManager provides one-shot control only (no built-in animations). Apps implement custom animations using the `update_frame()` pattern:
```python
import time
import mpos.lights as LightsManager
def blink_pattern():
for _ in range(5):
LightsManager.set_all(255, 0, 0)
LightsManager.write()
time.sleep_ms(200)
LightsManager.clear()
LightsManager.write()
time.sleep_ms(200)
def rainbow_cycle():
colors = [
(255, 0, 0), # Red
(255, 128, 0), # Orange
(255, 255, 0), # Yellow
(0, 255, 0), # Green
(0, 0, 255), # Blue
]
for i, color in enumerate(colors):
LightsManager.set_led(i, *color)
LightsManager.write()
```
**For frame-based LED animations**, use the TaskHandler event system:
```python
import mpos.ui
import time
class LEDAnimationActivity(Activity):
last_time = 0
led_index = 0
def onResume(self, screen):
self.last_time = time.ticks_ms()
mpos.ui.task_handler.add_event_cb(self.update_frame, 1)
def onPause(self, screen):
mpos.ui.task_handler.remove_event_cb(self.update_frame)
LightsManager.clear()
LightsManager.write()
def update_frame(self, a, b):
current_time = time.ticks_ms()
delta_time = time.ticks_diff(current_time, self.last_time) / 1000.0
self.last_time = current_time
# Update animation every 0.5 seconds
if delta_time > 0.5:
LightsManager.clear()
LightsManager.set_led(self.led_index, 0, 255, 0)
LightsManager.write()
self.led_index = (self.led_index + 1) % LightsManager.get_led_count()
```
### Implementation Details
- **Location**: `lib/mpos/lights.py`
- **Pattern**: Module-level singleton (similar to `battery_voltage.py`)
- **Hardware**: 5 NeoPixel RGB LEDs on GPIO 12 (Fri3d badge)
- **Buffered**: LED colors are buffered until `write()` is called
- **Thread-safe**: No locking (single-threaded usage recommended)
- **Desktop**: Functions return `False` (no-op) on desktop builds
## Animations and Game Loops
MicroPythonOS supports frame-based animations and game loops using the TaskHandler event system. This pattern is used for games, particle effects, and smooth animations.
@@ -1,13 +1,11 @@
import machine
import os
import _thread
import time
from mpos.apps import Activity, Intent
import mpos.sdcard
import mpos.ui
from audio_player import AudioPlayer
import mpos.audio.audioflinger as AudioFlinger
class MusicPlayer(Activity):
@@ -68,17 +66,17 @@ class FullscreenPlayer(Activity):
self._filename = self.getIntent().extras.get("filename")
qr_screen = lv.obj()
self._slider_label=lv.label(qr_screen)
self._slider_label.set_text(f"Volume: {AudioPlayer.get_volume()}%")
self._slider_label.set_text(f"Volume: {AudioFlinger.get_volume()}%")
self._slider_label.align(lv.ALIGN.TOP_MID,0,lv.pct(4))
self._slider=lv.slider(qr_screen)
self._slider.set_range(0,100)
self._slider.set_value(AudioPlayer.get_volume(), False)
self._slider.set_value(AudioFlinger.get_volume(), False)
self._slider.set_width(lv.pct(90))
self._slider.align_to(self._slider_label,lv.ALIGN.OUT_BOTTOM_MID,0,10)
def volume_slider_changed(e):
volume_int = self._slider.get_value()
self._slider_label.set_text(f"Volume: {volume_int}%")
AudioPlayer.set_volume(volume_int)
AudioFlinger.set_volume(volume_int)
self._slider.add_event_cb(volume_slider_changed,lv.EVENT.VALUE_CHANGED,None)
self._filename_label = lv.label(qr_screen)
self._filename_label.align(lv.ALIGN.CENTER,0,0)
@@ -104,11 +102,23 @@ class FullscreenPlayer(Activity):
if not self._filename:
print("Not playing any file...")
else:
print("Starting thread to play file {self._filename}")
AudioPlayer.stop_playing()
print(f"Playing file {self._filename}")
AudioFlinger.stop()
time.sleep(0.1)
_thread.stack_size(mpos.apps.good_stack_size())
_thread.start_new_thread(AudioPlayer.play_wav, (self._filename,self.player_finished,))
success = AudioFlinger.play_wav(
self._filename,
stream_type=AudioFlinger.STREAM_MUSIC,
on_complete=self.player_finished
)
if not success:
error_msg = "Error: Audio device unavailable or busy"
print(error_msg)
self.update_ui_threadsafe_if_foreground(
self._filename_label.set_text,
error_msg
)
def focus_obj(self, obj):
obj.set_style_border_color(lv.theme_get_color_primary(None),lv.PART.MAIN)
@@ -118,7 +128,7 @@ class FullscreenPlayer(Activity):
obj.set_style_border_width(0, lv.PART.MAIN)
def stop_button_clicked(self, event):
AudioPlayer.stop_playing()
AudioFlinger.stop()
self.finish()
def player_finished(self, result=None):
@@ -43,6 +43,7 @@ class SettingsActivity(Activity):
{"title": "Theme Color", "key": "theme_primary_color", "value_label": None, "cont": None, "placeholder": "HTML hex color, like: EC048C", "ui": "dropdown", "ui_options": theme_colors},
{"title": "Timezone", "key": "timezone", "value_label": None, "cont": None, "ui": "dropdown", "ui_options": self.get_timezone_tuples(), "changed_callback": lambda : mpos.time.refresh_timezone_preference()},
# Advanced settings, alphabetically:
{"title": "Audio Output Device", "key": "audio_device", "value_label": None, "cont": None, "ui": "radiobuttons", "ui_options": [("Auto-detect", "auto"), ("I2S (Digital Audio)", "i2s"), ("Buzzer (PWM Tones)", "buzzer"), ("Both I2S and Buzzer", "both"), ("Disabled", "null")], "changed_callback": self.audio_device_changed},
{"title": "Auto Start App", "key": "auto_start_app", "value_label": None, "cont": None, "ui": "radiobuttons", "ui_options": [(app.name, app.fullname) for app in PackageManager.get_app_list()]},
{"title": "Restart to Bootloader", "key": "boot_mode", "value_label": None, "cont": None, "ui": "radiobuttons", "ui_options": [("Normal", "normal"), ("Bootloader", "bootloader")]}, # special that doesn't get saved
{"title": "Format internal data partition", "key": "format_internal_data_partition", "value_label": None, "cont": None, "ui": "radiobuttons", "ui_options": [("No, do not format", "no"), ("Yes, erase all settings, files and non-builtin apps", "yes")]}, # special that doesn't get saved
@@ -111,6 +112,34 @@ class SettingsActivity(Activity):
def get_timezone_tuples():
return [(tz, tz) for tz in mpos.time.get_timezones()]
def audio_device_changed(self):
"""
Called when audio device setting changes.
Note: Changing device type at runtime requires a restart for full effect.
AudioFlinger initialization happens at boot.
"""
import mpos.audio.audioflinger as AudioFlinger
new_value = self.prefs.get_string("audio_device", "auto")
print(f"Audio device setting changed to: {new_value}")
print("Note: Restart required for audio device change to take effect")
# Map setting values to device types
device_map = {
"auto": AudioFlinger.get_device_type(), # Keep current
"i2s": AudioFlinger.DEVICE_I2S,
"buzzer": AudioFlinger.DEVICE_BUZZER,
"both": AudioFlinger.DEVICE_BOTH,
"null": AudioFlinger.DEVICE_NULL,
}
desired_device = device_map.get(new_value, AudioFlinger.get_device_type())
current_device = AudioFlinger.get_device_type()
if desired_device != current_device:
print(f"Desired device type ({desired_device}) differs from current ({current_device})")
print("Full device type change requires restart - current session continues with existing device")
def focus_container(self, container):
print(f"container {container} focused, setting border...")
container.set_style_border_color(lv.theme_get_color_primary(None),lv.PART.MAIN)
@@ -0,0 +1,55 @@
# AudioFlinger - Centralized Audio Management Service for MicroPythonOS
# Android-inspired audio routing with priority-based audio focus
from . import audioflinger
# Re-export main API
from .audioflinger import (
# Device types
DEVICE_NULL,
DEVICE_I2S,
DEVICE_BUZZER,
DEVICE_BOTH,
# Stream types
STREAM_MUSIC,
STREAM_NOTIFICATION,
STREAM_ALARM,
# Core functions
init,
play_wav,
play_rtttl,
stop,
pause,
resume,
set_volume,
get_volume,
get_device_type,
is_playing,
)
__all__ = [
# Device types
'DEVICE_NULL',
'DEVICE_I2S',
'DEVICE_BUZZER',
'DEVICE_BOTH',
# Stream types
'STREAM_MUSIC',
'STREAM_NOTIFICATION',
'STREAM_ALARM',
# Functions
'init',
'play_wav',
'play_rtttl',
'stop',
'pause',
'resume',
'set_volume',
'get_volume',
'get_device_type',
'is_playing',
]
@@ -0,0 +1,330 @@
# AudioFlinger - Core Audio Management Service
# Centralized audio routing with priority-based audio focus (Android-inspired)
# Supports I2S (digital audio) and PWM buzzer (tones/ringtones)
# Device type constants
DEVICE_NULL = 0 # No audio hardware (desktop fallback)
DEVICE_I2S = 1 # Digital audio output (WAV playback)
DEVICE_BUZZER = 2 # PWM buzzer (tones/RTTTL)
DEVICE_BOTH = 3 # Both I2S and buzzer available
# Stream type constants (priority order: higher number = higher priority)
STREAM_MUSIC = 0 # Background music (lowest priority)
STREAM_NOTIFICATION = 1 # Notification sounds (medium priority)
STREAM_ALARM = 2 # Alarms/alerts (highest priority)
# Module-level state (singleton pattern, follows battery_voltage.py)
_device_type = DEVICE_NULL
_i2s_pins = None # I2S pin configuration dict (created per-stream)
_buzzer_instance = None # PWM buzzer instance
_current_stream = None # Currently playing stream
_volume = 70 # System volume (0-100)
_stream_lock = None # Thread lock for stream management
def init(device_type, i2s_pins=None, buzzer_instance=None):
"""
Initialize AudioFlinger with hardware configuration.
Args:
device_type: One of DEVICE_NULL, DEVICE_I2S, DEVICE_BUZZER, DEVICE_BOTH
i2s_pins: Dict with 'sck', 'ws', 'sd' pin numbers (for I2S devices)
buzzer_instance: PWM instance for buzzer (for buzzer devices)
"""
global _device_type, _i2s_pins, _buzzer_instance, _stream_lock
_device_type = device_type
_i2s_pins = i2s_pins
_buzzer_instance = buzzer_instance
# Initialize thread lock for stream management
try:
import _thread
_stream_lock = _thread.allocate_lock()
except ImportError:
# Desktop mode - no threading support
_stream_lock = None
device_names = {
DEVICE_NULL: "NULL (no audio)",
DEVICE_I2S: "I2S (digital audio)",
DEVICE_BUZZER: "Buzzer (PWM tones)",
DEVICE_BOTH: "Both (I2S + Buzzer)"
}
print(f"AudioFlinger initialized: {device_names.get(device_type, 'Unknown')}")
def _check_audio_focus(stream_type):
"""
Check if a stream with the given type can start playback.
Implements priority-based audio focus (Android-inspired).
Args:
stream_type: Stream type (STREAM_MUSIC, STREAM_NOTIFICATION, STREAM_ALARM)
Returns:
bool: True if stream can start, False if rejected
"""
global _current_stream
if not _current_stream:
return True # No stream playing, OK to start
if not _current_stream.is_playing():
return True # Current stream finished, OK to start
# Check priority
if stream_type <= _current_stream.stream_type:
print(f"AudioFlinger: Stream rejected (priority {stream_type} <= current {_current_stream.stream_type})")
return False
# Higher priority stream - interrupt current
print(f"AudioFlinger: Interrupting stream (priority {stream_type} > current {_current_stream.stream_type})")
_current_stream.stop()
return True
def _playback_thread(stream):
"""
Background thread function for audio playback.
Args:
stream: Stream instance (WAVStream or RTTTLStream)
"""
global _current_stream
# Acquire lock and set as current stream
if _stream_lock:
_stream_lock.acquire()
_current_stream = stream
if _stream_lock:
_stream_lock.release()
try:
# Run playback (blocks until complete or stopped)
stream.play()
except Exception as e:
print(f"AudioFlinger: Playback error: {e}")
finally:
# Clear current stream
if _stream_lock:
_stream_lock.acquire()
if _current_stream == stream:
_current_stream = None
if _stream_lock:
_stream_lock.release()
def play_wav(file_path, stream_type=STREAM_MUSIC, volume=None, on_complete=None):
"""
Play WAV file via I2S.
Args:
file_path: Path to WAV file (e.g., "M:/sdcard/music/song.wav")
stream_type: Stream type (STREAM_MUSIC, STREAM_NOTIFICATION, STREAM_ALARM)
volume: Override volume (0-100), or None to use system volume
on_complete: Callback function(message) called when playback finishes
Returns:
bool: True if playback started, False if rejected or unavailable
"""
if _device_type not in (DEVICE_I2S, DEVICE_BOTH):
print("AudioFlinger: play_wav() failed - no I2S device available")
return False
if not _i2s_pins:
print("AudioFlinger: play_wav() failed - I2S pins not configured")
return False
# Check audio focus
if _stream_lock:
_stream_lock.acquire()
can_start = _check_audio_focus(stream_type)
if _stream_lock:
_stream_lock.release()
if not can_start:
return False
# Create stream and start playback in background thread
try:
from mpos.audio.stream_wav import WAVStream
import _thread
import mpos.apps
stream = WAVStream(
file_path=file_path,
stream_type=stream_type,
volume=volume if volume is not None else _volume,
i2s_pins=_i2s_pins,
on_complete=on_complete
)
_thread.stack_size(mpos.apps.good_stack_size())
_thread.start_new_thread(_playback_thread, (stream,))
return True
except Exception as e:
print(f"AudioFlinger: play_wav() failed: {e}")
return False
def play_rtttl(rtttl_string, stream_type=STREAM_NOTIFICATION, volume=None, on_complete=None):
"""
Play RTTTL ringtone via buzzer.
Args:
rtttl_string: RTTTL format string (e.g., "Nokia:d=4,o=5,b=225:8e6,8d6...")
stream_type: Stream type (STREAM_MUSIC, STREAM_NOTIFICATION, STREAM_ALARM)
volume: Override volume (0-100), or None to use system volume
on_complete: Callback function(message) called when playback finishes
Returns:
bool: True if playback started, False if rejected or unavailable
"""
if _device_type not in (DEVICE_BUZZER, DEVICE_BOTH):
print("AudioFlinger: play_rtttl() failed - no buzzer device available")
return False
if not _buzzer_instance:
print("AudioFlinger: play_rtttl() failed - buzzer not initialized")
return False
# Check audio focus
if _stream_lock:
_stream_lock.acquire()
can_start = _check_audio_focus(stream_type)
if _stream_lock:
_stream_lock.release()
if not can_start:
return False
# Create stream and start playback in background thread
try:
from mpos.audio.stream_rtttl import RTTTLStream
import _thread
import mpos.apps
stream = RTTTLStream(
rtttl_string=rtttl_string,
stream_type=stream_type,
volume=volume if volume is not None else _volume,
buzzer_instance=_buzzer_instance,
on_complete=on_complete
)
_thread.stack_size(mpos.apps.good_stack_size())
_thread.start_new_thread(_playback_thread, (stream,))
return True
except Exception as e:
print(f"AudioFlinger: play_rtttl() failed: {e}")
return False
def stop():
"""Stop current audio playback."""
global _current_stream
if _stream_lock:
_stream_lock.acquire()
if _current_stream:
_current_stream.stop()
print("AudioFlinger: Playback stopped")
else:
print("AudioFlinger: No playback to stop")
if _stream_lock:
_stream_lock.release()
def pause():
"""
Pause current audio playback (if supported by stream).
Note: Most streams don't support pause, only stop.
"""
global _current_stream
if _stream_lock:
_stream_lock.acquire()
if _current_stream and hasattr(_current_stream, 'pause'):
_current_stream.pause()
print("AudioFlinger: Playback paused")
else:
print("AudioFlinger: Pause not supported or no playback active")
if _stream_lock:
_stream_lock.release()
def resume():
"""
Resume paused audio playback (if supported by stream).
Note: Most streams don't support resume, only play.
"""
global _current_stream
if _stream_lock:
_stream_lock.acquire()
if _current_stream and hasattr(_current_stream, 'resume'):
_current_stream.resume()
print("AudioFlinger: Playback resumed")
else:
print("AudioFlinger: Resume not supported or no playback active")
if _stream_lock:
_stream_lock.release()
def set_volume(volume):
"""
Set system volume (affects new streams, not current playback).
Args:
volume: Volume level (0-100)
"""
global _volume
_volume = max(0, min(100, volume))
def get_volume():
"""
Get system volume.
Returns:
int: Current system volume (0-100)
"""
return _volume
def get_device_type():
"""
Get configured audio device type.
Returns:
int: Device type (DEVICE_NULL, DEVICE_I2S, DEVICE_BUZZER, DEVICE_BOTH)
"""
return _device_type
def is_playing():
"""
Check if audio is currently playing.
Returns:
bool: True if playback active, False otherwise
"""
if _stream_lock:
_stream_lock.acquire()
result = _current_stream is not None and _current_stream.is_playing()
if _stream_lock:
_stream_lock.release()
return result
@@ -0,0 +1,231 @@
# RTTTLStream - RTTTL Ringtone Playback Stream for AudioFlinger
# Ring Tone Text Transfer Language parser and player
# Ported from Fri3d Camp 2024 Badge firmware
import math
import time
class RTTTLStream:
"""
RTTTL (Ring Tone Text Transfer Language) parser and player.
Format: "name:defaults:notes"
Example: "Nokia:d=4,o=5,b=225:8e6,8d6,8f#,8g#,8c#6,8b,d"
See: https://en.wikipedia.org/wiki/Ring_Tone_Text_Transfer_Language
"""
# Note frequency table (A-G, with sharps)
_NOTES = [
440.0, # A
493.9, # B or H
261.6, # C
293.7, # D
329.6, # E
349.2, # F
392.0, # G
0.0, # pad
466.2, # A#
0.0, # pad
277.2, # C#
311.1, # D#
0.0, # pad
370.0, # F#
415.3, # G#
0.0, # pad
]
def __init__(self, rtttl_string, stream_type, volume, buzzer_instance, on_complete):
"""
Initialize RTTTL stream.
Args:
rtttl_string: RTTTL format string (e.g., "Nokia:d=4,o=5,b=225:...")
stream_type: Stream type (STREAM_MUSIC, STREAM_NOTIFICATION, STREAM_ALARM)
volume: Volume level (0-100)
buzzer_instance: PWM buzzer instance
on_complete: Callback function(message) when playback finishes
"""
self.stream_type = stream_type
self.volume = volume
self.buzzer = buzzer_instance
self.on_complete = on_complete
self._keep_running = True
self._is_playing = False
# Parse RTTTL format
tune_pieces = rtttl_string.split(':')
if len(tune_pieces) != 3:
raise ValueError('RTTTL should contain exactly 2 colons')
self.name = tune_pieces[0]
self.tune = tune_pieces[2]
self.tune_idx = 0
self._parse_defaults(tune_pieces[1])
def is_playing(self):
"""Check if stream is currently playing."""
return self._is_playing
def stop(self):
"""Stop playback."""
self._keep_running = False
def _parse_defaults(self, defaults):
"""
Parse default values from RTTTL format.
Example: "d=4,o=5,b=140"
"""
self.default_duration = 4
self.default_octave = 5
self.bpm = 120
for item in defaults.split(','):
setting = item.split('=')
if len(setting) != 2:
continue
key = setting[0].strip()
value = int(setting[1].strip())
if key == 'o':
self.default_octave = value
elif key == 'd':
self.default_duration = value
elif key == 'b':
self.bpm = value
# Calculate milliseconds per whole note
# 240000 = 60 sec/min * 4 beats/whole-note * 1000 msec/sec
self.msec_per_whole_note = 240000.0 / self.bpm
def _next_char(self):
"""Get next character from tune string."""
if self.tune_idx < len(self.tune):
char = self.tune[self.tune_idx]
self.tune_idx += 1
if char == ',':
char = ' '
return char
return '|' # End marker
def _notes(self):
"""
Generator that yields (frequency, duration_ms) tuples.
Yields:
tuple: (frequency_hz, duration_ms) for each note
"""
while True:
# Skip blank characters and commas
char = self._next_char()
while char == ' ':
char = self._next_char()
# Parse duration (if present)
# Duration of 1 = whole note, 8 = 1/8 note
duration = 0
while char.isdigit():
duration *= 10
duration += ord(char) - ord('0')
char = self._next_char()
if duration == 0:
duration = self.default_duration
if char == '|': # End of tune
return
# Parse note letter
note = char.lower()
if 'a' <= note <= 'g':
note_idx = ord(note) - ord('a')
elif note == 'h':
note_idx = 1 # H is equivalent to B
elif note == 'p':
note_idx = 7 # Pause
else:
note_idx = 7 # Unknown = pause
char = self._next_char()
# Check for sharp
if char == '#':
note_idx += 8
char = self._next_char()
# Check for duration modifier (dot) before octave
duration_multiplier = 1.0
if char == '.':
duration_multiplier = 1.5
char = self._next_char()
# Check for octave
if '4' <= char <= '7':
octave = ord(char) - ord('0')
char = self._next_char()
else:
octave = self.default_octave
# Check for duration modifier (dot) after octave
if char == '.':
duration_multiplier = 1.5
char = self._next_char()
# Calculate frequency and duration
freq = self._NOTES[note_idx] * (1 << (octave - 4))
msec = (self.msec_per_whole_note / duration) * duration_multiplier
yield freq, msec
def play(self):
"""Play RTTTL tune via buzzer (runs in background thread)."""
self._is_playing = True
# Calculate exponential duty cycle for perceptually linear volume
if self.volume <= 0:
duty = 0
else:
volume = min(100, self.volume)
# Exponential volume curve
# Maximum volume is at 50% duty cycle (32768 when using duty_u16)
# Minimum is 4 (absolute minimum for audible PWM)
divider = 10
duty = int(
((math.exp(volume / divider) - math.exp(0.1)) /
(math.exp(10) - math.exp(0.1)) * (32768 - 4)) + 4
)
print(f"RTTTLStream: Playing '{self.name}' (volume {self.volume}%)")
try:
for freq, msec in self._notes():
if not self._keep_running:
print("RTTTLStream: Playback stopped by user")
break
# Play tone
if freq > 0:
self.buzzer.freq(int(freq))
self.buzzer.duty_u16(duty)
# Play for 90% of duration, silent for 10% (note separation)
time.sleep_ms(int(msec * 0.9))
self.buzzer.duty_u16(0)
time.sleep_ms(int(msec * 0.1))
print(f"RTTTLStream: Finished playing '{self.name}'")
if self.on_complete:
self.on_complete(f"Finished: {self.name}")
except Exception as e:
print(f"RTTTLStream: Error: {e}")
if self.on_complete:
self.on_complete(f"Error: {e}")
finally:
# Ensure buzzer is off
self.buzzer.duty_u16(0)
self._is_playing = False
@@ -1,29 +1,83 @@
# WAVStream - WAV File Playback Stream for AudioFlinger
# Supports 8/16/24/32-bit PCM, mono+stereo, auto-upsampling, volume control
# Ported from MusicPlayer's AudioPlayer class
import machine
import os
import time
import micropython
import sys
# Volume scaling function - regular Python version
# Note: Viper optimization removed because @micropython.viper decorator
# causes cross-compiler errors on Unix/macOS builds even inside conditionals
def _scale_audio(buf, num_bytes, scale_fixed):
"""Volume scaling for 16-bit audio samples."""
for i in range(0, num_bytes, 2):
lo = buf[i]
hi = buf[i + 1]
sample = (hi << 8) | lo
if hi & 128:
sample -= 65536
sample = (sample * scale_fixed) // 32768
if sample > 32767:
sample = 32767
elif sample < -32768:
sample = -32768
buf[i] = sample & 255
buf[i + 1] = (sample >> 8) & 255
# ----------------------------------------------------------------------
# AudioPlayer robust, volume-controllable WAV player
# Supports 8 / 16 / 24 / 32-bit PCM, mono + stereo
# Auto-up-samples any rate < 22050 Hz to >=22050 Hz
# ----------------------------------------------------------------------
class AudioPlayer:
_i2s = None
_volume = 50 # 0-100
_keep_running = True
class WAVStream:
"""
WAV file playback stream with I2S output.
Supports 8/16/24/32-bit PCM, mono and stereo, auto-upsampling to >=22050 Hz.
"""
# ------------------------------------------------------------------
# WAV header parser returns bit-depth
# ------------------------------------------------------------------
def __init__(self, file_path, stream_type, volume, i2s_pins, on_complete):
"""
Initialize WAV stream.
Args:
file_path: Path to WAV file
stream_type: Stream type (STREAM_MUSIC, STREAM_NOTIFICATION, STREAM_ALARM)
volume: Volume level (0-100)
i2s_pins: Dict with 'sck', 'ws', 'sd' pin numbers
on_complete: Callback function(message) when playback finishes
"""
self.file_path = file_path
self.stream_type = stream_type
self.volume = volume
self.i2s_pins = i2s_pins
self.on_complete = on_complete
self._keep_running = True
self._is_playing = False
self._i2s = None
def is_playing(self):
"""Check if stream is currently playing."""
return self._is_playing
def stop(self):
"""Stop playback."""
self._keep_running = False
# ----------------------------------------------------------------------
# WAV header parser - returns bit-depth and format info
# ----------------------------------------------------------------------
@staticmethod
def find_data_chunk(f):
"""Return (data_start, data_size, sample_rate, channels, bits_per_sample)"""
def _find_data_chunk(f):
"""
Parse WAV header and find data chunk.
Returns:
tuple: (data_start, data_size, sample_rate, channels, bits_per_sample)
"""
f.seek(0)
if f.read(4) != b'RIFF':
raise ValueError("Not a RIFF (standard .wav) file")
file_size = int.from_bytes(f.read(4), 'little') + 8
if f.read(4) != b'WAVE':
raise ValueError("Not a WAVE (standard .wav) file")
@@ -31,87 +85,61 @@ class AudioPlayer:
sample_rate = None
channels = None
bits_per_sample = None
while pos < file_size:
f.seek(pos)
chunk_id = f.read(4)
if len(chunk_id) < 4:
break
chunk_size = int.from_bytes(f.read(4), 'little')
if chunk_id == b'fmt ':
fmt = f.read(chunk_size)
if len(fmt) < 16:
raise ValueError("Invalid fmt chunk")
if int.from_bytes(fmt[0:2], 'little') != 1:
raise ValueError("Only PCM supported")
channels = int.from_bytes(fmt[2:4], 'little')
if channels not in (1, 2):
raise ValueError("Only mono or stereo supported")
sample_rate = int.from_bytes(fmt[4:8], 'little')
bits_per_sample = int.from_bytes(fmt[14:16], 'little')
if bits_per_sample not in (8, 16, 24, 32):
raise ValueError("Only 8/16/24/32-bit PCM supported")
elif chunk_id == b'data':
return f.tell(), chunk_size, sample_rate, channels, bits_per_sample
pos += 8 + chunk_size
if chunk_size % 2:
pos += 1
raise ValueError("No 'data' chunk found")
# ------------------------------------------------------------------
# Volume control
# ------------------------------------------------------------------
@classmethod
def set_volume(cls, volume: int):
volume = max(0, min(100, volume))
cls._volume = volume
@classmethod
def get_volume(cls) -> int:
return cls._volume
@classmethod
def stop_playing(cls):
print("stop_playing()")
cls._keep_running = False
# ------------------------------------------------------------------
# 1. Up-sample 16-bit buffer (zero-order-hold)
# ------------------------------------------------------------------
# ----------------------------------------------------------------------
# Bit depth conversion functions
# ----------------------------------------------------------------------
@staticmethod
def _upsample_buffer(raw: bytearray, factor: int) -> bytearray:
if factor == 1:
return raw
upsampled = bytearray(len(raw) * factor)
out_idx = 0
for i in range(0, len(raw), 2):
lo = raw[i]
hi = raw[i + 1]
for _ in range(factor):
upsampled[out_idx] = lo
upsampled[out_idx + 1] = hi
out_idx += 2
return upsampled
# ------------------------------------------------------------------
# 2. Convert 8-bit to 16-bit (non-viper, Viper-safe)
# ------------------------------------------------------------------
@staticmethod
def _convert_8_to_16(buf: bytearray) -> bytearray:
def _convert_8_to_16(buf):
"""Convert 8-bit unsigned PCM to 16-bit signed PCM."""
out = bytearray(len(buf) * 2)
j = 0
for i in range(len(buf)):
u8 = buf[i]
s16 = (u8 - 128) << 8
out[j] = s16 & 0xFF
out[j] = s16 & 0xFF
out[j + 1] = (s16 >> 8) & 0xFF
j += 2
return out
# ------------------------------------------------------------------
# 3. Convert 24-bit to 16-bit (non-viper)
# ------------------------------------------------------------------
@staticmethod
def _convert_24_to_16(buf: bytearray) -> bytearray:
def _convert_24_to_16(buf):
"""Convert 24-bit PCM to 16-bit PCM."""
samples = len(buf) // 3
out = bytearray(samples * 2)
j = 0
@@ -123,16 +151,14 @@ class AudioPlayer:
if b2 & 0x80:
s24 -= 0x1000000
s16 = s24 >> 8
out[i * 2] = s16 & 0xFF
out[i * 2] = s16 & 0xFF
out[i * 2 + 1] = (s16 >> 8) & 0xFF
j += 3
return out
# ------------------------------------------------------------------
# 4. Convert 32-bit to 16-bit (non-viper)
# ------------------------------------------------------------------
@staticmethod
def _convert_32_to_16(buf: bytearray) -> bytearray:
def _convert_32_to_16(buf):
"""Convert 32-bit PCM to 16-bit PCM."""
samples = len(buf) // 4
out = bytearray(samples * 2)
j = 0
@@ -145,28 +171,49 @@ class AudioPlayer:
if b3 & 0x80:
s32 -= 0x100000000
s16 = s32 >> 16
out[i * 2] = s16 & 0xFF
out[i * 2] = s16 & 0xFF
out[i * 2 + 1] = (s16 >> 8) & 0xFF
j += 4
return out
# ------------------------------------------------------------------
# ----------------------------------------------------------------------
# Upsampling (zero-order-hold)
# ----------------------------------------------------------------------
@staticmethod
def _upsample_buffer(raw, factor):
"""Upsample 16-bit buffer by repeating samples."""
if factor == 1:
return raw
upsampled = bytearray(len(raw) * factor)
out_idx = 0
for i in range(0, len(raw), 2):
lo = raw[i]
hi = raw[i + 1]
for _ in range(factor):
upsampled[out_idx] = lo
upsampled[out_idx + 1] = hi
out_idx += 2
return upsampled
# ----------------------------------------------------------------------
# Main playback routine
# ------------------------------------------------------------------
@classmethod
def play_wav(cls, filename, result_callback=None):
cls._keep_running = True
# ----------------------------------------------------------------------
def play(self):
"""Main playback routine (runs in background thread)."""
self._is_playing = True
try:
with open(filename, 'rb') as f:
st = os.stat(filename)
with open(self.file_path, 'rb') as f:
st = os.stat(self.file_path)
file_size = st[6]
print(f"File size: {file_size} bytes")
print(f"WAVStream: Playing {self.file_path} ({file_size} bytes)")
# ----- parse header ------------------------------------------------
# Parse WAV header
data_start, data_size, original_rate, channels, bits_per_sample = \
cls.find_data_chunk(f)
self._find_data_chunk(f)
# ----- decide playback rate (force >=22050 Hz) --------------------
# Decide playback rate (force >=22050 Hz)
target_rate = 22050
if original_rate >= target_rate:
playback_rate = original_rate
@@ -175,20 +222,20 @@ class AudioPlayer:
upsample_factor = (target_rate + original_rate - 1) // original_rate
playback_rate = original_rate * upsample_factor
print(f"Original: {original_rate} Hz, {bits_per_sample}-bit, {channels}-ch "
f"to Playback: {playback_rate} Hz (factor {upsample_factor})")
print(f"WAVStream: {original_rate} Hz, {bits_per_sample}-bit, {channels}-ch")
print(f"WAVStream: Playback at {playback_rate} Hz (factor {upsample_factor})")
if data_size > file_size - data_start:
data_size = file_size - data_start
# ----- I2S init (always 16-bit) ----------------------------------
# Initialize I2S (always 16-bit output)
try:
i2s_format = machine.I2S.MONO if channels == 1 else machine.I2S.STEREO
cls._i2s = machine.I2S(
self._i2s = machine.I2S(
0,
sck=machine.Pin(2, machine.Pin.OUT),
ws =machine.Pin(47, machine.Pin.OUT),
sd =machine.Pin(16, machine.Pin.OUT),
sck=machine.Pin(self.i2s_pins['sck'], machine.Pin.OUT),
ws=machine.Pin(self.i2s_pins['ws'], machine.Pin.OUT),
sd=machine.Pin(self.i2s_pins['sd'], machine.Pin.OUT),
mode=machine.I2S.TX,
bits=16,
format=i2s_format,
@@ -196,38 +243,22 @@ class AudioPlayer:
ibuf=32000
)
except Exception as e:
print(f"Warning: simulating playback (I2S init failed): {e}")
print(f"WAVStream: I2S init failed: {e}")
return
print(f"Playing {data_size} original bytes (vol {cls._volume}%) ...")
print(f"WAVStream: Playing {data_size} bytes (volume {self.volume}%)")
f.seek(data_start)
# ----- Viper volume scaler (16-bit only) -------------------------
@micropython.viper # throws "invalid micropython decorator" on macOS / darwin
def scale_audio(buf: ptr8, num_bytes: int, scale_fixed: int):
for i in range(0, num_bytes, 2):
lo = int(buf[i])
hi = int(buf[i+1])
sample = (hi << 8) | lo
if hi & 128:
sample -= 65536
sample = (sample * scale_fixed) // 32768
if sample > 32767:
sample = 32767
elif sample < -32768:
sample = -32768
buf[i] = sample & 255
buf[i+1] = (sample >> 8) & 255
chunk_size = 4096
bytes_per_original_sample = (bits_per_sample // 8) * channels
total_original = 0
while total_original < data_size:
if not cls._keep_running:
print("Playback stopped by user.")
if not self._keep_running:
print("WAVStream: Playback stopped by user")
break
# ---- read a whole-sample chunk of original data -------------
# Read chunk of original data
to_read = min(chunk_size, data_size - total_original)
to_read -= (to_read % bytes_per_original_sample)
if to_read <= 0:
@@ -237,44 +268,46 @@ class AudioPlayer:
if not raw:
break
# ---- 1. Convert bit-depth to 16-bit (non-viper) -------------
# 1. Convert bit-depth to 16-bit
if bits_per_sample == 8:
raw = cls._convert_8_to_16(raw)
raw = self._convert_8_to_16(raw)
elif bits_per_sample == 24:
raw = cls._convert_24_to_16(raw)
raw = self._convert_24_to_16(raw)
elif bits_per_sample == 32:
raw = cls._convert_32_to_16(raw)
# 16-bit to unchanged
raw = self._convert_32_to_16(raw)
# 16-bit unchanged
# ---- 2. Up-sample if needed ---------------------------------
# 2. Upsample if needed
if upsample_factor > 1:
raw = cls._upsample_buffer(raw, upsample_factor)
raw = self._upsample_buffer(raw, upsample_factor)
# ---- 3. Volume scaling --------------------------------------
scale = cls._volume / 100.0
# 3. Volume scaling
scale = self.volume / 100.0
if scale < 1.0:
scale_fixed = int(scale * 32768)
scale_audio(raw, len(raw), scale_fixed)
_scale_audio(raw, len(raw), scale_fixed)
# ---- 4. Output ---------------------------------------------
if cls._i2s:
cls._i2s.write(raw)
# 4. Output to I2S
if self._i2s:
self._i2s.write(raw)
else:
# Simulate playback timing if no I2S
num_samples = len(raw) // (2 * channels)
time.sleep(num_samples / playback_rate)
total_original += to_read
print(f"Finished playing {filename}")
if result_callback:
result_callback(f"Finished playing {filename}")
print(f"WAVStream: Finished playing {self.file_path}")
if self.on_complete:
self.on_complete(f"Finished: {self.file_path}")
except Exception as e:
print(f"Error: {e}\nwhile playing {filename}")
if result_callback:
result_callback(f"Error: {e}\nwhile playing {filename}")
print(f"WAVStream: Error: {e}")
if self.on_complete:
self.on_complete(f"Error: {e}")
finally:
if cls._i2s:
cls._i2s.deinit()
cls._i2s = None
self._is_playing = False
if self._i2s:
self._i2s.deinit()
self._i2s = None
@@ -289,4 +289,33 @@ mpos.battery_voltage.init_adc(13, adc_to_voltage)
import mpos.sdcard
mpos.sdcard.init(spi_bus, cs_pin=14)
# === AUDIO HARDWARE ===
from machine import PWM, Pin
import mpos.audio.audioflinger as AudioFlinger
# Initialize buzzer (GPIO 46)
buzzer = PWM(Pin(46), freq=550, duty=0)
# I2S pin configuration (GPIO 2, 47, 16)
# Note: I2S is created per-stream, not at boot (only one instance can exist)
i2s_pins = {
'sck': 2,
'ws': 47,
'sd': 16,
}
# Initialize AudioFlinger (both I2S and buzzer available)
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_BOTH,
i2s_pins=i2s_pins,
buzzer_instance=buzzer
)
# === LED HARDWARE ===
import mpos.lights as LightsManager
# Initialize 5 NeoPixel LEDs (GPIO 12)
LightsManager.init(neopixel_pin=12, num_leds=5)
print("Fri3d hardware: Audio and LEDs initialized")
print("boot.py finished")
@@ -95,6 +95,21 @@ def adc_to_voltage(adc_value):
mpos.battery_voltage.init_adc(999, adc_to_voltage)
# === AUDIO HARDWARE ===
import mpos.audio.audioflinger as AudioFlinger
# Note: Desktop builds have no audio hardware
# AudioFlinger functions will return False (no-op)
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_NULL,
i2s_pins=None,
buzzer_instance=None
)
# === LED HARDWARE ===
# Note: Desktop builds have no LED hardware
# LightsManager will not be initialized (functions will return False)
print("linux.py finished")
@@ -110,4 +110,20 @@ try:
except Exception as e:
print(f"Warning: powering off camera got exception: {e}")
# === AUDIO HARDWARE ===
import mpos.audio.audioflinger as AudioFlinger
# Note: Waveshare board has no buzzer or LEDs, only I2S audio
# I2S pin configuration will be determined by the board's audio hardware
# For now, initialize with I2S only (pins will be configured per-stream if available)
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_I2S,
i2s_pins={'sck': 2, 'ws': 47, 'sd': 16}, # Default ESP32-S3 I2S pins
buzzer_instance=None
)
# === LED HARDWARE ===
# Note: Waveshare board has no NeoPixel LEDs
# LightsManager will not be initialized (functions will return False)
print("boot.py finished")
@@ -0,0 +1,8 @@
# Fri3d Camp 2024 Badge Hardware Drivers
# These are simple wrappers that can be used by services like AudioFlinger
from .buzzer import BuzzerConfig
from .leds import LEDConfig
from .rtttl_data import RTTTL_SONGS
__all__ = ['BuzzerConfig', 'LEDConfig', 'RTTTL_SONGS']
@@ -0,0 +1,11 @@
# Fri3d Camp 2024 Badge - Buzzer Configuration
class BuzzerConfig:
"""Configuration for PWM buzzer hardware."""
# GPIO pin for buzzer
PIN = 46
# Default PWM settings
DEFAULT_FREQ = 550 # Hz
DEFAULT_DUTY = 0 # Off by default
@@ -0,0 +1,10 @@
# Fri3d Camp 2024 Badge - LED Configuration
class LEDConfig:
"""Configuration for NeoPixel RGB LED hardware."""
# GPIO pin for NeoPixel data line
PIN = 12
# Number of NeoPixel LEDs on badge
NUM_LEDS = 5
@@ -0,0 +1,18 @@
# RTTTL Song Catalog
# Ring Tone Text Transfer Language songs for buzzer playback
# Format: "name:defaults:notes"
# Ported from Fri3d Camp 2024 Badge firmware
RTTTL_SONGS = {
"nokia": "Nokia:d=4,o=5,b=225:8e6,8d6,8f#,8g#,8c#6,8b,d,8p,8b,8a,8c#,8e,8a,8p",
"macarena": "Macarena:d=4,o=5,b=180:f,8f,8f,f,8f,8f,8f,8f,8f,8f,8f,8a,c,8c,f,8f,8f,f,8f,8f,8f,8f,8f,8f,8d,8c,p,f,8f,8f,f,8f,8f,8f,8f,8f,8f,8f,8a,p,2c,f,8f,8f,f,8f,8f,8f,8f,8f,8f,8d,8c",
"takeonme": "TakeOnMe:d=4,o=4,b=160:8f#5,8f#5,8f#5,8d5,8p,8b,8p,8e5,8p,8e5,8p,8e5,8g#5,8g#5,8a5,8b5,8a5,8a5,8a5,8e5,8p,8d5,8p,8f#5,8p,8f#5,8p,8f#5,8e5,8e5,8f#5,8e5",
"goodbadugly": "TheGoodTheBad:d=4,o=5,b=160:c,8d,8e,8d,c,8d,8e,8d,c,8d,e,8f,2g,8p,a,b,c6,8b,8a,8g,8f,e,8f,g,8e,8d,8c",
"creeps": "Creeps:d=4,o=5,b=120:8c,8d,8e,8f,g,8e,8f,g,8f,8e,8d,c,8d,8e,f,8d,8e,f,8e,8d,8c,8b4",
"william_tell": "WilliamTell:d=4,o=5,b=125:8e,8e,8e,2p,8e,8e,8e,2p,8e,8e,8e,8e,8e,8e,8e,8e,8e,8e,8e,8e,8e,8e,e"
}
+153
View File
@@ -0,0 +1,153 @@
# LightsManager - Simple LED Control Service for MicroPythonOS
# Provides one-shot LED control for NeoPixel RGB LEDs
# Apps implement custom animations using the update_frame() pattern
# Module-level state (singleton pattern)
_neopixel = None
_num_leds = 0
def init(neopixel_pin, num_leds=5):
"""
Initialize NeoPixel LEDs.
Args:
neopixel_pin: GPIO pin number for NeoPixel data line
num_leds: Number of LEDs in the strip (default 5 for Fri3d badge)
"""
global _neopixel, _num_leds
try:
from machine import Pin
from neopixel import NeoPixel
_neopixel = NeoPixel(Pin(neopixel_pin, Pin.OUT), num_leds)
_num_leds = num_leds
# Clear all LEDs on initialization
for i in range(num_leds):
_neopixel[i] = (0, 0, 0)
_neopixel.write()
print(f"LightsManager initialized: {num_leds} LEDs on GPIO {neopixel_pin}")
except Exception as e:
print(f"LightsManager: Failed to initialize LEDs: {e}")
print(" - LED functions will return False (no-op)")
def is_available():
"""
Check if LED hardware is available.
Returns:
bool: True if LEDs are initialized and available
"""
return _neopixel is not None
def get_led_count():
"""
Get the number of LEDs.
Returns:
int: Number of LEDs, or 0 if not initialized
"""
return _num_leds
def set_led(index, r, g, b):
"""
Set a single LED color (buffered until write() is called).
Args:
index: LED index (0 to num_leds-1)
r: Red value (0-255)
g: Green value (0-255)
b: Blue value (0-255)
Returns:
bool: True if successful, False if LEDs unavailable or invalid index
"""
if not _neopixel:
return False
if index < 0 or index >= _num_leds:
print(f"LightsManager: Invalid LED index {index} (valid range: 0-{_num_leds-1})")
return False
_neopixel[index] = (r, g, b)
return True
def set_all(r, g, b):
"""
Set all LEDs to the same color (buffered until write() is called).
Args:
r: Red value (0-255)
g: Green value (0-255)
b: Blue value (0-255)
Returns:
bool: True if successful, False if LEDs unavailable
"""
if not _neopixel:
return False
for i in range(_num_leds):
_neopixel[i] = (r, g, b)
return True
def clear():
"""
Clear all LEDs (set to black, buffered until write() is called).
Returns:
bool: True if successful, False if LEDs unavailable
"""
return set_all(0, 0, 0)
def write():
"""
Update hardware with buffered LED colors.
Must be called after set_led(), set_all(), or clear() to make changes visible.
Returns:
bool: True if successful, False if LEDs unavailable
"""
if not _neopixel:
return False
_neopixel.write()
return True
def set_notification_color(color_name):
"""
Convenience method to set all LEDs to a common color and update immediately.
Args:
color_name: Color name (red, green, blue, yellow, orange, purple, white)
Returns:
bool: True if successful, False if LEDs unavailable or unknown color
"""
colors = {
"red": (255, 0, 0),
"green": (0, 255, 0),
"blue": (0, 0, 255),
"yellow": (255, 255, 0),
"orange": (255, 128, 0),
"purple": (128, 0, 255),
"white": (255, 255, 255),
}
color = colors.get(color_name.lower())
if not color:
print(f"LightsManager: Unknown color '{color_name}'")
print(f" - Available colors: {', '.join(colors.keys())}")
return False
return set_all(*color) and write()
+102
View File
@@ -0,0 +1,102 @@
# Hardware Mocks for Testing AudioFlinger and LightsManager
# Provides mock implementations of PWM, I2S, NeoPixel, and Pin classes
class MockPin:
"""Mock machine.Pin for testing."""
IN = 0
OUT = 1
PULL_UP = 2
def __init__(self, pin_number, mode=None, pull=None):
self.pin_number = pin_number
self.mode = mode
self.pull = pull
self._value = 0
def value(self, val=None):
if val is not None:
self._value = val
return self._value
class MockPWM:
"""Mock machine.PWM for testing buzzer."""
def __init__(self, pin, freq=0, duty=0):
self.pin = pin
self.last_freq = freq
self.last_duty = duty
self.freq_history = []
self.duty_history = []
def freq(self, value=None):
"""Set or get frequency."""
if value is not None:
self.last_freq = value
self.freq_history.append(value)
return self.last_freq
def duty_u16(self, value=None):
"""Set or get duty cycle (0-65535)."""
if value is not None:
self.last_duty = value
self.duty_history.append(value)
return self.last_duty
class MockI2S:
"""Mock machine.I2S for testing audio playback."""
TX = 0
MONO = 1
STEREO = 2
def __init__(self, id, sck, ws, sd, mode, bits, format, rate, ibuf):
self.id = id
self.sck = sck
self.ws = ws
self.sd = sd
self.mode = mode
self.bits = bits
self.format = format
self.rate = rate
self.ibuf = ibuf
self.written_bytes = []
self.total_bytes_written = 0
def write(self, buf):
"""Simulate writing to I2S hardware."""
self.written_bytes.append(bytes(buf))
self.total_bytes_written += len(buf)
return len(buf)
def deinit(self):
"""Deinitialize I2S."""
pass
class MockNeoPixel:
"""Mock neopixel.NeoPixel for testing LEDs."""
def __init__(self, pin, num_leds):
self.pin = pin
self.num_leds = num_leds
self.pixels = [(0, 0, 0)] * num_leds
self.write_count = 0
def __setitem__(self, index, value):
"""Set LED color (R, G, B) tuple."""
if 0 <= index < self.num_leds:
self.pixels[index] = value
def __getitem__(self, index):
"""Get LED color."""
if 0 <= index < self.num_leds:
return self.pixels[index]
return (0, 0, 0)
def write(self):
"""Update hardware (mock - just increment counter)."""
self.write_count += 1
+243
View File
@@ -0,0 +1,243 @@
# Unit tests for AudioFlinger service
import unittest
import sys
# Mock hardware before importing
class MockPWM:
def __init__(self, pin, freq=0, duty=0):
self.pin = pin
self.last_freq = freq
self.last_duty = duty
def freq(self, value=None):
if value is not None:
self.last_freq = value
return self.last_freq
def duty_u16(self, value=None):
if value is not None:
self.last_duty = value
return self.last_duty
class MockPin:
IN = 0
OUT = 1
def __init__(self, pin_number, mode=None):
self.pin_number = pin_number
self.mode = mode
# Inject mocks
class MockMachine:
PWM = MockPWM
Pin = MockPin
sys.modules['machine'] = MockMachine()
class MockLock:
def acquire(self):
pass
def release(self):
pass
class MockThread:
@staticmethod
def allocate_lock():
return MockLock()
@staticmethod
def start_new_thread(func, args, **kwargs):
pass # No-op for testing
@staticmethod
def stack_size(size=None):
return 16384 if size is None else None
sys.modules['_thread'] = MockThread()
class MockMposApps:
@staticmethod
def good_stack_size():
return 16384
sys.modules['mpos.apps'] = MockMposApps()
# Now import the module to test
import mpos.audio.audioflinger as AudioFlinger
class TestAudioFlinger(unittest.TestCase):
"""Test cases for AudioFlinger service."""
def setUp(self):
"""Initialize AudioFlinger before each test."""
self.buzzer = MockPWM(MockPin(46))
self.i2s_pins = {'sck': 2, 'ws': 47, 'sd': 16}
# Reset volume to default before each test
AudioFlinger.set_volume(70)
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_BOTH,
i2s_pins=self.i2s_pins,
buzzer_instance=self.buzzer
)
def tearDown(self):
"""Clean up after each test."""
AudioFlinger.stop()
def test_initialization(self):
"""Test that AudioFlinger initializes correctly."""
self.assertEqual(AudioFlinger.get_device_type(), AudioFlinger.DEVICE_BOTH)
self.assertEqual(AudioFlinger._i2s_pins, self.i2s_pins)
self.assertEqual(AudioFlinger._buzzer_instance, self.buzzer)
def test_device_types(self):
"""Test device type constants."""
self.assertEqual(AudioFlinger.DEVICE_NULL, 0)
self.assertEqual(AudioFlinger.DEVICE_I2S, 1)
self.assertEqual(AudioFlinger.DEVICE_BUZZER, 2)
self.assertEqual(AudioFlinger.DEVICE_BOTH, 3)
def test_stream_types(self):
"""Test stream type constants and priority order."""
self.assertEqual(AudioFlinger.STREAM_MUSIC, 0)
self.assertEqual(AudioFlinger.STREAM_NOTIFICATION, 1)
self.assertEqual(AudioFlinger.STREAM_ALARM, 2)
# Higher number = higher priority
self.assertTrue(AudioFlinger.STREAM_MUSIC < AudioFlinger.STREAM_NOTIFICATION)
self.assertTrue(AudioFlinger.STREAM_NOTIFICATION < AudioFlinger.STREAM_ALARM)
def test_volume_control(self):
"""Test volume get/set operations."""
# Set volume
AudioFlinger.set_volume(50)
self.assertEqual(AudioFlinger.get_volume(), 50)
# Test clamping to 0-100 range
AudioFlinger.set_volume(150)
self.assertEqual(AudioFlinger.get_volume(), 100)
AudioFlinger.set_volume(-10)
self.assertEqual(AudioFlinger.get_volume(), 0)
def test_device_null_rejects_playback(self):
"""Test that DEVICE_NULL rejects all playback requests."""
# Re-initialize with no device
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_NULL,
i2s_pins=None,
buzzer_instance=None
)
# WAV should be rejected
result = AudioFlinger.play_wav("test.wav")
self.assertFalse(result)
# RTTTL should be rejected
result = AudioFlinger.play_rtttl("Test:d=4,o=5,b=120:c")
self.assertFalse(result)
def test_device_i2s_only_rejects_rtttl(self):
"""Test that DEVICE_I2S rejects buzzer playback."""
# Re-initialize with I2S only
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_I2S,
i2s_pins=self.i2s_pins,
buzzer_instance=None
)
# RTTTL should be rejected (no buzzer)
result = AudioFlinger.play_rtttl("Test:d=4,o=5,b=120:c")
self.assertFalse(result)
def test_device_buzzer_only_rejects_wav(self):
"""Test that DEVICE_BUZZER rejects I2S playback."""
# Re-initialize with buzzer only
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_BUZZER,
i2s_pins=None,
buzzer_instance=self.buzzer
)
# WAV should be rejected (no I2S)
result = AudioFlinger.play_wav("test.wav")
self.assertFalse(result)
def test_missing_i2s_pins_rejects_wav(self):
"""Test that missing I2S pins rejects WAV playback."""
# Re-initialize with I2S device but no pins
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_I2S,
i2s_pins=None,
buzzer_instance=None
)
result = AudioFlinger.play_wav("test.wav")
self.assertFalse(result)
def test_is_playing_initially_false(self):
"""Test that is_playing() returns False initially."""
self.assertFalse(AudioFlinger.is_playing())
def test_stop_with_no_playback(self):
"""Test that stop() can be called when nothing is playing."""
# Should not raise exception
AudioFlinger.stop()
self.assertFalse(AudioFlinger.is_playing())
def test_get_device_type(self):
"""Test that get_device_type() returns correct value."""
# Test DEVICE_BOTH
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_BOTH,
i2s_pins=self.i2s_pins,
buzzer_instance=self.buzzer
)
self.assertEqual(AudioFlinger.get_device_type(), AudioFlinger.DEVICE_BOTH)
# Test DEVICE_I2S
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_I2S,
i2s_pins=self.i2s_pins,
buzzer_instance=None
)
self.assertEqual(AudioFlinger.get_device_type(), AudioFlinger.DEVICE_I2S)
# Test DEVICE_BUZZER
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_BUZZER,
i2s_pins=None,
buzzer_instance=self.buzzer
)
self.assertEqual(AudioFlinger.get_device_type(), AudioFlinger.DEVICE_BUZZER)
# Test DEVICE_NULL
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_NULL,
i2s_pins=None,
buzzer_instance=None
)
self.assertEqual(AudioFlinger.get_device_type(), AudioFlinger.DEVICE_NULL)
def test_audio_focus_check_no_current_stream(self):
"""Test audio focus allows playback when no stream is active."""
result = AudioFlinger._check_audio_focus(AudioFlinger.STREAM_MUSIC)
self.assertTrue(result)
def test_init_creates_lock(self):
"""Test that initialization creates a stream lock."""
self.assertIsNotNone(AudioFlinger._stream_lock)
def test_volume_default_value(self):
"""Test that default volume is reasonable."""
# After init, volume should be at default (70)
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_NULL,
i2s_pins=None,
buzzer_instance=None
)
self.assertEqual(AudioFlinger.get_volume(), 70)
+126
View File
@@ -0,0 +1,126 @@
# Unit tests for LightsManager service
import unittest
import sys
# Mock hardware before importing LightsManager
class MockPin:
IN = 0
OUT = 1
def __init__(self, pin_number, mode=None):
self.pin_number = pin_number
self.mode = mode
class MockNeoPixel:
def __init__(self, pin, num_leds):
self.pin = pin
self.num_leds = num_leds
self.pixels = [(0, 0, 0)] * num_leds
self.write_count = 0
def __setitem__(self, index, value):
if 0 <= index < self.num_leds:
self.pixels[index] = value
def __getitem__(self, index):
if 0 <= index < self.num_leds:
return self.pixels[index]
return (0, 0, 0)
def write(self):
self.write_count += 1
# Inject mocks
sys.modules['machine'] = type('module', (), {'Pin': MockPin})()
sys.modules['neopixel'] = type('module', (), {'NeoPixel': MockNeoPixel})()
# Now import the module to test
import mpos.lights as LightsManager
class TestLightsManager(unittest.TestCase):
"""Test cases for LightsManager service."""
def setUp(self):
"""Initialize LightsManager before each test."""
LightsManager.init(neopixel_pin=12, num_leds=5)
def test_initialization(self):
"""Test that LightsManager initializes correctly."""
self.assertTrue(LightsManager.is_available())
self.assertEqual(LightsManager.get_led_count(), 5)
def test_set_single_led(self):
"""Test setting a single LED color."""
result = LightsManager.set_led(0, 255, 0, 0)
self.assertTrue(result)
# Verify color was set (via internal _neopixel mock)
neopixel = LightsManager._neopixel
self.assertEqual(neopixel[0], (255, 0, 0))
def test_set_led_invalid_index(self):
"""Test that invalid LED indices are rejected."""
# Negative index
result = LightsManager.set_led(-1, 255, 0, 0)
self.assertFalse(result)
# Index too large
result = LightsManager.set_led(10, 255, 0, 0)
self.assertFalse(result)
def test_set_all_leds(self):
"""Test setting all LEDs to same color."""
result = LightsManager.set_all(0, 255, 0)
self.assertTrue(result)
# Verify all LEDs were set
neopixel = LightsManager._neopixel
for i in range(5):
self.assertEqual(neopixel[i], (0, 255, 0))
def test_clear(self):
"""Test clearing all LEDs."""
# First set some colors
LightsManager.set_all(255, 255, 255)
# Then clear
result = LightsManager.clear()
self.assertTrue(result)
# Verify all LEDs are black
neopixel = LightsManager._neopixel
for i in range(5):
self.assertEqual(neopixel[i], (0, 0, 0))
def test_write(self):
"""Test that write() updates hardware."""
neopixel = LightsManager._neopixel
initial_count = neopixel.write_count
result = LightsManager.write()
self.assertTrue(result)
# Verify write was called
self.assertEqual(neopixel.write_count, initial_count + 1)
def test_notification_colors(self):
"""Test convenience notification color method."""
# Valid colors
self.assertTrue(LightsManager.set_notification_color("red"))
self.assertTrue(LightsManager.set_notification_color("green"))
self.assertTrue(LightsManager.set_notification_color("blue"))
# Invalid color
result = LightsManager.set_notification_color("invalid_color")
self.assertFalse(result)
def test_case_insensitive_colors(self):
"""Test that color names are case-insensitive."""
self.assertTrue(LightsManager.set_notification_color("RED"))
self.assertTrue(LightsManager.set_notification_color("Green"))
self.assertTrue(LightsManager.set_notification_color("BLUE"))
+173
View File
@@ -0,0 +1,173 @@
# Unit tests for RTTTL parser (RTTTLStream)
import unittest
import sys
# Mock hardware before importing
class MockPWM:
def __init__(self, pin, freq=0, duty=0):
self.pin = pin
self.last_freq = freq
self.last_duty = duty
self.freq_history = []
self.duty_history = []
def freq(self, value=None):
if value is not None:
self.last_freq = value
self.freq_history.append(value)
return self.last_freq
def duty_u16(self, value=None):
if value is not None:
self.last_duty = value
self.duty_history.append(value)
return self.last_duty
# Inject mock
sys.modules['machine'] = type('module', (), {'PWM': MockPWM, 'Pin': lambda x: x})()
# Now import the module to test
from mpos.audio.stream_rtttl import RTTTLStream
class TestRTTTL(unittest.TestCase):
"""Test cases for RTTTL parser."""
def setUp(self):
"""Create a mock buzzer before each test."""
self.buzzer = MockPWM(46)
def test_parse_simple_rtttl(self):
"""Test parsing a simple RTTTL string."""
rtttl = "Nokia:d=4,o=5,b=225:8e6,8d6,8f#,8g#"
stream = RTTTLStream(rtttl, 0, 100, self.buzzer, None)
self.assertEqual(stream.name, "Nokia")
self.assertEqual(stream.default_duration, 4)
self.assertEqual(stream.default_octave, 5)
self.assertEqual(stream.bpm, 225)
def test_parse_defaults(self):
"""Test parsing default values."""
rtttl = "Test:d=8,o=6,b=180:c"
stream = RTTTLStream(rtttl, 0, 100, self.buzzer, None)
self.assertEqual(stream.default_duration, 8)
self.assertEqual(stream.default_octave, 6)
self.assertEqual(stream.bpm, 180)
# Check calculated msec_per_whole_note
# 240000 / 180 = 1333.33...
self.assertAlmostEqual(stream.msec_per_whole_note, 1333.33, places=1)
def test_invalid_rtttl_format(self):
"""Test that invalid RTTTL format raises ValueError."""
# Missing colons
with self.assertRaises(ValueError):
RTTTLStream("invalid", 0, 100, self.buzzer, None)
# Too many colons
with self.assertRaises(ValueError):
RTTTLStream("a:b:c:d", 0, 100, self.buzzer, None)
def test_note_parsing(self):
"""Test parsing individual notes."""
rtttl = "Test:d=4,o=5,b=120:c,d,e"
stream = RTTTLStream(rtttl, 0, 100, self.buzzer, None)
# Generate notes
notes = list(stream._notes())
# Should have 3 notes
self.assertEqual(len(notes), 3)
# Each note should be a tuple of (frequency, duration)
for freq, duration in notes:
self.assertTrue(freq > 0, "Frequency should be non-zero")
self.assertTrue(duration > 0, "Duration should be non-zero")
def test_sharp_notes(self):
"""Test parsing sharp notes."""
rtttl = "Test:d=4,o=5,b=120:c#,d#,f#"
stream = RTTTLStream(rtttl, 0, 100, self.buzzer, None)
notes = list(stream._notes())
self.assertEqual(len(notes), 3)
# Sharp notes should have different frequencies than natural notes
# (can't test exact values without knowing frequency table)
def test_pause_notes(self):
"""Test parsing pause notes."""
rtttl = "Test:d=4,o=5,b=120:c,p,e"
stream = RTTTLStream(rtttl, 0, 100, self.buzzer, None)
notes = list(stream._notes())
self.assertEqual(len(notes), 3)
# Pause (p) should have frequency 0
freq, duration = notes[1]
self.assertEqual(freq, 0.0)
def test_duration_modifiers(self):
"""Test note duration modifiers (dots)."""
rtttl = "Test:d=4,o=5,b=120:c,c."
stream = RTTTLStream(rtttl, 0, 100, self.buzzer, None)
notes = list(stream._notes())
self.assertEqual(len(notes), 2)
# Dotted note should be 1.5x longer
normal_duration = notes[0][1]
dotted_duration = notes[1][1]
self.assertAlmostEqual(dotted_duration / normal_duration, 1.5, places=1)
def test_octave_variations(self):
"""Test notes with different octaves."""
rtttl = "Test:d=4,o=5,b=120:c4,c5,c6,c7"
stream = RTTTLStream(rtttl, 0, 100, self.buzzer, None)
notes = list(stream._notes())
self.assertEqual(len(notes), 4)
# Higher octaves should have higher frequencies
freqs = [freq for freq, dur in notes]
self.assertTrue(freqs[0] < freqs[1], "c4 should be lower than c5")
self.assertTrue(freqs[1] < freqs[2], "c5 should be lower than c6")
self.assertTrue(freqs[2] < freqs[3], "c6 should be lower than c7")
def test_volume_scaling(self):
"""Test volume to duty cycle conversion."""
# Test various volume levels
for volume in [0, 25, 50, 75, 100]:
stream = RTTTLStream("Test:d=4,o=5,b=120:c", 0, volume, self.buzzer, None)
# Volume 0 should result in duty 0
if volume == 0:
# Note: play() method calculates duty, not __init__
pass # Can't easily test without calling play()
else:
# Volume > 0 should result in duty > 0
# (duty calculation happens in play() method)
pass
def test_stream_type(self):
"""Test that stream type is stored correctly."""
stream = RTTTLStream("Test:d=4,o=5,b=120:c", 2, 100, self.buzzer, None)
self.assertEqual(stream.stream_type, 2)
def test_stop_flag(self):
"""Test that stop flag can be set."""
stream = RTTTLStream("Test:d=4,o=5,b=120:c", 0, 100, self.buzzer, None)
self.assertTrue(stream._keep_running)
stream.stop()
self.assertFalse(stream._keep_running)
def test_is_playing_flag(self):
"""Test playing flag is initially false."""
stream = RTTTLStream("Test:d=4,o=5,b=120:c", 0, 100, self.buzzer, None)
self.assertFalse(stream.is_playing())
+78
View File
@@ -0,0 +1,78 @@
import unittest
import sys
import os
class TestSysPathRestore(unittest.TestCase):
"""Test that sys.path is properly restored after execute_script"""
def test_syspath_restored_after_execute_script(self):
"""Test that sys.path is restored to original state after script execution"""
# Import here to ensure we're in the right context
import mpos.apps
# Capture original sys.path
original_path = sys.path[:]
original_length = len(sys.path)
# Create a test directory path that would be added
test_cwd = "apps/com.test.app/assets/"
# Verify the test path is not already in sys.path
self.assertFalse(test_cwd in original_path,
f"Test path {test_cwd} should not be in sys.path initially")
# Create a simple test script
test_script = '''
import sys
# Just a simple script that does nothing
x = 42
'''
# Call execute_script with cwd parameter
# Note: This will fail because there's no Activity to start,
# but that's fine - we're testing the sys.path restoration
result = mpos.apps.execute_script(
test_script,
is_file=False,
cwd=test_cwd,
classname="NonExistentClass"
)
# After execution, sys.path should be restored
current_path = sys.path
current_length = len(sys.path)
# Verify sys.path has been restored to original
self.assertEqual(current_length, original_length,
f"sys.path length should be restored. Original: {original_length}, Current: {current_length}")
# Verify the test directory is not in sys.path anymore
self.assertFalse(test_cwd in current_path,
f"Test path {test_cwd} should not be in sys.path after execution. sys.path={current_path}")
# Verify sys.path matches original
self.assertEqual(current_path, original_path,
f"sys.path should match original.\nOriginal: {original_path}\nCurrent: {current_path}")
def test_syspath_not_affected_when_no_cwd(self):
"""Test that sys.path is unchanged when cwd is None"""
import mpos.apps
# Capture original sys.path
original_path = sys.path[:]
test_script = '''
x = 42
'''
# Call without cwd parameter
result = mpos.apps.execute_script(
test_script,
is_file=False,
cwd=None,
classname="NonExistentClass"
)
# sys.path should be unchanged
self.assertEqual(sys.path, original_path,
"sys.path should be unchanged when cwd is None")