diff --git a/CLAUDE.md b/CLAUDE.md index 27d33b90..083bee20 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -643,6 +643,212 @@ def defocus_handler(self, obj): - `mpos.clipboard`: System clipboard access - `mpos.battery_voltage`: Battery level reading (ESP32 only) +## Audio System (AudioFlinger) + +MicroPythonOS provides a centralized audio service called **AudioFlinger** (Android-inspired) that manages audio playback across different hardware outputs. + +### Supported Audio Devices + +- **I2S**: Digital audio output for WAV file playback (Fri3d badge, Waveshare board) +- **Buzzer**: PWM-based tone/ringtone playback (Fri3d badge only) +- **Both**: Simultaneous I2S and buzzer support +- **Null**: No audio (desktop/Linux) + +### Basic Usage + +**Playing WAV files**: +```python +import mpos.audio.audioflinger as AudioFlinger + +# Play music file +success = AudioFlinger.play_wav( + "M:/sdcard/music/song.wav", + stream_type=AudioFlinger.STREAM_MUSIC, + volume=80, + on_complete=lambda msg: print(msg) +) + +if not success: + print("Audio playback rejected (higher priority stream active)") +``` + +**Playing RTTTL ringtones**: +```python +# Play notification sound via buzzer +rtttl = "Nokia:d=4,o=5,b=225:8e6,8d6,8f#,8g#,8c#6,8b,d,8p,8b,8a,8c#,8e" +AudioFlinger.play_rtttl( + rtttl, + stream_type=AudioFlinger.STREAM_NOTIFICATION +) +``` + +**Volume control**: +```python +AudioFlinger.set_volume(70) # 0-100 +volume = AudioFlinger.get_volume() +``` + +**Stopping playback**: +```python +AudioFlinger.stop() +``` + +### Audio Focus Priority + +AudioFlinger implements priority-based audio focus (Android-inspired): +- **STREAM_ALARM** (priority 2): Highest priority +- **STREAM_NOTIFICATION** (priority 1): Medium priority +- **STREAM_MUSIC** (priority 0): Lowest priority + +Higher priority streams automatically interrupt lower priority streams. Equal or lower priority streams are rejected while a stream is playing. + +### Hardware Support Matrix + +| Board | I2S | Buzzer | LEDs | +|-------|-----|--------|------| +| Fri3d 2024 Badge | ✓ (GPIO 2, 47, 16) | ✓ (GPIO 46) | ✓ (5 RGB, GPIO 12) | +| Waveshare ESP32-S3 | ✓ (GPIO 2, 47, 16) | ✗ | ✗ | +| Linux/macOS | ✗ | ✗ | ✗ | + +### Configuration + +Audio device preference is configured in Settings app under "Advanced Settings": +- **Auto-detect**: Use available hardware (default) +- **I2S (Digital Audio)**: Digital audio only +- **Buzzer (PWM Tones)**: Tones/ringtones only +- **Both I2S and Buzzer**: Use both devices +- **Disabled**: No audio + +**Note**: Changing the audio device requires a restart to take effect. + +### Implementation Details + +- **Location**: `lib/mpos/audio/audioflinger.py` +- **Pattern**: Module-level singleton (similar to `battery_voltage.py`) +- **Thread-safe**: Uses locks for concurrent access +- **Background playback**: Runs in separate thread +- **WAV support**: 8/16/24/32-bit PCM, mono/stereo, auto-upsampling to ≥22050 Hz +- **RTTTL parser**: Full Ring Tone Text Transfer Language support with exponential volume curve + +## LED Control (LightsManager) + +MicroPythonOS provides a simple LED control service for NeoPixel RGB LEDs (Fri3d badge only). + +### Basic Usage + +**Check availability**: +```python +import mpos.lights as LightsManager + +if LightsManager.is_available(): + print(f"LEDs available: {LightsManager.get_led_count()}") +``` + +**Control individual LEDs**: +```python +# Set LED 0 to red (buffered) +LightsManager.set_led(0, 255, 0, 0) + +# Set LED 1 to green +LightsManager.set_led(1, 0, 255, 0) + +# Update hardware +LightsManager.write() +``` + +**Control all LEDs**: +```python +# Set all LEDs to blue +LightsManager.set_all(0, 0, 255) +LightsManager.write() + +# Clear all LEDs (black) +LightsManager.clear() +LightsManager.write() +``` + +**Notification colors**: +```python +# Convenience method for common colors +LightsManager.set_notification_color("red") +LightsManager.set_notification_color("green") +# Available: red, green, blue, yellow, orange, purple, white +``` + +### Custom Animations + +LightsManager provides one-shot control only (no built-in animations). Apps implement custom animations using the `update_frame()` pattern: + +```python +import time +import mpos.lights as LightsManager + +def blink_pattern(): + for _ in range(5): + LightsManager.set_all(255, 0, 0) + LightsManager.write() + time.sleep_ms(200) + + LightsManager.clear() + LightsManager.write() + time.sleep_ms(200) + +def rainbow_cycle(): + colors = [ + (255, 0, 0), # Red + (255, 128, 0), # Orange + (255, 255, 0), # Yellow + (0, 255, 0), # Green + (0, 0, 255), # Blue + ] + + for i, color in enumerate(colors): + LightsManager.set_led(i, *color) + + LightsManager.write() +``` + +**For frame-based LED animations**, use the TaskHandler event system: + +```python +import mpos.ui +import time + +class LEDAnimationActivity(Activity): + last_time = 0 + led_index = 0 + + def onResume(self, screen): + self.last_time = time.ticks_ms() + mpos.ui.task_handler.add_event_cb(self.update_frame, 1) + + def onPause(self, screen): + mpos.ui.task_handler.remove_event_cb(self.update_frame) + LightsManager.clear() + LightsManager.write() + + def update_frame(self, a, b): + current_time = time.ticks_ms() + delta_time = time.ticks_diff(current_time, self.last_time) / 1000.0 + self.last_time = current_time + + # Update animation every 0.5 seconds + if delta_time > 0.5: + LightsManager.clear() + LightsManager.set_led(self.led_index, 0, 255, 0) + LightsManager.write() + self.led_index = (self.led_index + 1) % LightsManager.get_led_count() +``` + +### Implementation Details + +- **Location**: `lib/mpos/lights.py` +- **Pattern**: Module-level singleton (similar to `battery_voltage.py`) +- **Hardware**: 5 NeoPixel RGB LEDs on GPIO 12 (Fri3d badge) +- **Buffered**: LED colors are buffered until `write()` is called +- **Thread-safe**: No locking (single-threaded usage recommended) +- **Desktop**: Functions return `False` (no-op) on desktop builds + ## Animations and Game Loops MicroPythonOS supports frame-based animations and game loops using the TaskHandler event system. This pattern is used for games, particle effects, and smooth animations. diff --git a/internal_filesystem/apps/com.micropythonos.musicplayer/assets/music_player.py b/internal_filesystem/apps/com.micropythonos.musicplayer/assets/music_player.py index 75ba010d..14380937 100644 --- a/internal_filesystem/apps/com.micropythonos.musicplayer/assets/music_player.py +++ b/internal_filesystem/apps/com.micropythonos.musicplayer/assets/music_player.py @@ -1,13 +1,11 @@ import machine import os -import _thread import time from mpos.apps import Activity, Intent import mpos.sdcard import mpos.ui - -from audio_player import AudioPlayer +import mpos.audio.audioflinger as AudioFlinger class MusicPlayer(Activity): @@ -68,17 +66,17 @@ class FullscreenPlayer(Activity): self._filename = self.getIntent().extras.get("filename") qr_screen = lv.obj() self._slider_label=lv.label(qr_screen) - self._slider_label.set_text(f"Volume: {AudioPlayer.get_volume()}%") + self._slider_label.set_text(f"Volume: {AudioFlinger.get_volume()}%") self._slider_label.align(lv.ALIGN.TOP_MID,0,lv.pct(4)) self._slider=lv.slider(qr_screen) self._slider.set_range(0,100) - self._slider.set_value(AudioPlayer.get_volume(), False) + self._slider.set_value(AudioFlinger.get_volume(), False) self._slider.set_width(lv.pct(90)) self._slider.align_to(self._slider_label,lv.ALIGN.OUT_BOTTOM_MID,0,10) def volume_slider_changed(e): volume_int = self._slider.get_value() self._slider_label.set_text(f"Volume: {volume_int}%") - AudioPlayer.set_volume(volume_int) + AudioFlinger.set_volume(volume_int) self._slider.add_event_cb(volume_slider_changed,lv.EVENT.VALUE_CHANGED,None) self._filename_label = lv.label(qr_screen) self._filename_label.align(lv.ALIGN.CENTER,0,0) @@ -104,11 +102,23 @@ class FullscreenPlayer(Activity): if not self._filename: print("Not playing any file...") else: - print("Starting thread to play file {self._filename}") - AudioPlayer.stop_playing() + print(f"Playing file {self._filename}") + AudioFlinger.stop() time.sleep(0.1) - _thread.stack_size(mpos.apps.good_stack_size()) - _thread.start_new_thread(AudioPlayer.play_wav, (self._filename,self.player_finished,)) + + success = AudioFlinger.play_wav( + self._filename, + stream_type=AudioFlinger.STREAM_MUSIC, + on_complete=self.player_finished + ) + + if not success: + error_msg = "Error: Audio device unavailable or busy" + print(error_msg) + self.update_ui_threadsafe_if_foreground( + self._filename_label.set_text, + error_msg + ) def focus_obj(self, obj): obj.set_style_border_color(lv.theme_get_color_primary(None),lv.PART.MAIN) @@ -118,7 +128,7 @@ class FullscreenPlayer(Activity): obj.set_style_border_width(0, lv.PART.MAIN) def stop_button_clicked(self, event): - AudioPlayer.stop_playing() + AudioFlinger.stop() self.finish() def player_finished(self, result=None): diff --git a/internal_filesystem/builtin/apps/com.micropythonos.settings/assets/settings.py b/internal_filesystem/builtin/apps/com.micropythonos.settings/assets/settings.py index 51262e74..56331915 100644 --- a/internal_filesystem/builtin/apps/com.micropythonos.settings/assets/settings.py +++ b/internal_filesystem/builtin/apps/com.micropythonos.settings/assets/settings.py @@ -43,6 +43,7 @@ class SettingsActivity(Activity): {"title": "Theme Color", "key": "theme_primary_color", "value_label": None, "cont": None, "placeholder": "HTML hex color, like: EC048C", "ui": "dropdown", "ui_options": theme_colors}, {"title": "Timezone", "key": "timezone", "value_label": None, "cont": None, "ui": "dropdown", "ui_options": self.get_timezone_tuples(), "changed_callback": lambda : mpos.time.refresh_timezone_preference()}, # Advanced settings, alphabetically: + {"title": "Audio Output Device", "key": "audio_device", "value_label": None, "cont": None, "ui": "radiobuttons", "ui_options": [("Auto-detect", "auto"), ("I2S (Digital Audio)", "i2s"), ("Buzzer (PWM Tones)", "buzzer"), ("Both I2S and Buzzer", "both"), ("Disabled", "null")], "changed_callback": self.audio_device_changed}, {"title": "Auto Start App", "key": "auto_start_app", "value_label": None, "cont": None, "ui": "radiobuttons", "ui_options": [(app.name, app.fullname) for app in PackageManager.get_app_list()]}, {"title": "Restart to Bootloader", "key": "boot_mode", "value_label": None, "cont": None, "ui": "radiobuttons", "ui_options": [("Normal", "normal"), ("Bootloader", "bootloader")]}, # special that doesn't get saved {"title": "Format internal data partition", "key": "format_internal_data_partition", "value_label": None, "cont": None, "ui": "radiobuttons", "ui_options": [("No, do not format", "no"), ("Yes, erase all settings, files and non-builtin apps", "yes")]}, # special that doesn't get saved @@ -111,6 +112,34 @@ class SettingsActivity(Activity): def get_timezone_tuples(): return [(tz, tz) for tz in mpos.time.get_timezones()] + def audio_device_changed(self): + """ + Called when audio device setting changes. + Note: Changing device type at runtime requires a restart for full effect. + AudioFlinger initialization happens at boot. + """ + import mpos.audio.audioflinger as AudioFlinger + + new_value = self.prefs.get_string("audio_device", "auto") + print(f"Audio device setting changed to: {new_value}") + print("Note: Restart required for audio device change to take effect") + + # Map setting values to device types + device_map = { + "auto": AudioFlinger.get_device_type(), # Keep current + "i2s": AudioFlinger.DEVICE_I2S, + "buzzer": AudioFlinger.DEVICE_BUZZER, + "both": AudioFlinger.DEVICE_BOTH, + "null": AudioFlinger.DEVICE_NULL, + } + + desired_device = device_map.get(new_value, AudioFlinger.get_device_type()) + current_device = AudioFlinger.get_device_type() + + if desired_device != current_device: + print(f"Desired device type ({desired_device}) differs from current ({current_device})") + print("Full device type change requires restart - current session continues with existing device") + def focus_container(self, container): print(f"container {container} focused, setting border...") container.set_style_border_color(lv.theme_get_color_primary(None),lv.PART.MAIN) diff --git a/internal_filesystem/lib/mpos/audio/__init__.py b/internal_filesystem/lib/mpos/audio/__init__.py new file mode 100644 index 00000000..86526aa9 --- /dev/null +++ b/internal_filesystem/lib/mpos/audio/__init__.py @@ -0,0 +1,55 @@ +# AudioFlinger - Centralized Audio Management Service for MicroPythonOS +# Android-inspired audio routing with priority-based audio focus + +from . import audioflinger + +# Re-export main API +from .audioflinger import ( + # Device types + DEVICE_NULL, + DEVICE_I2S, + DEVICE_BUZZER, + DEVICE_BOTH, + + # Stream types + STREAM_MUSIC, + STREAM_NOTIFICATION, + STREAM_ALARM, + + # Core functions + init, + play_wav, + play_rtttl, + stop, + pause, + resume, + set_volume, + get_volume, + get_device_type, + is_playing, +) + +__all__ = [ + # Device types + 'DEVICE_NULL', + 'DEVICE_I2S', + 'DEVICE_BUZZER', + 'DEVICE_BOTH', + + # Stream types + 'STREAM_MUSIC', + 'STREAM_NOTIFICATION', + 'STREAM_ALARM', + + # Functions + 'init', + 'play_wav', + 'play_rtttl', + 'stop', + 'pause', + 'resume', + 'set_volume', + 'get_volume', + 'get_device_type', + 'is_playing', +] diff --git a/internal_filesystem/lib/mpos/audio/audioflinger.py b/internal_filesystem/lib/mpos/audio/audioflinger.py new file mode 100644 index 00000000..47dfcd98 --- /dev/null +++ b/internal_filesystem/lib/mpos/audio/audioflinger.py @@ -0,0 +1,330 @@ +# AudioFlinger - Core Audio Management Service +# Centralized audio routing with priority-based audio focus (Android-inspired) +# Supports I2S (digital audio) and PWM buzzer (tones/ringtones) + +# Device type constants +DEVICE_NULL = 0 # No audio hardware (desktop fallback) +DEVICE_I2S = 1 # Digital audio output (WAV playback) +DEVICE_BUZZER = 2 # PWM buzzer (tones/RTTTL) +DEVICE_BOTH = 3 # Both I2S and buzzer available + +# Stream type constants (priority order: higher number = higher priority) +STREAM_MUSIC = 0 # Background music (lowest priority) +STREAM_NOTIFICATION = 1 # Notification sounds (medium priority) +STREAM_ALARM = 2 # Alarms/alerts (highest priority) + +# Module-level state (singleton pattern, follows battery_voltage.py) +_device_type = DEVICE_NULL +_i2s_pins = None # I2S pin configuration dict (created per-stream) +_buzzer_instance = None # PWM buzzer instance +_current_stream = None # Currently playing stream +_volume = 70 # System volume (0-100) +_stream_lock = None # Thread lock for stream management + + +def init(device_type, i2s_pins=None, buzzer_instance=None): + """ + Initialize AudioFlinger with hardware configuration. + + Args: + device_type: One of DEVICE_NULL, DEVICE_I2S, DEVICE_BUZZER, DEVICE_BOTH + i2s_pins: Dict with 'sck', 'ws', 'sd' pin numbers (for I2S devices) + buzzer_instance: PWM instance for buzzer (for buzzer devices) + """ + global _device_type, _i2s_pins, _buzzer_instance, _stream_lock + + _device_type = device_type + _i2s_pins = i2s_pins + _buzzer_instance = buzzer_instance + + # Initialize thread lock for stream management + try: + import _thread + _stream_lock = _thread.allocate_lock() + except ImportError: + # Desktop mode - no threading support + _stream_lock = None + + device_names = { + DEVICE_NULL: "NULL (no audio)", + DEVICE_I2S: "I2S (digital audio)", + DEVICE_BUZZER: "Buzzer (PWM tones)", + DEVICE_BOTH: "Both (I2S + Buzzer)" + } + + print(f"AudioFlinger initialized: {device_names.get(device_type, 'Unknown')}") + + +def _check_audio_focus(stream_type): + """ + Check if a stream with the given type can start playback. + Implements priority-based audio focus (Android-inspired). + + Args: + stream_type: Stream type (STREAM_MUSIC, STREAM_NOTIFICATION, STREAM_ALARM) + + Returns: + bool: True if stream can start, False if rejected + """ + global _current_stream + + if not _current_stream: + return True # No stream playing, OK to start + + if not _current_stream.is_playing(): + return True # Current stream finished, OK to start + + # Check priority + if stream_type <= _current_stream.stream_type: + print(f"AudioFlinger: Stream rejected (priority {stream_type} <= current {_current_stream.stream_type})") + return False + + # Higher priority stream - interrupt current + print(f"AudioFlinger: Interrupting stream (priority {stream_type} > current {_current_stream.stream_type})") + _current_stream.stop() + return True + + +def _playback_thread(stream): + """ + Background thread function for audio playback. + + Args: + stream: Stream instance (WAVStream or RTTTLStream) + """ + global _current_stream + + # Acquire lock and set as current stream + if _stream_lock: + _stream_lock.acquire() + _current_stream = stream + if _stream_lock: + _stream_lock.release() + + try: + # Run playback (blocks until complete or stopped) + stream.play() + except Exception as e: + print(f"AudioFlinger: Playback error: {e}") + finally: + # Clear current stream + if _stream_lock: + _stream_lock.acquire() + if _current_stream == stream: + _current_stream = None + if _stream_lock: + _stream_lock.release() + + +def play_wav(file_path, stream_type=STREAM_MUSIC, volume=None, on_complete=None): + """ + Play WAV file via I2S. + + Args: + file_path: Path to WAV file (e.g., "M:/sdcard/music/song.wav") + stream_type: Stream type (STREAM_MUSIC, STREAM_NOTIFICATION, STREAM_ALARM) + volume: Override volume (0-100), or None to use system volume + on_complete: Callback function(message) called when playback finishes + + Returns: + bool: True if playback started, False if rejected or unavailable + """ + if _device_type not in (DEVICE_I2S, DEVICE_BOTH): + print("AudioFlinger: play_wav() failed - no I2S device available") + return False + + if not _i2s_pins: + print("AudioFlinger: play_wav() failed - I2S pins not configured") + return False + + # Check audio focus + if _stream_lock: + _stream_lock.acquire() + can_start = _check_audio_focus(stream_type) + if _stream_lock: + _stream_lock.release() + + if not can_start: + return False + + # Create stream and start playback in background thread + try: + from mpos.audio.stream_wav import WAVStream + import _thread + import mpos.apps + + stream = WAVStream( + file_path=file_path, + stream_type=stream_type, + volume=volume if volume is not None else _volume, + i2s_pins=_i2s_pins, + on_complete=on_complete + ) + + _thread.stack_size(mpos.apps.good_stack_size()) + _thread.start_new_thread(_playback_thread, (stream,)) + return True + + except Exception as e: + print(f"AudioFlinger: play_wav() failed: {e}") + return False + + +def play_rtttl(rtttl_string, stream_type=STREAM_NOTIFICATION, volume=None, on_complete=None): + """ + Play RTTTL ringtone via buzzer. + + Args: + rtttl_string: RTTTL format string (e.g., "Nokia:d=4,o=5,b=225:8e6,8d6...") + stream_type: Stream type (STREAM_MUSIC, STREAM_NOTIFICATION, STREAM_ALARM) + volume: Override volume (0-100), or None to use system volume + on_complete: Callback function(message) called when playback finishes + + Returns: + bool: True if playback started, False if rejected or unavailable + """ + if _device_type not in (DEVICE_BUZZER, DEVICE_BOTH): + print("AudioFlinger: play_rtttl() failed - no buzzer device available") + return False + + if not _buzzer_instance: + print("AudioFlinger: play_rtttl() failed - buzzer not initialized") + return False + + # Check audio focus + if _stream_lock: + _stream_lock.acquire() + can_start = _check_audio_focus(stream_type) + if _stream_lock: + _stream_lock.release() + + if not can_start: + return False + + # Create stream and start playback in background thread + try: + from mpos.audio.stream_rtttl import RTTTLStream + import _thread + import mpos.apps + + stream = RTTTLStream( + rtttl_string=rtttl_string, + stream_type=stream_type, + volume=volume if volume is not None else _volume, + buzzer_instance=_buzzer_instance, + on_complete=on_complete + ) + + _thread.stack_size(mpos.apps.good_stack_size()) + _thread.start_new_thread(_playback_thread, (stream,)) + return True + + except Exception as e: + print(f"AudioFlinger: play_rtttl() failed: {e}") + return False + + +def stop(): + """Stop current audio playback.""" + global _current_stream + + if _stream_lock: + _stream_lock.acquire() + + if _current_stream: + _current_stream.stop() + print("AudioFlinger: Playback stopped") + else: + print("AudioFlinger: No playback to stop") + + if _stream_lock: + _stream_lock.release() + + +def pause(): + """ + Pause current audio playback (if supported by stream). + Note: Most streams don't support pause, only stop. + """ + global _current_stream + + if _stream_lock: + _stream_lock.acquire() + + if _current_stream and hasattr(_current_stream, 'pause'): + _current_stream.pause() + print("AudioFlinger: Playback paused") + else: + print("AudioFlinger: Pause not supported or no playback active") + + if _stream_lock: + _stream_lock.release() + + +def resume(): + """ + Resume paused audio playback (if supported by stream). + Note: Most streams don't support resume, only play. + """ + global _current_stream + + if _stream_lock: + _stream_lock.acquire() + + if _current_stream and hasattr(_current_stream, 'resume'): + _current_stream.resume() + print("AudioFlinger: Playback resumed") + else: + print("AudioFlinger: Resume not supported or no playback active") + + if _stream_lock: + _stream_lock.release() + + +def set_volume(volume): + """ + Set system volume (affects new streams, not current playback). + + Args: + volume: Volume level (0-100) + """ + global _volume + _volume = max(0, min(100, volume)) + + +def get_volume(): + """ + Get system volume. + + Returns: + int: Current system volume (0-100) + """ + return _volume + + +def get_device_type(): + """ + Get configured audio device type. + + Returns: + int: Device type (DEVICE_NULL, DEVICE_I2S, DEVICE_BUZZER, DEVICE_BOTH) + """ + return _device_type + + +def is_playing(): + """ + Check if audio is currently playing. + + Returns: + bool: True if playback active, False otherwise + """ + if _stream_lock: + _stream_lock.acquire() + + result = _current_stream is not None and _current_stream.is_playing() + + if _stream_lock: + _stream_lock.release() + + return result diff --git a/internal_filesystem/lib/mpos/audio/stream_rtttl.py b/internal_filesystem/lib/mpos/audio/stream_rtttl.py new file mode 100644 index 00000000..00bae756 --- /dev/null +++ b/internal_filesystem/lib/mpos/audio/stream_rtttl.py @@ -0,0 +1,231 @@ +# RTTTLStream - RTTTL Ringtone Playback Stream for AudioFlinger +# Ring Tone Text Transfer Language parser and player +# Ported from Fri3d Camp 2024 Badge firmware + +import math +import time + + +class RTTTLStream: + """ + RTTTL (Ring Tone Text Transfer Language) parser and player. + Format: "name:defaults:notes" + Example: "Nokia:d=4,o=5,b=225:8e6,8d6,8f#,8g#,8c#6,8b,d" + + See: https://en.wikipedia.org/wiki/Ring_Tone_Text_Transfer_Language + """ + + # Note frequency table (A-G, with sharps) + _NOTES = [ + 440.0, # A + 493.9, # B or H + 261.6, # C + 293.7, # D + 329.6, # E + 349.2, # F + 392.0, # G + 0.0, # pad + + 466.2, # A# + 0.0, # pad + 277.2, # C# + 311.1, # D# + 0.0, # pad + 370.0, # F# + 415.3, # G# + 0.0, # pad + ] + + def __init__(self, rtttl_string, stream_type, volume, buzzer_instance, on_complete): + """ + Initialize RTTTL stream. + + Args: + rtttl_string: RTTTL format string (e.g., "Nokia:d=4,o=5,b=225:...") + stream_type: Stream type (STREAM_MUSIC, STREAM_NOTIFICATION, STREAM_ALARM) + volume: Volume level (0-100) + buzzer_instance: PWM buzzer instance + on_complete: Callback function(message) when playback finishes + """ + self.stream_type = stream_type + self.volume = volume + self.buzzer = buzzer_instance + self.on_complete = on_complete + self._keep_running = True + self._is_playing = False + + # Parse RTTTL format + tune_pieces = rtttl_string.split(':') + if len(tune_pieces) != 3: + raise ValueError('RTTTL should contain exactly 2 colons') + + self.name = tune_pieces[0] + self.tune = tune_pieces[2] + self.tune_idx = 0 + self._parse_defaults(tune_pieces[1]) + + def is_playing(self): + """Check if stream is currently playing.""" + return self._is_playing + + def stop(self): + """Stop playback.""" + self._keep_running = False + + def _parse_defaults(self, defaults): + """ + Parse default values from RTTTL format. + Example: "d=4,o=5,b=140" + """ + self.default_duration = 4 + self.default_octave = 5 + self.bpm = 120 + + for item in defaults.split(','): + setting = item.split('=') + if len(setting) != 2: + continue + + key = setting[0].strip() + value = int(setting[1].strip()) + + if key == 'o': + self.default_octave = value + elif key == 'd': + self.default_duration = value + elif key == 'b': + self.bpm = value + + # Calculate milliseconds per whole note + # 240000 = 60 sec/min * 4 beats/whole-note * 1000 msec/sec + self.msec_per_whole_note = 240000.0 / self.bpm + + def _next_char(self): + """Get next character from tune string.""" + if self.tune_idx < len(self.tune): + char = self.tune[self.tune_idx] + self.tune_idx += 1 + if char == ',': + char = ' ' + return char + return '|' # End marker + + def _notes(self): + """ + Generator that yields (frequency, duration_ms) tuples. + + Yields: + tuple: (frequency_hz, duration_ms) for each note + """ + while True: + # Skip blank characters and commas + char = self._next_char() + while char == ' ': + char = self._next_char() + + # Parse duration (if present) + # Duration of 1 = whole note, 8 = 1/8 note + duration = 0 + while char.isdigit(): + duration *= 10 + duration += ord(char) - ord('0') + char = self._next_char() + + if duration == 0: + duration = self.default_duration + + if char == '|': # End of tune + return + + # Parse note letter + note = char.lower() + if 'a' <= note <= 'g': + note_idx = ord(note) - ord('a') + elif note == 'h': + note_idx = 1 # H is equivalent to B + elif note == 'p': + note_idx = 7 # Pause + else: + note_idx = 7 # Unknown = pause + + char = self._next_char() + + # Check for sharp + if char == '#': + note_idx += 8 + char = self._next_char() + + # Check for duration modifier (dot) before octave + duration_multiplier = 1.0 + if char == '.': + duration_multiplier = 1.5 + char = self._next_char() + + # Check for octave + if '4' <= char <= '7': + octave = ord(char) - ord('0') + char = self._next_char() + else: + octave = self.default_octave + + # Check for duration modifier (dot) after octave + if char == '.': + duration_multiplier = 1.5 + char = self._next_char() + + # Calculate frequency and duration + freq = self._NOTES[note_idx] * (1 << (octave - 4)) + msec = (self.msec_per_whole_note / duration) * duration_multiplier + + yield freq, msec + + def play(self): + """Play RTTTL tune via buzzer (runs in background thread).""" + self._is_playing = True + + # Calculate exponential duty cycle for perceptually linear volume + if self.volume <= 0: + duty = 0 + else: + volume = min(100, self.volume) + + # Exponential volume curve + # Maximum volume is at 50% duty cycle (32768 when using duty_u16) + # Minimum is 4 (absolute minimum for audible PWM) + divider = 10 + duty = int( + ((math.exp(volume / divider) - math.exp(0.1)) / + (math.exp(10) - math.exp(0.1)) * (32768 - 4)) + 4 + ) + + print(f"RTTTLStream: Playing '{self.name}' (volume {self.volume}%)") + + try: + for freq, msec in self._notes(): + if not self._keep_running: + print("RTTTLStream: Playback stopped by user") + break + + # Play tone + if freq > 0: + self.buzzer.freq(int(freq)) + self.buzzer.duty_u16(duty) + + # Play for 90% of duration, silent for 10% (note separation) + time.sleep_ms(int(msec * 0.9)) + self.buzzer.duty_u16(0) + time.sleep_ms(int(msec * 0.1)) + + print(f"RTTTLStream: Finished playing '{self.name}'") + if self.on_complete: + self.on_complete(f"Finished: {self.name}") + + except Exception as e: + print(f"RTTTLStream: Error: {e}") + if self.on_complete: + self.on_complete(f"Error: {e}") + + finally: + # Ensure buzzer is off + self.buzzer.duty_u16(0) + self._is_playing = False diff --git a/internal_filesystem/apps/com.micropythonos.musicplayer/assets/audio_player.py b/internal_filesystem/lib/mpos/audio/stream_wav.py similarity index 51% rename from internal_filesystem/apps/com.micropythonos.musicplayer/assets/audio_player.py rename to internal_filesystem/lib/mpos/audio/stream_wav.py index 0b298735..4c527065 100644 --- a/internal_filesystem/apps/com.micropythonos.musicplayer/assets/audio_player.py +++ b/internal_filesystem/lib/mpos/audio/stream_wav.py @@ -1,29 +1,83 @@ +# WAVStream - WAV File Playback Stream for AudioFlinger +# Supports 8/16/24/32-bit PCM, mono+stereo, auto-upsampling, volume control +# Ported from MusicPlayer's AudioPlayer class + import machine import os import time -import micropython +import sys + +# Volume scaling function - regular Python version +# Note: Viper optimization removed because @micropython.viper decorator +# causes cross-compiler errors on Unix/macOS builds even inside conditionals +def _scale_audio(buf, num_bytes, scale_fixed): + """Volume scaling for 16-bit audio samples.""" + for i in range(0, num_bytes, 2): + lo = buf[i] + hi = buf[i + 1] + sample = (hi << 8) | lo + if hi & 128: + sample -= 65536 + sample = (sample * scale_fixed) // 32768 + if sample > 32767: + sample = 32767 + elif sample < -32768: + sample = -32768 + buf[i] = sample & 255 + buf[i + 1] = (sample >> 8) & 255 -# ---------------------------------------------------------------------- -# AudioPlayer – robust, volume-controllable WAV player -# Supports 8 / 16 / 24 / 32-bit PCM, mono + stereo -# Auto-up-samples any rate < 22050 Hz to >=22050 Hz -# ---------------------------------------------------------------------- -class AudioPlayer: - _i2s = None - _volume = 50 # 0-100 - _keep_running = True +class WAVStream: + """ + WAV file playback stream with I2S output. + Supports 8/16/24/32-bit PCM, mono and stereo, auto-upsampling to >=22050 Hz. + """ - # ------------------------------------------------------------------ - # WAV header parser – returns bit-depth - # ------------------------------------------------------------------ + def __init__(self, file_path, stream_type, volume, i2s_pins, on_complete): + """ + Initialize WAV stream. + + Args: + file_path: Path to WAV file + stream_type: Stream type (STREAM_MUSIC, STREAM_NOTIFICATION, STREAM_ALARM) + volume: Volume level (0-100) + i2s_pins: Dict with 'sck', 'ws', 'sd' pin numbers + on_complete: Callback function(message) when playback finishes + """ + self.file_path = file_path + self.stream_type = stream_type + self.volume = volume + self.i2s_pins = i2s_pins + self.on_complete = on_complete + self._keep_running = True + self._is_playing = False + self._i2s = None + + def is_playing(self): + """Check if stream is currently playing.""" + return self._is_playing + + def stop(self): + """Stop playback.""" + self._keep_running = False + + # ---------------------------------------------------------------------- + # WAV header parser - returns bit-depth and format info + # ---------------------------------------------------------------------- @staticmethod - def find_data_chunk(f): - """Return (data_start, data_size, sample_rate, channels, bits_per_sample)""" + def _find_data_chunk(f): + """ + Parse WAV header and find data chunk. + + Returns: + tuple: (data_start, data_size, sample_rate, channels, bits_per_sample) + """ f.seek(0) if f.read(4) != b'RIFF': raise ValueError("Not a RIFF (standard .wav) file") + file_size = int.from_bytes(f.read(4), 'little') + 8 + if f.read(4) != b'WAVE': raise ValueError("Not a WAVE (standard .wav) file") @@ -31,87 +85,61 @@ class AudioPlayer: sample_rate = None channels = None bits_per_sample = None + while pos < file_size: f.seek(pos) chunk_id = f.read(4) if len(chunk_id) < 4: break + chunk_size = int.from_bytes(f.read(4), 'little') + if chunk_id == b'fmt ': fmt = f.read(chunk_size) if len(fmt) < 16: raise ValueError("Invalid fmt chunk") + if int.from_bytes(fmt[0:2], 'little') != 1: raise ValueError("Only PCM supported") + channels = int.from_bytes(fmt[2:4], 'little') if channels not in (1, 2): raise ValueError("Only mono or stereo supported") + sample_rate = int.from_bytes(fmt[4:8], 'little') bits_per_sample = int.from_bytes(fmt[14:16], 'little') + if bits_per_sample not in (8, 16, 24, 32): raise ValueError("Only 8/16/24/32-bit PCM supported") + elif chunk_id == b'data': return f.tell(), chunk_size, sample_rate, channels, bits_per_sample + pos += 8 + chunk_size if chunk_size % 2: pos += 1 + raise ValueError("No 'data' chunk found") - # ------------------------------------------------------------------ - # Volume control - # ------------------------------------------------------------------ - @classmethod - def set_volume(cls, volume: int): - volume = max(0, min(100, volume)) - cls._volume = volume - - @classmethod - def get_volume(cls) -> int: - return cls._volume - - @classmethod - def stop_playing(cls): - print("stop_playing()") - cls._keep_running = False - - # ------------------------------------------------------------------ - # 1. Up-sample 16-bit buffer (zero-order-hold) - # ------------------------------------------------------------------ + # ---------------------------------------------------------------------- + # Bit depth conversion functions + # ---------------------------------------------------------------------- @staticmethod - def _upsample_buffer(raw: bytearray, factor: int) -> bytearray: - if factor == 1: - return raw - upsampled = bytearray(len(raw) * factor) - out_idx = 0 - for i in range(0, len(raw), 2): - lo = raw[i] - hi = raw[i + 1] - for _ in range(factor): - upsampled[out_idx] = lo - upsampled[out_idx + 1] = hi - out_idx += 2 - return upsampled - - # ------------------------------------------------------------------ - # 2. Convert 8-bit to 16-bit (non-viper, Viper-safe) - # ------------------------------------------------------------------ - @staticmethod - def _convert_8_to_16(buf: bytearray) -> bytearray: + def _convert_8_to_16(buf): + """Convert 8-bit unsigned PCM to 16-bit signed PCM.""" out = bytearray(len(buf) * 2) j = 0 for i in range(len(buf)): u8 = buf[i] s16 = (u8 - 128) << 8 - out[j] = s16 & 0xFF + out[j] = s16 & 0xFF out[j + 1] = (s16 >> 8) & 0xFF j += 2 return out - # ------------------------------------------------------------------ - # 3. Convert 24-bit to 16-bit (non-viper) - # ------------------------------------------------------------------ @staticmethod - def _convert_24_to_16(buf: bytearray) -> bytearray: + def _convert_24_to_16(buf): + """Convert 24-bit PCM to 16-bit PCM.""" samples = len(buf) // 3 out = bytearray(samples * 2) j = 0 @@ -123,16 +151,14 @@ class AudioPlayer: if b2 & 0x80: s24 -= 0x1000000 s16 = s24 >> 8 - out[i * 2] = s16 & 0xFF + out[i * 2] = s16 & 0xFF out[i * 2 + 1] = (s16 >> 8) & 0xFF j += 3 return out - # ------------------------------------------------------------------ - # 4. Convert 32-bit to 16-bit (non-viper) - # ------------------------------------------------------------------ @staticmethod - def _convert_32_to_16(buf: bytearray) -> bytearray: + def _convert_32_to_16(buf): + """Convert 32-bit PCM to 16-bit PCM.""" samples = len(buf) // 4 out = bytearray(samples * 2) j = 0 @@ -145,28 +171,49 @@ class AudioPlayer: if b3 & 0x80: s32 -= 0x100000000 s16 = s32 >> 16 - out[i * 2] = s16 & 0xFF + out[i * 2] = s16 & 0xFF out[i * 2 + 1] = (s16 >> 8) & 0xFF j += 4 return out - # ------------------------------------------------------------------ + # ---------------------------------------------------------------------- + # Upsampling (zero-order-hold) + # ---------------------------------------------------------------------- + @staticmethod + def _upsample_buffer(raw, factor): + """Upsample 16-bit buffer by repeating samples.""" + if factor == 1: + return raw + + upsampled = bytearray(len(raw) * factor) + out_idx = 0 + for i in range(0, len(raw), 2): + lo = raw[i] + hi = raw[i + 1] + for _ in range(factor): + upsampled[out_idx] = lo + upsampled[out_idx + 1] = hi + out_idx += 2 + return upsampled + + # ---------------------------------------------------------------------- # Main playback routine - # ------------------------------------------------------------------ - @classmethod - def play_wav(cls, filename, result_callback=None): - cls._keep_running = True + # ---------------------------------------------------------------------- + def play(self): + """Main playback routine (runs in background thread).""" + self._is_playing = True + try: - with open(filename, 'rb') as f: - st = os.stat(filename) + with open(self.file_path, 'rb') as f: + st = os.stat(self.file_path) file_size = st[6] - print(f"File size: {file_size} bytes") + print(f"WAVStream: Playing {self.file_path} ({file_size} bytes)") - # ----- parse header ------------------------------------------------ + # Parse WAV header data_start, data_size, original_rate, channels, bits_per_sample = \ - cls.find_data_chunk(f) + self._find_data_chunk(f) - # ----- decide playback rate (force >=22050 Hz) -------------------- + # Decide playback rate (force >=22050 Hz) target_rate = 22050 if original_rate >= target_rate: playback_rate = original_rate @@ -175,20 +222,20 @@ class AudioPlayer: upsample_factor = (target_rate + original_rate - 1) // original_rate playback_rate = original_rate * upsample_factor - print(f"Original: {original_rate} Hz, {bits_per_sample}-bit, {channels}-ch " - f"to Playback: {playback_rate} Hz (factor {upsample_factor})") + print(f"WAVStream: {original_rate} Hz, {bits_per_sample}-bit, {channels}-ch") + print(f"WAVStream: Playback at {playback_rate} Hz (factor {upsample_factor})") if data_size > file_size - data_start: data_size = file_size - data_start - # ----- I2S init (always 16-bit) ---------------------------------- + # Initialize I2S (always 16-bit output) try: i2s_format = machine.I2S.MONO if channels == 1 else machine.I2S.STEREO - cls._i2s = machine.I2S( + self._i2s = machine.I2S( 0, - sck=machine.Pin(2, machine.Pin.OUT), - ws =machine.Pin(47, machine.Pin.OUT), - sd =machine.Pin(16, machine.Pin.OUT), + sck=machine.Pin(self.i2s_pins['sck'], machine.Pin.OUT), + ws=machine.Pin(self.i2s_pins['ws'], machine.Pin.OUT), + sd=machine.Pin(self.i2s_pins['sd'], machine.Pin.OUT), mode=machine.I2S.TX, bits=16, format=i2s_format, @@ -196,38 +243,22 @@ class AudioPlayer: ibuf=32000 ) except Exception as e: - print(f"Warning: simulating playback (I2S init failed): {e}") + print(f"WAVStream: I2S init failed: {e}") + return - print(f"Playing {data_size} original bytes (vol {cls._volume}%) ...") + print(f"WAVStream: Playing {data_size} bytes (volume {self.volume}%)") f.seek(data_start) - # ----- Viper volume scaler (16-bit only) ------------------------- - @micropython.viper # throws "invalid micropython decorator" on macOS / darwin - def scale_audio(buf: ptr8, num_bytes: int, scale_fixed: int): - for i in range(0, num_bytes, 2): - lo = int(buf[i]) - hi = int(buf[i+1]) - sample = (hi << 8) | lo - if hi & 128: - sample -= 65536 - sample = (sample * scale_fixed) // 32768 - if sample > 32767: - sample = 32767 - elif sample < -32768: - sample = -32768 - buf[i] = sample & 255 - buf[i+1] = (sample >> 8) & 255 - chunk_size = 4096 bytes_per_original_sample = (bits_per_sample // 8) * channels total_original = 0 while total_original < data_size: - if not cls._keep_running: - print("Playback stopped by user.") + if not self._keep_running: + print("WAVStream: Playback stopped by user") break - # ---- read a whole-sample chunk of original data ------------- + # Read chunk of original data to_read = min(chunk_size, data_size - total_original) to_read -= (to_read % bytes_per_original_sample) if to_read <= 0: @@ -237,44 +268,46 @@ class AudioPlayer: if not raw: break - # ---- 1. Convert bit-depth to 16-bit (non-viper) ------------- + # 1. Convert bit-depth to 16-bit if bits_per_sample == 8: - raw = cls._convert_8_to_16(raw) + raw = self._convert_8_to_16(raw) elif bits_per_sample == 24: - raw = cls._convert_24_to_16(raw) + raw = self._convert_24_to_16(raw) elif bits_per_sample == 32: - raw = cls._convert_32_to_16(raw) - # 16-bit to unchanged + raw = self._convert_32_to_16(raw) + # 16-bit unchanged - # ---- 2. Up-sample if needed --------------------------------- + # 2. Upsample if needed if upsample_factor > 1: - raw = cls._upsample_buffer(raw, upsample_factor) + raw = self._upsample_buffer(raw, upsample_factor) - # ---- 3. Volume scaling -------------------------------------- - scale = cls._volume / 100.0 + # 3. Volume scaling + scale = self.volume / 100.0 if scale < 1.0: scale_fixed = int(scale * 32768) - scale_audio(raw, len(raw), scale_fixed) + _scale_audio(raw, len(raw), scale_fixed) - # ---- 4. Output --------------------------------------------- - if cls._i2s: - cls._i2s.write(raw) + # 4. Output to I2S + if self._i2s: + self._i2s.write(raw) else: + # Simulate playback timing if no I2S num_samples = len(raw) // (2 * channels) time.sleep(num_samples / playback_rate) total_original += to_read - print(f"Finished playing {filename}") - if result_callback: - result_callback(f"Finished playing {filename}") + print(f"WAVStream: Finished playing {self.file_path}") + if self.on_complete: + self.on_complete(f"Finished: {self.file_path}") + except Exception as e: - print(f"Error: {e}\nwhile playing {filename}") - if result_callback: - result_callback(f"Error: {e}\nwhile playing {filename}") + print(f"WAVStream: Error: {e}") + if self.on_complete: + self.on_complete(f"Error: {e}") + finally: - if cls._i2s: - cls._i2s.deinit() - cls._i2s = None - - + self._is_playing = False + if self._i2s: + self._i2s.deinit() + self._i2s = None diff --git a/internal_filesystem/lib/mpos/board/fri3d_2024.py b/internal_filesystem/lib/mpos/board/fri3d_2024.py index 922ecf48..2ae66897 100644 --- a/internal_filesystem/lib/mpos/board/fri3d_2024.py +++ b/internal_filesystem/lib/mpos/board/fri3d_2024.py @@ -289,4 +289,33 @@ mpos.battery_voltage.init_adc(13, adc_to_voltage) import mpos.sdcard mpos.sdcard.init(spi_bus, cs_pin=14) +# === AUDIO HARDWARE === +from machine import PWM, Pin +import mpos.audio.audioflinger as AudioFlinger + +# Initialize buzzer (GPIO 46) +buzzer = PWM(Pin(46), freq=550, duty=0) + +# I2S pin configuration (GPIO 2, 47, 16) +# Note: I2S is created per-stream, not at boot (only one instance can exist) +i2s_pins = { + 'sck': 2, + 'ws': 47, + 'sd': 16, +} + +# Initialize AudioFlinger (both I2S and buzzer available) +AudioFlinger.init( + device_type=AudioFlinger.DEVICE_BOTH, + i2s_pins=i2s_pins, + buzzer_instance=buzzer +) + +# === LED HARDWARE === +import mpos.lights as LightsManager + +# Initialize 5 NeoPixel LEDs (GPIO 12) +LightsManager.init(neopixel_pin=12, num_leds=5) + +print("Fri3d hardware: Audio and LEDs initialized") print("boot.py finished") diff --git a/internal_filesystem/lib/mpos/board/linux.py b/internal_filesystem/lib/mpos/board/linux.py index 190a428c..913a16d0 100644 --- a/internal_filesystem/lib/mpos/board/linux.py +++ b/internal_filesystem/lib/mpos/board/linux.py @@ -95,6 +95,21 @@ def adc_to_voltage(adc_value): mpos.battery_voltage.init_adc(999, adc_to_voltage) +# === AUDIO HARDWARE === +import mpos.audio.audioflinger as AudioFlinger + +# Note: Desktop builds have no audio hardware +# AudioFlinger functions will return False (no-op) +AudioFlinger.init( + device_type=AudioFlinger.DEVICE_NULL, + i2s_pins=None, + buzzer_instance=None +) + +# === LED HARDWARE === +# Note: Desktop builds have no LED hardware +# LightsManager will not be initialized (functions will return False) + print("linux.py finished") diff --git a/internal_filesystem/lib/mpos/board/waveshare_esp32_s3_touch_lcd_2.py b/internal_filesystem/lib/mpos/board/waveshare_esp32_s3_touch_lcd_2.py index 46342af5..c2133f6c 100644 --- a/internal_filesystem/lib/mpos/board/waveshare_esp32_s3_touch_lcd_2.py +++ b/internal_filesystem/lib/mpos/board/waveshare_esp32_s3_touch_lcd_2.py @@ -110,4 +110,20 @@ try: except Exception as e: print(f"Warning: powering off camera got exception: {e}") +# === AUDIO HARDWARE === +import mpos.audio.audioflinger as AudioFlinger + +# Note: Waveshare board has no buzzer or LEDs, only I2S audio +# I2S pin configuration will be determined by the board's audio hardware +# For now, initialize with I2S only (pins will be configured per-stream if available) +AudioFlinger.init( + device_type=AudioFlinger.DEVICE_I2S, + i2s_pins={'sck': 2, 'ws': 47, 'sd': 16}, # Default ESP32-S3 I2S pins + buzzer_instance=None +) + +# === LED HARDWARE === +# Note: Waveshare board has no NeoPixel LEDs +# LightsManager will not be initialized (functions will return False) + print("boot.py finished") diff --git a/internal_filesystem/lib/mpos/hardware/fri3d/__init__.py b/internal_filesystem/lib/mpos/hardware/fri3d/__init__.py new file mode 100644 index 00000000..18919b17 --- /dev/null +++ b/internal_filesystem/lib/mpos/hardware/fri3d/__init__.py @@ -0,0 +1,8 @@ +# Fri3d Camp 2024 Badge Hardware Drivers +# These are simple wrappers that can be used by services like AudioFlinger + +from .buzzer import BuzzerConfig +from .leds import LEDConfig +from .rtttl_data import RTTTL_SONGS + +__all__ = ['BuzzerConfig', 'LEDConfig', 'RTTTL_SONGS'] diff --git a/internal_filesystem/lib/mpos/hardware/fri3d/buzzer.py b/internal_filesystem/lib/mpos/hardware/fri3d/buzzer.py new file mode 100644 index 00000000..2ebfa98a --- /dev/null +++ b/internal_filesystem/lib/mpos/hardware/fri3d/buzzer.py @@ -0,0 +1,11 @@ +# Fri3d Camp 2024 Badge - Buzzer Configuration + +class BuzzerConfig: + """Configuration for PWM buzzer hardware.""" + + # GPIO pin for buzzer + PIN = 46 + + # Default PWM settings + DEFAULT_FREQ = 550 # Hz + DEFAULT_DUTY = 0 # Off by default diff --git a/internal_filesystem/lib/mpos/hardware/fri3d/leds.py b/internal_filesystem/lib/mpos/hardware/fri3d/leds.py new file mode 100644 index 00000000..f14b740d --- /dev/null +++ b/internal_filesystem/lib/mpos/hardware/fri3d/leds.py @@ -0,0 +1,10 @@ +# Fri3d Camp 2024 Badge - LED Configuration + +class LEDConfig: + """Configuration for NeoPixel RGB LED hardware.""" + + # GPIO pin for NeoPixel data line + PIN = 12 + + # Number of NeoPixel LEDs on badge + NUM_LEDS = 5 diff --git a/internal_filesystem/lib/mpos/hardware/fri3d/rtttl_data.py b/internal_filesystem/lib/mpos/hardware/fri3d/rtttl_data.py new file mode 100644 index 00000000..38174890 --- /dev/null +++ b/internal_filesystem/lib/mpos/hardware/fri3d/rtttl_data.py @@ -0,0 +1,18 @@ +# RTTTL Song Catalog +# Ring Tone Text Transfer Language songs for buzzer playback +# Format: "name:defaults:notes" +# Ported from Fri3d Camp 2024 Badge firmware + +RTTTL_SONGS = { + "nokia": "Nokia:d=4,o=5,b=225:8e6,8d6,8f#,8g#,8c#6,8b,d,8p,8b,8a,8c#,8e,8a,8p", + + "macarena": "Macarena:d=4,o=5,b=180:f,8f,8f,f,8f,8f,8f,8f,8f,8f,8f,8a,c,8c,f,8f,8f,f,8f,8f,8f,8f,8f,8f,8d,8c,p,f,8f,8f,f,8f,8f,8f,8f,8f,8f,8f,8a,p,2c,f,8f,8f,f,8f,8f,8f,8f,8f,8f,8d,8c", + + "takeonme": "TakeOnMe:d=4,o=4,b=160:8f#5,8f#5,8f#5,8d5,8p,8b,8p,8e5,8p,8e5,8p,8e5,8g#5,8g#5,8a5,8b5,8a5,8a5,8a5,8e5,8p,8d5,8p,8f#5,8p,8f#5,8p,8f#5,8e5,8e5,8f#5,8e5", + + "goodbadugly": "TheGoodTheBad:d=4,o=5,b=160:c,8d,8e,8d,c,8d,8e,8d,c,8d,e,8f,2g,8p,a,b,c6,8b,8a,8g,8f,e,8f,g,8e,8d,8c", + + "creeps": "Creeps:d=4,o=5,b=120:8c,8d,8e,8f,g,8e,8f,g,8f,8e,8d,c,8d,8e,f,8d,8e,f,8e,8d,8c,8b4", + + "william_tell": "WilliamTell:d=4,o=5,b=125:8e,8e,8e,2p,8e,8e,8e,2p,8e,8e,8e,8e,8e,8e,8e,8e,8e,8e,8e,8e,8e,8e,e" +} diff --git a/internal_filesystem/lib/mpos/lights.py b/internal_filesystem/lib/mpos/lights.py new file mode 100644 index 00000000..2f0d7b7a --- /dev/null +++ b/internal_filesystem/lib/mpos/lights.py @@ -0,0 +1,153 @@ +# LightsManager - Simple LED Control Service for MicroPythonOS +# Provides one-shot LED control for NeoPixel RGB LEDs +# Apps implement custom animations using the update_frame() pattern + +# Module-level state (singleton pattern) +_neopixel = None +_num_leds = 0 + + +def init(neopixel_pin, num_leds=5): + """ + Initialize NeoPixel LEDs. + + Args: + neopixel_pin: GPIO pin number for NeoPixel data line + num_leds: Number of LEDs in the strip (default 5 for Fri3d badge) + """ + global _neopixel, _num_leds + + try: + from machine import Pin + from neopixel import NeoPixel + + _neopixel = NeoPixel(Pin(neopixel_pin, Pin.OUT), num_leds) + _num_leds = num_leds + + # Clear all LEDs on initialization + for i in range(num_leds): + _neopixel[i] = (0, 0, 0) + _neopixel.write() + + print(f"LightsManager initialized: {num_leds} LEDs on GPIO {neopixel_pin}") + except Exception as e: + print(f"LightsManager: Failed to initialize LEDs: {e}") + print(" - LED functions will return False (no-op)") + + +def is_available(): + """ + Check if LED hardware is available. + + Returns: + bool: True if LEDs are initialized and available + """ + return _neopixel is not None + + +def get_led_count(): + """ + Get the number of LEDs. + + Returns: + int: Number of LEDs, or 0 if not initialized + """ + return _num_leds + + +def set_led(index, r, g, b): + """ + Set a single LED color (buffered until write() is called). + + Args: + index: LED index (0 to num_leds-1) + r: Red value (0-255) + g: Green value (0-255) + b: Blue value (0-255) + + Returns: + bool: True if successful, False if LEDs unavailable or invalid index + """ + if not _neopixel: + return False + + if index < 0 or index >= _num_leds: + print(f"LightsManager: Invalid LED index {index} (valid range: 0-{_num_leds-1})") + return False + + _neopixel[index] = (r, g, b) + return True + + +def set_all(r, g, b): + """ + Set all LEDs to the same color (buffered until write() is called). + + Args: + r: Red value (0-255) + g: Green value (0-255) + b: Blue value (0-255) + + Returns: + bool: True if successful, False if LEDs unavailable + """ + if not _neopixel: + return False + + for i in range(_num_leds): + _neopixel[i] = (r, g, b) + return True + + +def clear(): + """ + Clear all LEDs (set to black, buffered until write() is called). + + Returns: + bool: True if successful, False if LEDs unavailable + """ + return set_all(0, 0, 0) + + +def write(): + """ + Update hardware with buffered LED colors. + Must be called after set_led(), set_all(), or clear() to make changes visible. + + Returns: + bool: True if successful, False if LEDs unavailable + """ + if not _neopixel: + return False + + _neopixel.write() + return True + + +def set_notification_color(color_name): + """ + Convenience method to set all LEDs to a common color and update immediately. + + Args: + color_name: Color name (red, green, blue, yellow, orange, purple, white) + + Returns: + bool: True if successful, False if LEDs unavailable or unknown color + """ + colors = { + "red": (255, 0, 0), + "green": (0, 255, 0), + "blue": (0, 0, 255), + "yellow": (255, 255, 0), + "orange": (255, 128, 0), + "purple": (128, 0, 255), + "white": (255, 255, 255), + } + + color = colors.get(color_name.lower()) + if not color: + print(f"LightsManager: Unknown color '{color_name}'") + print(f" - Available colors: {', '.join(colors.keys())}") + return False + + return set_all(*color) and write() diff --git a/tests/mocks/hardware_mocks.py b/tests/mocks/hardware_mocks.py new file mode 100644 index 00000000..b2d2e97e --- /dev/null +++ b/tests/mocks/hardware_mocks.py @@ -0,0 +1,102 @@ +# Hardware Mocks for Testing AudioFlinger and LightsManager +# Provides mock implementations of PWM, I2S, NeoPixel, and Pin classes + + +class MockPin: + """Mock machine.Pin for testing.""" + + IN = 0 + OUT = 1 + PULL_UP = 2 + + def __init__(self, pin_number, mode=None, pull=None): + self.pin_number = pin_number + self.mode = mode + self.pull = pull + self._value = 0 + + def value(self, val=None): + if val is not None: + self._value = val + return self._value + + +class MockPWM: + """Mock machine.PWM for testing buzzer.""" + + def __init__(self, pin, freq=0, duty=0): + self.pin = pin + self.last_freq = freq + self.last_duty = duty + self.freq_history = [] + self.duty_history = [] + + def freq(self, value=None): + """Set or get frequency.""" + if value is not None: + self.last_freq = value + self.freq_history.append(value) + return self.last_freq + + def duty_u16(self, value=None): + """Set or get duty cycle (0-65535).""" + if value is not None: + self.last_duty = value + self.duty_history.append(value) + return self.last_duty + + +class MockI2S: + """Mock machine.I2S for testing audio playback.""" + + TX = 0 + MONO = 1 + STEREO = 2 + + def __init__(self, id, sck, ws, sd, mode, bits, format, rate, ibuf): + self.id = id + self.sck = sck + self.ws = ws + self.sd = sd + self.mode = mode + self.bits = bits + self.format = format + self.rate = rate + self.ibuf = ibuf + self.written_bytes = [] + self.total_bytes_written = 0 + + def write(self, buf): + """Simulate writing to I2S hardware.""" + self.written_bytes.append(bytes(buf)) + self.total_bytes_written += len(buf) + return len(buf) + + def deinit(self): + """Deinitialize I2S.""" + pass + + +class MockNeoPixel: + """Mock neopixel.NeoPixel for testing LEDs.""" + + def __init__(self, pin, num_leds): + self.pin = pin + self.num_leds = num_leds + self.pixels = [(0, 0, 0)] * num_leds + self.write_count = 0 + + def __setitem__(self, index, value): + """Set LED color (R, G, B) tuple.""" + if 0 <= index < self.num_leds: + self.pixels[index] = value + + def __getitem__(self, index): + """Get LED color.""" + if 0 <= index < self.num_leds: + return self.pixels[index] + return (0, 0, 0) + + def write(self): + """Update hardware (mock - just increment counter).""" + self.write_count += 1 diff --git a/tests/test_audioflinger.py b/tests/test_audioflinger.py new file mode 100644 index 00000000..039d6b1d --- /dev/null +++ b/tests/test_audioflinger.py @@ -0,0 +1,243 @@ +# Unit tests for AudioFlinger service +import unittest +import sys + + +# Mock hardware before importing +class MockPWM: + def __init__(self, pin, freq=0, duty=0): + self.pin = pin + self.last_freq = freq + self.last_duty = duty + + def freq(self, value=None): + if value is not None: + self.last_freq = value + return self.last_freq + + def duty_u16(self, value=None): + if value is not None: + self.last_duty = value + return self.last_duty + + +class MockPin: + IN = 0 + OUT = 1 + + def __init__(self, pin_number, mode=None): + self.pin_number = pin_number + self.mode = mode + + +# Inject mocks +class MockMachine: + PWM = MockPWM + Pin = MockPin +sys.modules['machine'] = MockMachine() + +class MockLock: + def acquire(self): + pass + def release(self): + pass + +class MockThread: + @staticmethod + def allocate_lock(): + return MockLock() + @staticmethod + def start_new_thread(func, args, **kwargs): + pass # No-op for testing + @staticmethod + def stack_size(size=None): + return 16384 if size is None else None + +sys.modules['_thread'] = MockThread() + +class MockMposApps: + @staticmethod + def good_stack_size(): + return 16384 + +sys.modules['mpos.apps'] = MockMposApps() + + +# Now import the module to test +import mpos.audio.audioflinger as AudioFlinger + + +class TestAudioFlinger(unittest.TestCase): + """Test cases for AudioFlinger service.""" + + def setUp(self): + """Initialize AudioFlinger before each test.""" + self.buzzer = MockPWM(MockPin(46)) + self.i2s_pins = {'sck': 2, 'ws': 47, 'sd': 16} + + # Reset volume to default before each test + AudioFlinger.set_volume(70) + + AudioFlinger.init( + device_type=AudioFlinger.DEVICE_BOTH, + i2s_pins=self.i2s_pins, + buzzer_instance=self.buzzer + ) + + def tearDown(self): + """Clean up after each test.""" + AudioFlinger.stop() + + def test_initialization(self): + """Test that AudioFlinger initializes correctly.""" + self.assertEqual(AudioFlinger.get_device_type(), AudioFlinger.DEVICE_BOTH) + self.assertEqual(AudioFlinger._i2s_pins, self.i2s_pins) + self.assertEqual(AudioFlinger._buzzer_instance, self.buzzer) + + def test_device_types(self): + """Test device type constants.""" + self.assertEqual(AudioFlinger.DEVICE_NULL, 0) + self.assertEqual(AudioFlinger.DEVICE_I2S, 1) + self.assertEqual(AudioFlinger.DEVICE_BUZZER, 2) + self.assertEqual(AudioFlinger.DEVICE_BOTH, 3) + + def test_stream_types(self): + """Test stream type constants and priority order.""" + self.assertEqual(AudioFlinger.STREAM_MUSIC, 0) + self.assertEqual(AudioFlinger.STREAM_NOTIFICATION, 1) + self.assertEqual(AudioFlinger.STREAM_ALARM, 2) + + # Higher number = higher priority + self.assertTrue(AudioFlinger.STREAM_MUSIC < AudioFlinger.STREAM_NOTIFICATION) + self.assertTrue(AudioFlinger.STREAM_NOTIFICATION < AudioFlinger.STREAM_ALARM) + + def test_volume_control(self): + """Test volume get/set operations.""" + # Set volume + AudioFlinger.set_volume(50) + self.assertEqual(AudioFlinger.get_volume(), 50) + + # Test clamping to 0-100 range + AudioFlinger.set_volume(150) + self.assertEqual(AudioFlinger.get_volume(), 100) + + AudioFlinger.set_volume(-10) + self.assertEqual(AudioFlinger.get_volume(), 0) + + def test_device_null_rejects_playback(self): + """Test that DEVICE_NULL rejects all playback requests.""" + # Re-initialize with no device + AudioFlinger.init( + device_type=AudioFlinger.DEVICE_NULL, + i2s_pins=None, + buzzer_instance=None + ) + + # WAV should be rejected + result = AudioFlinger.play_wav("test.wav") + self.assertFalse(result) + + # RTTTL should be rejected + result = AudioFlinger.play_rtttl("Test:d=4,o=5,b=120:c") + self.assertFalse(result) + + def test_device_i2s_only_rejects_rtttl(self): + """Test that DEVICE_I2S rejects buzzer playback.""" + # Re-initialize with I2S only + AudioFlinger.init( + device_type=AudioFlinger.DEVICE_I2S, + i2s_pins=self.i2s_pins, + buzzer_instance=None + ) + + # RTTTL should be rejected (no buzzer) + result = AudioFlinger.play_rtttl("Test:d=4,o=5,b=120:c") + self.assertFalse(result) + + def test_device_buzzer_only_rejects_wav(self): + """Test that DEVICE_BUZZER rejects I2S playback.""" + # Re-initialize with buzzer only + AudioFlinger.init( + device_type=AudioFlinger.DEVICE_BUZZER, + i2s_pins=None, + buzzer_instance=self.buzzer + ) + + # WAV should be rejected (no I2S) + result = AudioFlinger.play_wav("test.wav") + self.assertFalse(result) + + def test_missing_i2s_pins_rejects_wav(self): + """Test that missing I2S pins rejects WAV playback.""" + # Re-initialize with I2S device but no pins + AudioFlinger.init( + device_type=AudioFlinger.DEVICE_I2S, + i2s_pins=None, + buzzer_instance=None + ) + + result = AudioFlinger.play_wav("test.wav") + self.assertFalse(result) + + def test_is_playing_initially_false(self): + """Test that is_playing() returns False initially.""" + self.assertFalse(AudioFlinger.is_playing()) + + def test_stop_with_no_playback(self): + """Test that stop() can be called when nothing is playing.""" + # Should not raise exception + AudioFlinger.stop() + self.assertFalse(AudioFlinger.is_playing()) + + def test_get_device_type(self): + """Test that get_device_type() returns correct value.""" + # Test DEVICE_BOTH + AudioFlinger.init( + device_type=AudioFlinger.DEVICE_BOTH, + i2s_pins=self.i2s_pins, + buzzer_instance=self.buzzer + ) + self.assertEqual(AudioFlinger.get_device_type(), AudioFlinger.DEVICE_BOTH) + + # Test DEVICE_I2S + AudioFlinger.init( + device_type=AudioFlinger.DEVICE_I2S, + i2s_pins=self.i2s_pins, + buzzer_instance=None + ) + self.assertEqual(AudioFlinger.get_device_type(), AudioFlinger.DEVICE_I2S) + + # Test DEVICE_BUZZER + AudioFlinger.init( + device_type=AudioFlinger.DEVICE_BUZZER, + i2s_pins=None, + buzzer_instance=self.buzzer + ) + self.assertEqual(AudioFlinger.get_device_type(), AudioFlinger.DEVICE_BUZZER) + + # Test DEVICE_NULL + AudioFlinger.init( + device_type=AudioFlinger.DEVICE_NULL, + i2s_pins=None, + buzzer_instance=None + ) + self.assertEqual(AudioFlinger.get_device_type(), AudioFlinger.DEVICE_NULL) + + def test_audio_focus_check_no_current_stream(self): + """Test audio focus allows playback when no stream is active.""" + result = AudioFlinger._check_audio_focus(AudioFlinger.STREAM_MUSIC) + self.assertTrue(result) + + def test_init_creates_lock(self): + """Test that initialization creates a stream lock.""" + self.assertIsNotNone(AudioFlinger._stream_lock) + + def test_volume_default_value(self): + """Test that default volume is reasonable.""" + # After init, volume should be at default (70) + AudioFlinger.init( + device_type=AudioFlinger.DEVICE_NULL, + i2s_pins=None, + buzzer_instance=None + ) + self.assertEqual(AudioFlinger.get_volume(), 70) diff --git a/tests/test_lightsmanager.py b/tests/test_lightsmanager.py new file mode 100644 index 00000000..016ccf6b --- /dev/null +++ b/tests/test_lightsmanager.py @@ -0,0 +1,126 @@ +# Unit tests for LightsManager service +import unittest +import sys + + +# Mock hardware before importing LightsManager +class MockPin: + IN = 0 + OUT = 1 + + def __init__(self, pin_number, mode=None): + self.pin_number = pin_number + self.mode = mode + + +class MockNeoPixel: + def __init__(self, pin, num_leds): + self.pin = pin + self.num_leds = num_leds + self.pixels = [(0, 0, 0)] * num_leds + self.write_count = 0 + + def __setitem__(self, index, value): + if 0 <= index < self.num_leds: + self.pixels[index] = value + + def __getitem__(self, index): + if 0 <= index < self.num_leds: + return self.pixels[index] + return (0, 0, 0) + + def write(self): + self.write_count += 1 + + +# Inject mocks +sys.modules['machine'] = type('module', (), {'Pin': MockPin})() +sys.modules['neopixel'] = type('module', (), {'NeoPixel': MockNeoPixel})() + + +# Now import the module to test +import mpos.lights as LightsManager + + +class TestLightsManager(unittest.TestCase): + """Test cases for LightsManager service.""" + + def setUp(self): + """Initialize LightsManager before each test.""" + LightsManager.init(neopixel_pin=12, num_leds=5) + + def test_initialization(self): + """Test that LightsManager initializes correctly.""" + self.assertTrue(LightsManager.is_available()) + self.assertEqual(LightsManager.get_led_count(), 5) + + def test_set_single_led(self): + """Test setting a single LED color.""" + result = LightsManager.set_led(0, 255, 0, 0) + self.assertTrue(result) + + # Verify color was set (via internal _neopixel mock) + neopixel = LightsManager._neopixel + self.assertEqual(neopixel[0], (255, 0, 0)) + + def test_set_led_invalid_index(self): + """Test that invalid LED indices are rejected.""" + # Negative index + result = LightsManager.set_led(-1, 255, 0, 0) + self.assertFalse(result) + + # Index too large + result = LightsManager.set_led(10, 255, 0, 0) + self.assertFalse(result) + + def test_set_all_leds(self): + """Test setting all LEDs to same color.""" + result = LightsManager.set_all(0, 255, 0) + self.assertTrue(result) + + # Verify all LEDs were set + neopixel = LightsManager._neopixel + for i in range(5): + self.assertEqual(neopixel[i], (0, 255, 0)) + + def test_clear(self): + """Test clearing all LEDs.""" + # First set some colors + LightsManager.set_all(255, 255, 255) + + # Then clear + result = LightsManager.clear() + self.assertTrue(result) + + # Verify all LEDs are black + neopixel = LightsManager._neopixel + for i in range(5): + self.assertEqual(neopixel[i], (0, 0, 0)) + + def test_write(self): + """Test that write() updates hardware.""" + neopixel = LightsManager._neopixel + initial_count = neopixel.write_count + + result = LightsManager.write() + self.assertTrue(result) + + # Verify write was called + self.assertEqual(neopixel.write_count, initial_count + 1) + + def test_notification_colors(self): + """Test convenience notification color method.""" + # Valid colors + self.assertTrue(LightsManager.set_notification_color("red")) + self.assertTrue(LightsManager.set_notification_color("green")) + self.assertTrue(LightsManager.set_notification_color("blue")) + + # Invalid color + result = LightsManager.set_notification_color("invalid_color") + self.assertFalse(result) + + def test_case_insensitive_colors(self): + """Test that color names are case-insensitive.""" + self.assertTrue(LightsManager.set_notification_color("RED")) + self.assertTrue(LightsManager.set_notification_color("Green")) + self.assertTrue(LightsManager.set_notification_color("BLUE")) diff --git a/tests/test_rtttl.py b/tests/test_rtttl.py new file mode 100644 index 00000000..07dbc801 --- /dev/null +++ b/tests/test_rtttl.py @@ -0,0 +1,173 @@ +# Unit tests for RTTTL parser (RTTTLStream) +import unittest +import sys + + +# Mock hardware before importing +class MockPWM: + def __init__(self, pin, freq=0, duty=0): + self.pin = pin + self.last_freq = freq + self.last_duty = duty + self.freq_history = [] + self.duty_history = [] + + def freq(self, value=None): + if value is not None: + self.last_freq = value + self.freq_history.append(value) + return self.last_freq + + def duty_u16(self, value=None): + if value is not None: + self.last_duty = value + self.duty_history.append(value) + return self.last_duty + + +# Inject mock +sys.modules['machine'] = type('module', (), {'PWM': MockPWM, 'Pin': lambda x: x})() + + +# Now import the module to test +from mpos.audio.stream_rtttl import RTTTLStream + + +class TestRTTTL(unittest.TestCase): + """Test cases for RTTTL parser.""" + + def setUp(self): + """Create a mock buzzer before each test.""" + self.buzzer = MockPWM(46) + + def test_parse_simple_rtttl(self): + """Test parsing a simple RTTTL string.""" + rtttl = "Nokia:d=4,o=5,b=225:8e6,8d6,8f#,8g#" + stream = RTTTLStream(rtttl, 0, 100, self.buzzer, None) + + self.assertEqual(stream.name, "Nokia") + self.assertEqual(stream.default_duration, 4) + self.assertEqual(stream.default_octave, 5) + self.assertEqual(stream.bpm, 225) + + def test_parse_defaults(self): + """Test parsing default values.""" + rtttl = "Test:d=8,o=6,b=180:c" + stream = RTTTLStream(rtttl, 0, 100, self.buzzer, None) + + self.assertEqual(stream.default_duration, 8) + self.assertEqual(stream.default_octave, 6) + self.assertEqual(stream.bpm, 180) + + # Check calculated msec_per_whole_note + # 240000 / 180 = 1333.33... + self.assertAlmostEqual(stream.msec_per_whole_note, 1333.33, places=1) + + def test_invalid_rtttl_format(self): + """Test that invalid RTTTL format raises ValueError.""" + # Missing colons + with self.assertRaises(ValueError): + RTTTLStream("invalid", 0, 100, self.buzzer, None) + + # Too many colons + with self.assertRaises(ValueError): + RTTTLStream("a:b:c:d", 0, 100, self.buzzer, None) + + def test_note_parsing(self): + """Test parsing individual notes.""" + rtttl = "Test:d=4,o=5,b=120:c,d,e" + stream = RTTTLStream(rtttl, 0, 100, self.buzzer, None) + + # Generate notes + notes = list(stream._notes()) + + # Should have 3 notes + self.assertEqual(len(notes), 3) + + # Each note should be a tuple of (frequency, duration) + for freq, duration in notes: + self.assertTrue(freq > 0, "Frequency should be non-zero") + self.assertTrue(duration > 0, "Duration should be non-zero") + + def test_sharp_notes(self): + """Test parsing sharp notes.""" + rtttl = "Test:d=4,o=5,b=120:c#,d#,f#" + stream = RTTTLStream(rtttl, 0, 100, self.buzzer, None) + + notes = list(stream._notes()) + self.assertEqual(len(notes), 3) + + # Sharp notes should have different frequencies than natural notes + # (can't test exact values without knowing frequency table) + + def test_pause_notes(self): + """Test parsing pause notes.""" + rtttl = "Test:d=4,o=5,b=120:c,p,e" + stream = RTTTLStream(rtttl, 0, 100, self.buzzer, None) + + notes = list(stream._notes()) + self.assertEqual(len(notes), 3) + + # Pause (p) should have frequency 0 + freq, duration = notes[1] + self.assertEqual(freq, 0.0) + + def test_duration_modifiers(self): + """Test note duration modifiers (dots).""" + rtttl = "Test:d=4,o=5,b=120:c,c." + stream = RTTTLStream(rtttl, 0, 100, self.buzzer, None) + + notes = list(stream._notes()) + self.assertEqual(len(notes), 2) + + # Dotted note should be 1.5x longer + normal_duration = notes[0][1] + dotted_duration = notes[1][1] + self.assertAlmostEqual(dotted_duration / normal_duration, 1.5, places=1) + + def test_octave_variations(self): + """Test notes with different octaves.""" + rtttl = "Test:d=4,o=5,b=120:c4,c5,c6,c7" + stream = RTTTLStream(rtttl, 0, 100, self.buzzer, None) + + notes = list(stream._notes()) + self.assertEqual(len(notes), 4) + + # Higher octaves should have higher frequencies + freqs = [freq for freq, dur in notes] + self.assertTrue(freqs[0] < freqs[1], "c4 should be lower than c5") + self.assertTrue(freqs[1] < freqs[2], "c5 should be lower than c6") + self.assertTrue(freqs[2] < freqs[3], "c6 should be lower than c7") + + def test_volume_scaling(self): + """Test volume to duty cycle conversion.""" + # Test various volume levels + for volume in [0, 25, 50, 75, 100]: + stream = RTTTLStream("Test:d=4,o=5,b=120:c", 0, volume, self.buzzer, None) + + # Volume 0 should result in duty 0 + if volume == 0: + # Note: play() method calculates duty, not __init__ + pass # Can't easily test without calling play() + else: + # Volume > 0 should result in duty > 0 + # (duty calculation happens in play() method) + pass + + def test_stream_type(self): + """Test that stream type is stored correctly.""" + stream = RTTTLStream("Test:d=4,o=5,b=120:c", 2, 100, self.buzzer, None) + self.assertEqual(stream.stream_type, 2) + + def test_stop_flag(self): + """Test that stop flag can be set.""" + stream = RTTTLStream("Test:d=4,o=5,b=120:c", 0, 100, self.buzzer, None) + self.assertTrue(stream._keep_running) + + stream.stop() + self.assertFalse(stream._keep_running) + + def test_is_playing_flag(self): + """Test playing flag is initially false.""" + stream = RTTTLStream("Test:d=4,o=5,b=120:c", 0, 100, self.buzzer, None) + self.assertFalse(stream.is_playing()) diff --git a/tests/test_syspath_restore.py b/tests/test_syspath_restore.py new file mode 100644 index 00000000..36d668d8 --- /dev/null +++ b/tests/test_syspath_restore.py @@ -0,0 +1,78 @@ +import unittest +import sys +import os + +class TestSysPathRestore(unittest.TestCase): + """Test that sys.path is properly restored after execute_script""" + + def test_syspath_restored_after_execute_script(self): + """Test that sys.path is restored to original state after script execution""" + # Import here to ensure we're in the right context + import mpos.apps + + # Capture original sys.path + original_path = sys.path[:] + original_length = len(sys.path) + + # Create a test directory path that would be added + test_cwd = "apps/com.test.app/assets/" + + # Verify the test path is not already in sys.path + self.assertFalse(test_cwd in original_path, + f"Test path {test_cwd} should not be in sys.path initially") + + # Create a simple test script + test_script = ''' +import sys +# Just a simple script that does nothing +x = 42 +''' + + # Call execute_script with cwd parameter + # Note: This will fail because there's no Activity to start, + # but that's fine - we're testing the sys.path restoration + result = mpos.apps.execute_script( + test_script, + is_file=False, + cwd=test_cwd, + classname="NonExistentClass" + ) + + # After execution, sys.path should be restored + current_path = sys.path + current_length = len(sys.path) + + # Verify sys.path has been restored to original + self.assertEqual(current_length, original_length, + f"sys.path length should be restored. Original: {original_length}, Current: {current_length}") + + # Verify the test directory is not in sys.path anymore + self.assertFalse(test_cwd in current_path, + f"Test path {test_cwd} should not be in sys.path after execution. sys.path={current_path}") + + # Verify sys.path matches original + self.assertEqual(current_path, original_path, + f"sys.path should match original.\nOriginal: {original_path}\nCurrent: {current_path}") + + def test_syspath_not_affected_when_no_cwd(self): + """Test that sys.path is unchanged when cwd is None""" + import mpos.apps + + # Capture original sys.path + original_path = sys.path[:] + + test_script = ''' +x = 42 +''' + + # Call without cwd parameter + result = mpos.apps.execute_script( + test_script, + is_file=False, + cwd=None, + classname="NonExistentClass" + ) + + # sys.path should be unchanged + self.assertEqual(sys.path, original_path, + "sys.path should be unchanged when cwd is None")