From daa71250527dbf7898bc9925e60d05900e7d5da5 Mon Sep 17 00:00:00 2001 From: Thomas Farstrike Date: Tue, 24 Feb 2026 16:39:26 +0100 Subject: [PATCH] Rework AudioManager --- .../assets/music_player.py | 28 +- .../assets/sound_recorder.py | 69 +- .../lib/mpos/audio/__init__.py | 5 +- .../lib/mpos/audio/audiomanager.py | 1124 +++++++++-------- .../lib/mpos/audio/stream_record.py | 9 +- .../lib/mpos/audio/stream_record_adc.py | 5 + .../lib/mpos/audio/stream_wav.py | 89 +- .../lib/mpos/board/fri3d_2024.py | 45 +- .../lib/mpos/board/fri3d_2026.py | 24 +- internal_filesystem/lib/mpos/board/linux.py | 13 +- .../lib/mpos/board/m5stack_fire.py | 12 +- .../lib/mpos/board/odroid_go.py | 35 +- scripts/install.sh | 3 + tests/test_audiomanager.py | 156 +-- 14 files changed, 954 insertions(+), 663 deletions(-) diff --git a/internal_filesystem/apps/com.micropythonos.musicplayer/assets/music_player.py b/internal_filesystem/apps/com.micropythonos.musicplayer/assets/music_player.py index a5a979b4..402b3b2b 100644 --- a/internal_filesystem/apps/com.micropythonos.musicplayer/assets/music_player.py +++ b/internal_filesystem/apps/com.micropythonos.musicplayer/assets/music_player.py @@ -103,19 +103,31 @@ class FullscreenPlayer(Activity): AudioManager.stop() time.sleep(0.1) - success = AudioManager.play_wav( - self._filename, - stream_type=AudioManager.STREAM_MUSIC, - on_complete=self.player_finished - ) - - if not success: - error_msg = "Error: Audio device unavailable or busy" + output = AudioManager.get_default_output() + if output is None: + error_msg = "Error: No audio output available" print(error_msg) self.update_ui_threadsafe_if_foreground( self._filename_label.set_text, error_msg ) + return + + try: + player = AudioManager.player( + file_path=self._filename, + stream_type=AudioManager.STREAM_MUSIC, + on_complete=self.player_finished, + output=output, + ) + player.start() + except Exception as exc: + error_msg = "Error: Audio device unavailable or busy" + print(f"{error_msg}: {exc}") + self.update_ui_threadsafe_if_foreground( + self._filename_label.set_text, + error_msg + ) def focus_obj(self, obj): obj.set_style_border_color(lv.theme_get_color_primary(None),lv.PART.MAIN) diff --git a/internal_filesystem/apps/com.micropythonos.soundrecorder/assets/sound_recorder.py b/internal_filesystem/apps/com.micropythonos.soundrecorder/assets/sound_recorder.py index 12aebc41..7981bf84 100644 --- a/internal_filesystem/apps/com.micropythonos.soundrecorder/assets/sound_recorder.py +++ b/internal_filesystem/apps/com.micropythonos.soundrecorder/assets/sound_recorder.py @@ -56,6 +56,8 @@ class SoundRecorder(Activity): _last_recording = None _timer_task = None _record_start_time = 0 + _recorder = None + _player = None def onCreate(self): screen = lv.obj() @@ -136,7 +138,8 @@ class SoundRecorder(Activity): def _update_status(self): """Update status label based on microphone availability.""" - if AudioManager.has_microphone(): + default_input = AudioManager.get_default_input() + if default_input is not None: self._status_label.set_text("Microphone ready") self._status_label.set_style_text_color(lv.color_hex(0x00AA00), lv.PART.MAIN) self._record_button.remove_flag(lv.obj.FLAG.HIDDEN) @@ -243,9 +246,10 @@ class SoundRecorder(Activity): def _start_recording(self): """Start recording audio.""" print("SoundRecorder: _start_recording called") - print(f"SoundRecorder: has_microphone() = {AudioManager.has_microphone()}") + default_input = AudioManager.get_default_input() + print(f"SoundRecorder: default input = {default_input}") - if not AudioManager.has_microphone(): + if default_input is None: print("SoundRecorder: No microphone available - aborting") return @@ -263,25 +267,32 @@ class SoundRecorder(Activity): return # Start recording - print(f"SoundRecorder: Calling AudioManager.record_wav()") + print(f"SoundRecorder: Calling AudioManager.recorder()") print(f" file_path: {file_path}") print(f" duration_ms: {self._current_max_duration_ms}") print(f" sample_rate: {self.SAMPLE_RATE}") - success = AudioManager.record_wav( - file_path=file_path, - duration_ms=self._current_max_duration_ms, - on_complete=self._on_recording_complete, - sample_rate=self.SAMPLE_RATE - ) + try: + self._recorder = AudioManager.recorder( + file_path=file_path, + duration_ms=self._current_max_duration_ms, + on_complete=self._on_recording_complete, + sample_rate=self.SAMPLE_RATE, + input=default_input, + ) + self._recorder.start() + success = True + except Exception as exc: + print(f"SoundRecorder: recorder start failed: {exc}") + success = False - print(f"SoundRecorder: record_wav returned: {success}") + print(f"SoundRecorder: recorder started: {success}") if success: self._is_recording = True self._record_start_time = time.ticks_ms() self._last_recording = file_path - print(f"SoundRecorder: Recording started successfully") + print("SoundRecorder: Recording started successfully") # Update UI self._record_button_label.set_text(lv.SYMBOL.STOP + " Stop") @@ -296,13 +307,15 @@ class SoundRecorder(Activity): # Start timer update self._start_timer_update() else: - print("SoundRecorder: record_wav failed!") + print("SoundRecorder: recorder failed!") self._status_label.set_text("Failed to start recording") self._status_label.set_style_text_color(lv.color_hex(0xAA0000), lv.PART.MAIN) def _stop_recording(self): """Stop recording audio.""" - AudioManager.stop() + if self._recorder: + self._recorder.stop() + self._recorder = None self._is_recording = False # Show "Saving..." status immediately (file finalization takes time on SD card) @@ -364,16 +377,30 @@ class SoundRecorder(Activity): """Handle play button click.""" if self._last_recording and not self._is_recording: # Stop any current playback - AudioManager.stop() + if self._player: + self._player.stop() time.sleep_ms(100) + output = AudioManager.get_default_output() + if output is None: + self._status_label.set_text("Playback failed") + self._status_label.set_style_text_color(lv.color_hex(0xAA0000), lv.PART.MAIN) + return + # Play the recording - success = AudioManager.play_wav( - self._last_recording, - stream_type=AudioManager.STREAM_MUSIC, - on_complete=self._on_playback_complete, - volume=100 - ) + try: + self._player = AudioManager.player( + file_path=self._last_recording, + stream_type=AudioManager.STREAM_MUSIC, + on_complete=self._on_playback_complete, + volume=100, + output=output, + ) + self._player.start() + success = True + except Exception as exc: + print(f"SoundRecorder: playback failed: {exc}") + success = False if success: self._status_label.set_text("Playing...") diff --git a/internal_filesystem/lib/mpos/audio/__init__.py b/internal_filesystem/lib/mpos/audio/__init__.py index d009cb77..f3b85cc8 100644 --- a/internal_filesystem/lib/mpos/audio/__init__.py +++ b/internal_filesystem/lib/mpos/audio/__init__.py @@ -1,5 +1,4 @@ # AudioManager - Centralized Audio Management Service for MicroPythonOS -# Android-inspired audio routing with priority-based audio focus -# Simple routing: play_wav() -> I2S, play_rtttl() -> buzzer, record_wav() -> I2S mic +# Registry-based audio routing with device descriptors and session control -from .audiomanager import AudioManager +from .audiomanager import AudioManager, Player, Recorder, StereoNotSupported diff --git a/internal_filesystem/lib/mpos/audio/audiomanager.py b/internal_filesystem/lib/mpos/audio/audiomanager.py index f8823328..7a8f631b 100644 --- a/internal_filesystem/lib/mpos/audio/audiomanager.py +++ b/internal_filesystem/lib/mpos/audio/audiomanager.py @@ -1,519 +1,657 @@ # AudioManager - Core Audio Management Service -# Centralized audio routing with priority-based audio focus (Android-inspired) -# Supports I2S (digital audio) and PWM buzzer (tones/ringtones) -# -# Simple routing: play_wav() -> I2S, play_rtttl() -> buzzer, record_wav() -> I2S mic -# Uses _thread for non-blocking background playback/recording (separate thread from UI) +# Registry-based audio routing with device descriptors and session control import _thread + from ..task_manager import TaskManager +class StereoNotSupported(Exception): + pass + + class AudioManager: """ - Centralized audio management service with priority-based audio focus. - Implements singleton pattern for single audio service instance. - + Centralized audio management service with device registry and session control. + Usage: from mpos import AudioManager - - # Direct class method calls (no .get() needed) - AudioManager.init(i2s_pins=pins, buzzer_instance=buzzer) - AudioManager.play_wav("music.wav", stream_type=AudioManager.STREAM_MUSIC) - AudioManager.set_volume(80) - volume = AudioManager.get_volume() - AudioManager.stop() + + AudioManager.add(AudioManager.Output(...)) + AudioManager.add(AudioManager.Input(...)) + + player = AudioManager.player(file_path="music.wav") + player.start() """ - - # Stream type constants (priority order: higher number = higher priority) - STREAM_MUSIC = 0 # Background music (lowest priority) - STREAM_NOTIFICATION = 1 # Notification sounds (medium priority) - STREAM_ALARM = 2 # Alarms/alerts (highest priority) - - _instance = None # Singleton instance - def __init__( - self, - i2s_pins=None, - buzzer_instance=None, - adc_mic_pin=None, - pre_playback=None, - post_playback=None, - ): - """ - Initialize AudioManager instance with optional hardware configuration. + STREAM_MUSIC = 0 + STREAM_NOTIFICATION = 1 + STREAM_ALARM = 2 - Args: - i2s_pins: Dict with 'sck', 'ws', 'sd' pin numbers (for I2S/WAV playback) - buzzer_instance: PWM instance for buzzer (for RTTTL playback) - adc_mic_pin: GPIO pin number for ADC microphone (for ADC recording) - pre_playback: Optional callback called before starting playback - post_playback: Optional callback called after stopping playback - """ - if AudioManager._instance: - # If instance exists, update configuration if provided - if i2s_pins: - AudioManager._instance._i2s_pins = i2s_pins - if buzzer_instance: - AudioManager._instance._buzzer_instance = buzzer_instance - if adc_mic_pin: - AudioManager._instance._adc_mic_pin = adc_mic_pin + _instance = None + + class Output: + def __init__( + self, + name, + kind, + channels=1, + i2s_pins=None, + buzzer_pin=None, + preferred_sample_rate=None, + ): + if kind not in ("i2s", "buzzer"): + raise ValueError("Output.kind must be 'i2s' or 'buzzer'") + if channels not in (1, 2): + raise ValueError("Output.channels must be 1 or 2") + + self.name = name + self.kind = kind + self.channels = channels + self.preferred_sample_rate = preferred_sample_rate + + if kind == "i2s": + if not i2s_pins: + raise ValueError("Output.i2s_pins required for i2s output") + self._validate_i2s_pins(i2s_pins) + self.i2s_pins = dict(i2s_pins) + self.buzzer_pin = None + else: + if buzzer_pin is None: + raise ValueError("Output.buzzer_pin required for buzzer output") + self.buzzer_pin = buzzer_pin + self.i2s_pins = None + + @staticmethod + def _validate_i2s_pins(i2s_pins): + allowed = {"sck", "ws", "sd", "mck"} + for key in i2s_pins: + if key not in allowed: + raise ValueError("Invalid i2s_pins key for output: %s" % key) + for key in ("ws", "sd"): + if key not in i2s_pins: + raise ValueError("i2s_pins must include '%s'" % key) + + def __repr__(self): + return "" % (self.name, self.kind) + + class Input: + def __init__( + self, + name, + kind, + channels=1, + i2s_pins=None, + adc_mic_pin=None, + preferred_sample_rate=None, + ): + if kind not in ("i2s", "adc"): + raise ValueError("Input.kind must be 'i2s' or 'adc'") + if channels != 1: + raise StereoNotSupported("Input channels=2 not supported yet") + + self.name = name + self.kind = kind + self.channels = channels + self.preferred_sample_rate = preferred_sample_rate + + if kind == "i2s": + if not i2s_pins: + raise ValueError("Input.i2s_pins required for i2s input") + self._validate_i2s_pins(i2s_pins) + self.i2s_pins = dict(i2s_pins) + self.adc_mic_pin = None + else: + if adc_mic_pin is None: + raise ValueError("Input.adc_mic_pin required for adc input") + self.adc_mic_pin = adc_mic_pin + self.i2s_pins = None + + @staticmethod + def _validate_i2s_pins(i2s_pins): + allowed = {"sck_in", "sck", "ws", "sd_in"} + for key in i2s_pins: + if key not in allowed: + raise ValueError("Invalid i2s_pins key for input: %s" % key) + for key in ("ws", "sd_in"): + if key not in i2s_pins: + raise ValueError("i2s_pins must include '%s'" % key) + + def __repr__(self): + return "" % (self.name, self.kind) + + def __init__(self): + if getattr(self, "_initialized", False): return - + AudioManager._instance = self + self._outputs = [] + self._inputs = [] + self._default_output = None + self._default_input = None + self._active_sessions = [] + self._volume = 50 + self._initialized = True - self._i2s_pins = i2s_pins # I2S pin configuration dict (created per-stream) - self._buzzer_instance = buzzer_instance # PWM buzzer instance - self._adc_mic_pin = adc_mic_pin # ADC microphone pin - self.pre_playback = pre_playback - self.post_playback = post_playback - - self._current_stream = None # Currently playing stream - self._current_recording = None # Currently recording stream - self._volume = 50 # System volume (0-100) - - # Build status message - capabilities = [] - if i2s_pins: - capabilities.append("I2S (WAV)") - if buzzer_instance: - capabilities.append("Buzzer (RTTTL)") - if adc_mic_pin: - capabilities.append(f"ADC Mic (Pin {adc_mic_pin})") - - if capabilities: - print(f"AudioManager initialized: {', '.join(capabilities)}") - else: - print("AudioManager initialized: No audio hardware") + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance @classmethod def get(cls): - """Get or create the singleton instance.""" if cls._instance is None: cls._instance = cls() return cls._instance - def has_i2s(self): - """Check if I2S audio is available for WAV playback.""" - return self._i2s_pins is not None - - def has_buzzer(self): - """Check if buzzer is available for RTTTL playback.""" - return self._buzzer_instance is not None - - def has_microphone(self): - """Check if microphone (I2S or ADC) is available for recording.""" - has_i2s_mic = self._i2s_pins is not None and 'sd_in' in self._i2s_pins - has_adc_mic = self._adc_mic_pin is not None - return has_i2s_mic or has_adc_mic - - def _check_audio_focus(self, stream_type): - """ - Check if a stream with the given type can start playback. - Implements priority-based audio focus (Android-inspired). - - Args: - stream_type: Stream type (STREAM_MUSIC, STREAM_NOTIFICATION, STREAM_ALARM) - - Returns: - bool: True if stream can start, False if rejected - """ - if not self._current_stream: - return True # No stream playing, OK to start - - if not self._current_stream.is_playing(): - return True # Current stream finished, OK to start - - # Check priority - if stream_type <= self._current_stream.stream_type: - print(f"AudioManager: Stream rejected (priority {stream_type} <= current {self._current_stream.stream_type})") - return False - - # Higher priority stream - interrupt current - print(f"AudioManager: Interrupting stream (priority {stream_type} > current {self._current_stream.stream_type})") - self._current_stream.stop() - return True - - def _playback_thread(self, stream): - """ - Thread function for audio playback. - Runs in a separate thread to avoid blocking the UI. - - Args: - stream: Stream instance (WAVStream or RTTTLStream) - """ - self._current_stream = stream - if self.pre_playback: - try: - self.pre_playback() - except Exception as e: - print(f"AudioManager: pre_playback callback error: {e}") - - try: - # Run synchronous playback in this thread - stream.play() - except Exception as e: - print(f"AudioManager: Playback error: {e}") - finally: - # Clear current stream - if self._current_stream == stream: - self._current_stream = None - - if self.post_playback: - try: - self.post_playback() - except Exception as e: - print(f"AudioManager: post_playback callback error: {e}") - - def play_wav(self, file_path, stream_type=None, volume=None, on_complete=None): - """ - Play WAV file via I2S. - - Args: - file_path: Path to WAV file (e.g., "M:/sdcard/music/song.wav") - stream_type: Stream type (STREAM_MUSIC, STREAM_NOTIFICATION, STREAM_ALARM) - volume: Override volume (0-100), or None to use system volume - on_complete: Callback function(message) called when playback finishes - - Returns: - bool: True if playback started, False if rejected or unavailable - """ - if stream_type is None: - stream_type = self.STREAM_MUSIC - - if not self._i2s_pins: - print("AudioManager: play_wav() failed - I2S not configured") - return False - - # Check audio focus - if not self._check_audio_focus(stream_type): - return False - - # Create stream and start playback in separate thread - try: - from mpos.audio.stream_wav import WAVStream - - stream = WAVStream( - file_path=file_path, - stream_type=stream_type, - volume=volume if volume is not None else self._volume, - i2s_pins=self._i2s_pins, - on_complete=on_complete - ) - - _thread.stack_size(TaskManager.good_stack_size()) - _thread.start_new_thread(self._playback_thread, (stream,)) - return True - - except Exception as e: - print(f"AudioManager: play_wav() failed: {e}") - return False - - def play_rtttl(self, rtttl_string, stream_type=None, volume=None, on_complete=None): - """ - Play RTTTL ringtone via buzzer. - - Args: - rtttl_string: RTTTL format string (e.g., "Nokia:d=4,o=5,b=225:8e6,8d6...") - stream_type: Stream type (STREAM_MUSIC, STREAM_NOTIFICATION, STREAM_ALARM) - volume: Override volume (0-100), or None to use system volume - on_complete: Callback function(message) called when playback finishes - - Returns: - bool: True if playback started, False if rejected or unavailable - """ - if stream_type is None: - stream_type = self.STREAM_NOTIFICATION - - if not self._buzzer_instance: - print("AudioManager: play_rtttl() failed - buzzer not configured") - return False - - # Check audio focus - if not self._check_audio_focus(stream_type): - return False - - # Create stream and start playback in separate thread - try: - from mpos.audio.stream_rtttl import RTTTLStream - - stream = RTTTLStream( - rtttl_string=rtttl_string, - stream_type=stream_type, - volume=volume if volume is not None else self._volume, - buzzer_instance=self._buzzer_instance, - on_complete=on_complete - ) - - _thread.stack_size(TaskManager.good_stack_size()) - _thread.start_new_thread(self._playback_thread, (stream,)) - return True - - except Exception as e: - print(f"AudioManager: play_rtttl() failed: {e}") - return False - - def _recording_thread(self, stream): - """ - Thread function for audio recording. - Runs in a separate thread to avoid blocking the UI. - - Args: - stream: RecordStream instance - """ - self._current_recording = stream - - try: - # Run synchronous recording in this thread - stream.record() - except Exception as e: - print(f"AudioManager: Recording error: {e}") - finally: - # Clear current recording - if self._current_recording == stream: - self._current_recording = None - - def record_wav(self, file_path, duration_ms=None, on_complete=None, sample_rate=16000): - """ - Record audio from I2S microphone to WAV file. - - Args: - file_path: Path to save WAV file (e.g., "data/recording.wav") - duration_ms: Recording duration in milliseconds (None = 60 seconds default) - on_complete: Callback function(message) when recording finishes - sample_rate: Sample rate in Hz (default 16000 for voice) - - Returns: - bool: True if recording started, False if rejected or unavailable - """ - print(f"AudioManager.record_wav() called") - print(f" file_path: {file_path}") - print(f" duration_ms: {duration_ms}") - print(f" sample_rate: {sample_rate}") - print(f" _i2s_pins: {self._i2s_pins}") - print(f" has_microphone(): {self.has_microphone()}") - - if not self.has_microphone(): - print("AudioManager: record_wav() failed - microphone not configured") - return False - - # Cannot record while playing (I2S can only be TX or RX, not both) - if self.is_playing(): - print("AudioManager: Cannot record while playing") - return False - - # Cannot start new recording while already recording - if self.is_recording(): - print("AudioManager: Already recording") - return False - - # Create stream and start recording in separate thread - try: - print("AudioManager: Importing RecordStream...") - from mpos.audio.stream_record import RecordStream - - print("AudioManager: Creating RecordStream instance...") - stream = RecordStream( - file_path=file_path, - duration_ms=duration_ms, - sample_rate=sample_rate, - i2s_pins=self._i2s_pins, - on_complete=on_complete - ) - - print("AudioManager: Starting recording thread...") - _thread.stack_size(TaskManager.good_stack_size()) - _thread.start_new_thread(self._recording_thread, (stream,)) - print("AudioManager: Recording thread started successfully") - return True - - except Exception as e: - import sys - print(f"AudioManager: record_wav() failed: {e}") - sys.print_exception(e) - return False - - def record_wav_adc(self, file_path, duration_ms=None, adc_pin=None, sample_rate=16000, - on_complete=None, **adc_config): - """ - Record audio from ADC using optimized C module to WAV file. - - Args: - file_path: Path to save WAV file (e.g., "data/recording.wav") - duration_ms: Recording duration in milliseconds (None = 60 seconds default) - adc_pin: GPIO pin for ADC input (default: configured pin or 1) - sample_rate: Target sample rate in Hz (default 16000 for voice) - on_complete: Callback function(message) when recording finishes - **adc_config: Additional ADC configuration - - Returns: - bool: True if recording started, False if rejected or unavailable - """ - # Use configured pin if not specified - if adc_pin is None: - adc_pin = self._adc_mic_pin - - # Fallback to default if still None - if adc_pin is None: - adc_pin = 1 # Default to GPIO1 (Fri3d 2026) - - print(f"AudioManager.record_wav_adc() called") - print(f" file_path: {file_path}") - print(f" duration_ms: {duration_ms}") - print(f" adc_pin: {adc_pin}") - print(f" sample_rate: {sample_rate}") - - # Cannot record while playing (I2S can only be TX or RX, not both) - if self.is_playing(): - print("AudioManager: Cannot record while playing") - return False - - # Cannot start new recording while already recording - if self.is_recording(): - print("AudioManager: Already recording") - return False - - # Create stream and start recording in separate thread - try: - print("AudioManager: Importing ADCRecordStream...") - from mpos.audio.stream_record_adc import ADCRecordStream - - print("AudioManager: Creating ADCRecordStream instance...") - stream = ADCRecordStream( - file_path=file_path, - duration_ms=duration_ms, - sample_rate=sample_rate, - adc_pin=adc_pin, - on_complete=on_complete, - **adc_config - ) - - print("AudioManager: Starting ADC recording thread...") - _thread.stack_size(TaskManager.good_stack_size()) - _thread.start_new_thread(self._recording_thread, (stream,)) - print("AudioManager: ADC recording thread started successfully") - return True - - except Exception as e: - import sys - print(f"AudioManager: record_wav_adc() failed: {e}") - sys.print_exception(e) - return False - - def stop(self): - """Stop current audio playback or recording.""" - stopped = False - - if self._current_stream: - self._current_stream.stop() - print("AudioManager: Playback stopped") - stopped = True - - if self._current_recording: - self._current_recording.stop() - print("AudioManager: Recording stopped") - stopped = True - - if not stopped: - print("AudioManager: No playback or recording to stop") - - def pause(self): - """ - Pause current audio playback (if supported by stream). - Note: Most streams don't support pause, only stop. - """ - if self._current_stream and hasattr(self._current_stream, 'pause'): - self._current_stream.pause() - print("AudioManager: Playback paused") - else: - print("AudioManager: Pause not supported or no playback active") - - def resume(self): - """ - Resume paused audio playback (if supported by stream). - Note: Most streams don't support resume, only play. - """ - if self._current_stream and hasattr(self._current_stream, 'resume'): - self._current_stream.resume() - print("AudioManager: Playback resumed") - else: - print("AudioManager: Resume not supported or no playback active") - - def set_volume(self, volume): - """ - Set system volume (affects new streams, not current playback). - - Args: - volume: Volume level (0-100) - """ - self._volume = max(0, min(100, volume)) - if self._current_stream: - self._current_stream.set_volume(self._volume) - - def get_volume(self): - """ - Get system volume. - - Returns: - int: Current system volume (0-100) - """ - return self._volume - - def is_playing(self): - """ - Check if audio is currently playing. - - Returns: - bool: True if playback active, False otherwise - """ - return self._current_stream is not None and self._current_stream.is_playing() - - def is_recording(self): - """ - Check if audio is currently being recorded. - - Returns: - bool: True if recording active, False otherwise - """ - return self._current_recording is not None and self._current_recording.is_recording() - -# ============================================================================ -# Class method forwarding to singleton instance -# -# Instead of writing each function like this: -# @classmethod -# def has_microphone(cls): -# instance = cls.get() -# return instance._i2s_pins is not None and 'sd_in' in instance._i2s_pins -# -# They can be written like this: -# def has_microphone(self): -# return self._i2s_pins is not None and 'sd_in' in self._i2s_pins -# -# ============================================================================ -# Store original instance methods before replacing them -_original_methods = {} -_methods_to_delegate = [ - 'play_wav', 'play_rtttl', 'record_wav', 'record_wav_adc', 'stop', 'pause', 'resume', - 'set_volume', 'get_volume', 'is_playing', 'is_recording', - 'has_i2s', 'has_buzzer', 'has_microphone' -] - -for method_name in _methods_to_delegate: - _original_methods[method_name] = getattr(AudioManager, method_name) - -# Helper to create delegating class methods -def _make_class_method(method_name): - """Create a class method that delegates to the singleton instance.""" - original_method = _original_methods[method_name] + @classmethod + def add(cls, device): + return cls.get()._add_device(device) + + def _add_device(self, device): + if isinstance(device, AudioManager.Output): + self._outputs.append(device) + if self._default_output is None: + self._default_output = device + return device + if isinstance(device, AudioManager.Input): + self._inputs.append(device) + if self._default_input is None: + self._default_input = device + return device + raise ValueError("Unsupported device type") @classmethod - def class_method(cls, *args, **kwargs): - instance = cls.get() - return original_method(instance, *args, **kwargs) - - return class_method + def get_outputs(cls): + return list(cls.get()._outputs) -# Attach class methods to AudioManager -for method_name in _methods_to_delegate: - setattr(AudioManager, method_name, _make_class_method(method_name)) + @classmethod + def get_inputs(cls): + return list(cls.get()._inputs) + + @classmethod + def get_default_output(cls): + return cls.get()._default_output + + @classmethod + def get_default_input(cls): + return cls.get()._default_input + + @classmethod + def set_default_output(cls, output): + cls.get()._default_output = output + + @classmethod + def set_default_input(cls, input_device): + cls.get()._default_input = input_device + + @classmethod + def set_volume(cls, volume): + cls.get()._volume = max(0, min(100, volume)) + + @classmethod + def get_volume(cls): + return cls.get()._volume + + @classmethod + def player( + cls, + file_path=None, + rtttl=None, + stream_type=None, + on_complete=None, + output=None, + sample_rate=None, + volume=None, + ): + return Player( + manager=cls.get(), + file_path=file_path, + rtttl=rtttl, + stream_type=stream_type, + on_complete=on_complete, + output=output, + sample_rate=sample_rate, + volume=volume, + ) + + @classmethod + def rtttl_player(cls, rtttl, **kwargs): + return cls.player(rtttl=rtttl, **kwargs) + + @classmethod + def recorder( + cls, + file_path, + input=None, + sample_rate=None, + on_complete=None, + duration_ms=None, + **adc_config + ): + return Recorder( + manager=cls.get(), + file_path=file_path, + input_device=input, + sample_rate=sample_rate, + on_complete=on_complete, + duration_ms=duration_ms, + adc_config=adc_config, + ) + + @classmethod + def record_wav_adc( + cls, + file_path, + duration_ms=None, + sample_rate=None, + adc_pin=None, + on_complete=None, + **adc_config + ): + manager = cls.get() + from mpos.audio.stream_record_adc import ADCRecordStream + + stream = ADCRecordStream( + file_path=file_path, + duration_ms=duration_ms, + sample_rate=sample_rate, + adc_pin=adc_pin, + on_complete=on_complete, + **adc_config, + ) + session = _ADCRecorderSession(manager, stream) + manager._resolve_conflicts(session) + manager._register_session(session) + + _thread.stack_size(TaskManager.good_stack_size()) + _thread.start_new_thread(session._record_thread, ()) + return True + + @classmethod + def stop(cls): + return cls.get()._stop_all() + + def _stop_all(self): + for session in list(self._active_sessions): + session.stop() + self._active_sessions = [] + + def _register_session(self, session): + self._active_sessions.append(session) + + def _session_finished(self, session): + if session in self._active_sessions: + self._active_sessions.remove(session) + + def _cleanup_inactive(self): + active = [] + for session in self._active_sessions: + if session.is_active(): + active.append(session) + self._active_sessions = active + + def _resolve_conflicts(self, new_session): + self._cleanup_inactive() + to_stop = [] + for session in self._active_sessions: + if self._sessions_conflict(session, new_session): + to_stop.append(session) + for session in to_stop: + session.stop() + if session in self._active_sessions: + self._active_sessions.remove(session) + + @staticmethod + def _pins_compatible(existing_signal, new_signal): + if existing_signal == new_signal and existing_signal in ("ws", "sck"): + return True + return False + + def _sessions_conflict(self, existing, new_session): + existing_pins = existing.pin_usage() + new_pins = new_session.pin_usage() + shared_clock = False + + for pin, new_signal in new_pins.items(): + if pin in existing_pins: + existing_signal = existing_pins[pin] + if self._pins_compatible(existing_signal, new_signal): + shared_clock = True + continue + return True + + if shared_clock: + if existing.sample_rate is None or new_session.sample_rate is None: + return True + if existing.sample_rate != new_session.sample_rate: + return True + + return False + + def _start_player(self, player): + if player.output is None: + player.output = self._default_output + if player.output is None: + raise ValueError("No output device registered") + + if player.stream_type is None: + player.stream_type = ( + self.STREAM_NOTIFICATION if player.rtttl else self.STREAM_MUSIC + ) + + if player.output.kind == "buzzer" and not player.rtttl: + raise ValueError("RTTTL string required for buzzer output") + if player.output.kind == "i2s" and not player.file_path: + raise ValueError("file_path required for i2s output") + + player.sample_rate = self._determine_player_rate(player) + + self._resolve_conflicts(player) + self._register_session(player) + + _thread.stack_size(TaskManager.good_stack_size()) + _thread.start_new_thread(player._play_thread, ()) + + def _start_recorder(self, recorder): + if recorder.input_device is None: + recorder.input_device = self._default_input + if recorder.input_device is None: + raise ValueError("No input device registered") + + recorder.sample_rate = self._determine_recorder_rate(recorder) + + self._resolve_conflicts(recorder) + self._register_session(recorder) + + _thread.stack_size(TaskManager.good_stack_size()) + _thread.start_new_thread(recorder._record_thread, ()) + + def _determine_player_rate(self, player): + if player.output.kind != "i2s": + return None + + preferred = player.sample_rate or player.output.preferred_sample_rate + + from mpos.audio.stream_wav import WAVStream + + info = WAVStream.get_wav_info(player.file_path) + original_rate = info["sample_rate"] + playback_rate, _ = WAVStream.compute_playback_rate(original_rate, preferred) + return playback_rate + + def _determine_recorder_rate(self, recorder): + if recorder.sample_rate: + return recorder.sample_rate + if recorder.input_device and recorder.input_device.preferred_sample_rate: + return recorder.input_device.preferred_sample_rate + return 16000 + + +class _ADCRecorderSession: + def __init__(self, manager, stream): + self._manager = manager + self._stream = stream + self.sample_rate = stream.sample_rate + + def start(self): + self._manager._resolve_conflicts(self) + self._manager._register_session(self) + + _thread.stack_size(TaskManager.good_stack_size()) + _thread.start_new_thread(self._record_thread, ()) + + def stop(self): + if self._stream: + self._stream.stop() + self._manager._session_finished(self) + + def is_active(self): + return self.is_recording() + + def is_recording(self): + return self._stream is not None and self._stream.is_recording() + + def pin_usage(self): + adc_pin = getattr(self._stream, "adc_pin", None) + if adc_pin is None: + return {} + return {adc_pin: "adc"} + + def _record_thread(self): + try: + self._stream.record() + finally: + self._manager._session_finished(self) + + +class Player: + def __init__( + self, + manager, + file_path=None, + rtttl=None, + stream_type=None, + on_complete=None, + output=None, + sample_rate=None, + volume=None, + ): + self._manager = manager + self.file_path = file_path + self.rtttl = rtttl + self.stream_type = stream_type + self.on_complete = on_complete + self.output = output + self.sample_rate = sample_rate + self.volume = volume + self._stream = None + self._buzzer = None + + def start(self): + self._manager._start_player(self) + + def stop(self): + if self._stream: + self._stream.stop() + if self._buzzer: + try: + self._buzzer.deinit() + except Exception: + pass + self._manager._session_finished(self) + + def pause(self): + if self._stream and hasattr(self._stream, "pause"): + self._stream.pause() + + def resume(self): + if self._stream and hasattr(self._stream, "resume"): + self._stream.resume() + + def is_active(self): + return self.is_playing() + + def is_playing(self): + return self._stream is not None and self._stream.is_playing() + + def get_progress_percent(self): + if self._stream and hasattr(self._stream, "get_progress_percent"): + return self._stream.get_progress_percent() + return None + + def get_progress_ms(self): + if self._stream and hasattr(self._stream, "get_progress_ms"): + return self._stream.get_progress_ms() + return None + + def get_duration_ms(self): + if self._stream and hasattr(self._stream, "get_duration_ms"): + return self._stream.get_duration_ms() + return None + + def pin_usage(self): + if not self.output: + return {} + if self.output.kind == "buzzer": + return {self.output.buzzer_pin: "buzzer"} + if self.output.kind == "i2s": + return _pin_map_i2s_output(self.output.i2s_pins) + return {} + + def _play_thread(self): + try: + if self.output.kind == "buzzer": + self._play_rtttl() + else: + self._play_wav() + finally: + if self._buzzer: + try: + self._buzzer.deinit() + except Exception: + pass + self._manager._session_finished(self) + + def _play_rtttl(self): + from mpos.audio.stream_rtttl import RTTTLStream + from machine import Pin, PWM + + self._buzzer = PWM(Pin(self.output.buzzer_pin, Pin.OUT)) + self._buzzer.duty_u16(0) + + self._stream = RTTTLStream( + rtttl_string=self.rtttl, + stream_type=self.stream_type, + volume=self.volume if self.volume is not None else self._manager._volume, + buzzer_instance=self._buzzer, + on_complete=self.on_complete, + ) + self._stream.play() + + def _play_wav(self): + from mpos.audio.stream_wav import WAVStream + + self._stream = WAVStream( + file_path=self.file_path, + stream_type=self.stream_type, + volume=self.volume if self.volume is not None else self._manager._volume, + i2s_pins=self.output.i2s_pins, + on_complete=self.on_complete, + requested_sample_rate=self.sample_rate, + ) + self._stream.play() + + +class Recorder: + def __init__( + self, + manager, + file_path, + input_device=None, + sample_rate=None, + on_complete=None, + duration_ms=None, + adc_config=None, + ): + self._manager = manager + self.file_path = file_path + self.input_device = input_device + self.sample_rate = sample_rate + self.on_complete = on_complete + self.duration_ms = duration_ms + self.adc_config = adc_config or {} + self._stream = None + + def start(self): + self._manager._start_recorder(self) + + def stop(self): + if self._stream: + self._stream.stop() + self._manager._session_finished(self) + + def pause(self): + if self._stream and hasattr(self._stream, "pause"): + self._stream.pause() + + def resume(self): + if self._stream and hasattr(self._stream, "resume"): + self._stream.resume() + + def is_active(self): + return self.is_recording() + + def is_recording(self): + return self._stream is not None and self._stream.is_recording() + + def get_duration_ms(self): + if self._stream and hasattr(self._stream, "get_duration_ms"): + return self._stream.get_duration_ms() + if self._stream and hasattr(self._stream, "get_elapsed_ms"): + return self._stream.get_elapsed_ms() + return None + + def pin_usage(self): + if not self.input_device: + return {} + if self.input_device.kind == "adc": + return {self.input_device.adc_mic_pin: "adc"} + if self.input_device.kind == "i2s": + return _pin_map_i2s_input(self.input_device.i2s_pins) + return {} + + def _record_thread(self): + try: + if self.input_device.kind == "adc": + self._record_adc() + else: + self._record_i2s() + finally: + self._manager._session_finished(self) + + def _record_i2s(self): + from mpos.audio.stream_record import RecordStream + + self._stream = RecordStream( + file_path=self.file_path, + duration_ms=self.duration_ms, + sample_rate=self.sample_rate, + i2s_pins=self.input_device.i2s_pins, + on_complete=self.on_complete, + ) + self._stream.record() + + def _record_adc(self): + from mpos.audio.stream_record_adc import ADCRecordStream + + self._stream = ADCRecordStream( + file_path=self.file_path, + duration_ms=self.duration_ms, + sample_rate=self.sample_rate, + adc_pin=self.input_device.adc_mic_pin, + on_complete=self.on_complete, + **self.adc_config, + ) + self._stream.record() + + +def _pin_map_i2s_output(i2s_pins): + pins = {} + if i2s_pins.get("sck") is not None: + pins[i2s_pins["sck"]] = "sck" + pins[i2s_pins["ws"]] = "ws" + pins[i2s_pins["sd"]] = "sd" + if i2s_pins.get("mck") is not None: + pins[i2s_pins["mck"]] = "mck" + return pins + + +def _pin_map_i2s_input(i2s_pins): + pins = {} + sck_pin = i2s_pins.get("sck_in", i2s_pins.get("sck")) + if sck_pin is not None: + pins[sck_pin] = "sck" + pins[i2s_pins["ws"]] = "ws" + pins[i2s_pins["sd_in"]] = "sd_in" + return pins diff --git a/internal_filesystem/lib/mpos/audio/stream_record.py b/internal_filesystem/lib/mpos/audio/stream_record.py index d12a580e..ff9f9034 100644 --- a/internal_filesystem/lib/mpos/audio/stream_record.py +++ b/internal_filesystem/lib/mpos/audio/stream_record.py @@ -68,6 +68,7 @@ class RecordStream: self._is_recording = False self._i2s = None self._bytes_recorded = 0 + self._start_time_ms = 0 def is_recording(self): """Check if stream is currently recording.""" @@ -203,6 +204,7 @@ class RecordStream: self._is_recording = True self._bytes_recorded = 0 + self._start_time_ms = time.ticks_ms() try: # Ensure directory exists @@ -346,4 +348,9 @@ class RecordStream: if self._i2s: self._i2s.deinit() self._i2s = None - print(f"RecordStream: Recording thread finished") \ No newline at end of file + print(f"RecordStream: Recording thread finished") + + def get_duration_ms(self): + if self._start_time_ms <= 0: + return 0 + return time.ticks_diff(time.ticks_ms(), self._start_time_ms) \ No newline at end of file diff --git a/internal_filesystem/lib/mpos/audio/stream_record_adc.py b/internal_filesystem/lib/mpos/audio/stream_record_adc.py index 1cdaf87d..d876591d 100644 --- a/internal_filesystem/lib/mpos/audio/stream_record_adc.py +++ b/internal_filesystem/lib/mpos/audio/stream_record_adc.py @@ -354,3 +354,8 @@ class ADCRecordStream: finally: self._is_recording = False print(f"ADCRecordStream: Recording thread finished") + + def get_duration_ms(self): + if self._start_time_ms <= 0: + return 0 + return time.ticks_diff(time.ticks_ms(), self._start_time_ms) diff --git a/internal_filesystem/lib/mpos/audio/stream_wav.py b/internal_filesystem/lib/mpos/audio/stream_wav.py index e84f254e..2aa2ce53 100644 --- a/internal_filesystem/lib/mpos/audio/stream_wav.py +++ b/internal_filesystem/lib/mpos/audio/stream_wav.py @@ -147,7 +147,15 @@ class WAVStream: Supports 8/16/24/32-bit PCM, mono and stereo, auto-upsampling to >=22050 Hz. """ - def __init__(self, file_path, stream_type, volume, i2s_pins, on_complete): + def __init__( + self, + file_path, + stream_type, + volume, + i2s_pins, + on_complete, + requested_sample_rate=None, + ): """ Initialize WAV stream. @@ -157,15 +165,25 @@ class WAVStream: volume: Volume level (0-100) i2s_pins: Dict with 'sck', 'ws', 'sd' pin numbers on_complete: Callback function(message) when playback finishes + requested_sample_rate: Optional negotiated sample rate for shared clocks """ self.file_path = file_path self.stream_type = stream_type self.volume = volume self.i2s_pins = i2s_pins self.on_complete = on_complete + self.requested_sample_rate = requested_sample_rate self._keep_running = True self._is_playing = False self._i2s = None + self._progress_samples = 0 + self._total_samples = 0 + self._duration_ms = None + self._playback_rate = None + self._original_rate = None + self._channels = None + self._bits_per_sample = None + self._data_size = None def is_playing(self): """Check if stream is currently playing.""" @@ -175,6 +193,19 @@ class WAVStream: """Stop playback.""" self._keep_running = False + def get_progress_percent(self): + if self._total_samples <= 0: + return None + return int((self._progress_samples / self._total_samples) * 100) + + def get_progress_ms(self): + if self._playback_rate: + return int((self._progress_samples / self._playback_rate) * 1000) + return None + + def get_duration_ms(self): + return self._duration_ms + # ---------------------------------------------------------------------- # WAV header parser - returns bit-depth and format info # ---------------------------------------------------------------------- @@ -235,6 +266,37 @@ class WAVStream: raise ValueError("No 'data' chunk found") + # ---------------------------------------------------------------------- + # WAV info helpers + # ---------------------------------------------------------------------- + @staticmethod + def get_wav_info(file_path): + with open(file_path, 'rb') as f: + data_start, data_size, sample_rate, channels, bits_per_sample = ( + WAVStream._find_data_chunk(f) + ) + return { + "data_start": data_start, + "data_size": data_size, + "sample_rate": sample_rate, + "channels": channels, + "bits_per_sample": bits_per_sample, + } + + @staticmethod + def compute_playback_rate(original_rate, requested_rate=None): + if requested_rate: + if requested_rate <= original_rate: + return original_rate, 1 + upsample_factor = (requested_rate + original_rate - 1) // original_rate + return original_rate * upsample_factor, upsample_factor + + target_rate = 22050 + if original_rate >= target_rate: + return original_rate, 1 + upsample_factor = (target_rate + original_rate - 1) // original_rate + return original_rate * upsample_factor, upsample_factor + # ---------------------------------------------------------------------- # Bit depth conversion functions # ---------------------------------------------------------------------- @@ -327,14 +389,17 @@ class WAVStream: data_start, data_size, original_rate, channels, bits_per_sample = \ self._find_data_chunk(f) - # Decide playback rate (force >=22050 Hz) - but why?! the DAC should support down to 8kHz! - target_rate = 22050 # slower is faster (less data) - if original_rate >= target_rate: - playback_rate = original_rate - upsample_factor = 1 - else: - upsample_factor = (target_rate + original_rate - 1) // original_rate - playback_rate = original_rate * upsample_factor + self._original_rate = original_rate + self._channels = channels + self._bits_per_sample = bits_per_sample + self._data_size = data_size + + playback_rate, upsample_factor = self.compute_playback_rate( + original_rate, + self.requested_sample_rate, + ) + + self._playback_rate = playback_rate print(f"WAVStream: {original_rate} Hz, {bits_per_sample}-bit, {channels}-ch") print(f"WAVStream: Playback at {playback_rate} Hz (factor {upsample_factor})") @@ -342,6 +407,11 @@ class WAVStream: if data_size > file_size - data_start: data_size = file_size - data_start + bytes_per_sample = (bits_per_sample // 8) * channels + if bytes_per_sample > 0: + self._total_samples = data_size // bytes_per_sample + self._duration_ms = int((self._total_samples / original_rate) * 1000) + # Initialize I2S (always 16-bit output) try: i2s_format = machine.I2S.MONO if channels == 1 else machine.I2S.STEREO @@ -445,6 +515,7 @@ class WAVStream: time.sleep(num_samples / playback_rate) total_original += to_read + self._progress_samples = total_original // bytes_per_original_sample print(f"WAVStream: Finished playing {self.file_path}") if self.on_complete: diff --git a/internal_filesystem/lib/mpos/board/fri3d_2024.py b/internal_filesystem/lib/mpos/board/fri3d_2024.py index fb0ed56f..71f9b4ff 100644 --- a/internal_filesystem/lib/mpos/board/fri3d_2024.py +++ b/internal_filesystem/lib/mpos/board/fri3d_2024.py @@ -292,28 +292,47 @@ import mpos.sdcard mpos.sdcard.init(spi_bus=spi_bus, cs_pin=14) # === AUDIO HARDWARE === -from machine import PWM, Pin from mpos import AudioManager -# Initialize buzzer (GPIO 46) -buzzer = PWM(Pin(46), freq=550, duty=0) - # I2S pin configuration for audio output (DAC) and input (microphone) # Note: I2S is created per-stream, not at boot (only one instance can exist) # The DAC uses BCK (bit clock) on GPIO 2, while the microphone uses SCLK on GPIO 17 # See schematics: DAC has BCK=2, WS=47, SD=16; Microphone has SCLK=17, WS=47, DIN=15 -i2s_pins = { +i2s_output_pins = { 'ws': 47, # Word Select / LRCLK shared between DAC and mic (mandatory) - # Output (DAC/speaker) config 'sck': 2, # SCLK or BCLK - Bit Clock for DAC output (mandatory) 'sd': 16, # Serial Data OUT (speaker/DAC) - # Input (microphone) config +} + +i2s_input_pins = { + 'ws': 47, # Word Select / LRCLK shared between DAC and mic (mandatory) 'sck_in': 17, # SCLK - Serial Clock for microphone input 'sd_in': 15, # DIN - Serial Data IN (microphone) } -# Initialize AudioManager with I2S and buzzer -AudioManager(i2s_pins=i2s_pins, buzzer_instance=buzzer) +speaker_output = AudioManager.add( + AudioManager.Output( + name="speaker", + kind="i2s", + i2s_pins=i2s_output_pins, + ) +) + +buzzer_output = AudioManager.add( + AudioManager.Output( + name="buzzer", + kind="buzzer", + buzzer_pin=46, + ) +) + +mic_input = AudioManager.add( + AudioManager.Input( + name="mic", + kind="i2s", + i2s_pins=i2s_input_pins, + ) +) # === SENSOR HARDWARE === from mpos import SensorManager @@ -344,7 +363,13 @@ def startup_wow_effect(): #startup_jingle = "ShortBeeps:d=32,o=5,b=320:c6,c7" # Start the jingle - AudioManager.play_rtttl(startup_jingle,stream_type=AudioManager.STREAM_NOTIFICATION,volume=60) + player = AudioManager.player( + rtttl=startup_jingle, + stream_type=AudioManager.STREAM_NOTIFICATION, + volume=60, + output=buzzer_output, + ) + player.start() # Rainbow colors for the 5 LEDs rainbow = [ diff --git a/internal_filesystem/lib/mpos/board/fri3d_2026.py b/internal_filesystem/lib/mpos/board/fri3d_2026.py index 90950c1d..e722344a 100644 --- a/internal_filesystem/lib/mpos/board/fri3d_2026.py +++ b/internal_filesystem/lib/mpos/board/fri3d_2026.py @@ -191,7 +191,6 @@ import mpos.sdcard mpos.sdcard.init(spi_bus=spi_bus, cs_pin=14) # === AUDIO HARDWARE === -from machine import PWM, Pin # Initialize buzzer: now sits on PC14/CC1 of the CH32X035GxUx so needs custom code #buzzer = PWM(Pin(46), freq=550, duty=0) @@ -213,18 +212,31 @@ from machine import PWM, Pin # - try similar combinations: hss + cs, cm + hsm # - try cross combinations: hss + cm, cs + hsm -i2s_pins = { +i2s_output_pins = { 'ws': 47, # Word Select / LRCLK shared between DAC and mic (mandatory) - # Output (DAC/speaker) pins 'sd': 16, # Serial Data OUT (speaker/DAC) 'sck': 17, # SCLK aka BCLK (appears mandatory) BUT this pin is sck_in on the communicator 'mck': 2, # MCLK (mandatory) BUT this pin is sck on the communicator } -# Initialize AudioManager with I2S (buzzer TODO) -# ADC microphone is on GPIO 1 from mpos import AudioManager -AudioManager(i2s_pins=i2s_pins, adc_mic_pin=1) + +speaker_output = AudioManager.add( + AudioManager.Output( + name="speaker", + kind="i2s", + i2s_pins=i2s_output_pins, + ) +) + +# ADC microphone is on GPIO 1 +mic_input = AudioManager.add( + AudioManager.Input( + name="mic", + kind="adc", + adc_mic_pin=1, + ) +) # === SENSOR HARDWARE === from mpos import SensorManager diff --git a/internal_filesystem/lib/mpos/board/linux.py b/internal_filesystem/lib/mpos/board/linux.py index a1f464b5..43aab645 100644 --- a/internal_filesystem/lib/mpos/board/linux.py +++ b/internal_filesystem/lib/mpos/board/linux.py @@ -107,15 +107,22 @@ from mpos import AudioManager # Desktop builds have no real audio hardware, but we simulate microphone # recording with a 440Hz sine wave for testing WAV file generation -# The i2s_pins dict with 'sd_in' enables has_microphone() to return True -i2s_pins = { +# The i2s_pins dict with 'sd_in' enables microphone simulation +AudioManager() + +output_i2s_pins = { 'sck': 0, # Simulated - not used on desktop 'ws': 0, # Simulated - not used on desktop 'sd': 0, # Simulated - not used on desktop +} +input_i2s_pins = { 'sck_in': 0, # Simulated - not used on desktop + 'ws': 0, # Simulated - not used on desktop 'sd_in': 0, # Simulated - enables microphone simulation } -AudioManager(i2s_pins=i2s_pins) + +AudioManager.add(AudioManager.Output("speaker", "i2s", i2s_pins=output_i2s_pins)) +AudioManager.add(AudioManager.Input("mic", "i2s", i2s_pins=input_i2s_pins)) # === LED HARDWARE === # Note: Desktop builds have no LED hardware diff --git a/internal_filesystem/lib/mpos/board/m5stack_fire.py b/internal_filesystem/lib/mpos/board/m5stack_fire.py index 9aac9953..1292b6e3 100644 --- a/internal_filesystem/lib/mpos/board/m5stack_fire.py +++ b/internal_filesystem/lib/mpos/board/m5stack_fire.py @@ -48,10 +48,16 @@ MPU6886_I2C_FREQ = const(400000) print("m5stack_fire.py init buzzer") buzzer = PWM(Pin(BUZZER_PIN, Pin.OUT, value=1), duty=5) -AudioManager(i2s_pins=None, buzzer_instance=buzzer) +AudioManager() +AudioManager.add(AudioManager.Output("buzzer", "buzzer", buzzer_pin=BUZZER_PIN)) AudioManager.set_volume(40) -AudioManager.play_rtttl("Star Trek:o=4,d=20,b=200:8f.,a#,4d#6.,8d6,a#.,g.,c6.,4f6") -while AudioManager.is_playing(): + +player = AudioManager.player( + rtttl="Star Trek:o=4,d=20,b=200:8f.,a#,4d#6.,8d6,a#.,g.,c6.,4f6", + stream_type=AudioManager.STREAM_NOTIFICATION, +) +player.start() +while player.is_playing(): time.sleep(0.1) diff --git a/internal_filesystem/lib/mpos/board/odroid_go.py b/internal_filesystem/lib/mpos/board/odroid_go.py index e9e87dca..4aa1d946 100644 --- a/internal_filesystem/lib/mpos/board/odroid_go.py +++ b/internal_filesystem/lib/mpos/board/odroid_go.py @@ -61,7 +61,6 @@ blue_led = machine.Pin(LED_BLUE, machine.Pin.OUT) blue_led.on() print("odroid_go.py init buzzer") -buzzer = PWM(Pin(BUZZER_PIN, Pin.OUT, value=1), duty=5) class BuzzerCallbacks: @@ -80,17 +79,23 @@ class BuzzerCallbacks: buzzer_callbacks = BuzzerCallbacks() -AudioManager( - i2s_pins=None, - buzzer_instance=buzzer, - # The buzzer makes noise when it's unmuted, to avoid this we - # mute it after playback and vice versa unmute it before playback: - pre_playback=buzzer_callbacks.unmute, - post_playback=buzzer_callbacks.mute, + +buzzer_output = AudioManager.add( + AudioManager.Output( + name="buzzer", + kind="buzzer", + buzzer_pin=BUZZER_PIN, + ) ) AudioManager.set_volume(40) -AudioManager.play_rtttl("Star Trek:o=4,d=20,b=200:8f.,a#,4d#6.,8d6,a#.,g.,c6.,4f6") -while AudioManager.is_playing(): +player = AudioManager.player( + rtttl="Star Trek:o=4,d=20,b=200:8f.,a#,4d#6.,8d6,a#.,g.,c6.,4f6", + output=buzzer_output, + on_complete=buzzer_callbacks.mute, +) +buzzer_callbacks.unmute() +player.start() +while player.is_playing(): time.sleep(0.1) print("odroid_go.py machine.SPI.Bus() initialization") @@ -232,12 +237,16 @@ def input_callback(indev, data): elif button_volume.value() == 0: print("Volume button pressed -> reset") blue_led.on() - AudioManager.play_rtttl( - "Outro:o=5,d=32,b=160,b=160:c6,b,a,g,f,e,d,c", + player = AudioManager.player( + rtttl="Outro:o=5,d=32,b=160,b=160:c6,b,a,g,f,e,d,c", stream_type=AudioManager.STREAM_ALARM, volume=40, + output=buzzer_output, + on_complete=buzzer_callbacks.mute, ) - while AudioManager.is_playing(): + buzzer_callbacks.unmute() + player.start() + while player.is_playing(): time.sleep(0.1) machine.reset() elif button_select.value() == 0: diff --git a/scripts/install.sh b/scripts/install.sh index ff204723..4e2bb6ff 100755 --- a/scripts/install.sh +++ b/scripts/install.sh @@ -68,6 +68,9 @@ $mpremote fs mkdir :/data/com.micropythonos.system.wifiservice $mpremote fs cp ../internal_filesystem_excluded/data/com.micropythonos.system.wifiservice/config.json :/data/com.micropythonos.system.wifiservice/ $mpremote fs mkdir :/apps +$mpremote fs cp -r apps/com.micropythonos.musicplayer :/apps/ +$mpremote fs cp -r apps/com.micropythonos.soundrecorder :/apps/ +exit 1 $mpremote fs cp -r apps/com.micropythonos.* :/apps/ find apps/ -maxdepth 1 -type l | while read symlink; do if echo $symlink | grep quasiboats; then diff --git a/tests/test_audiomanager.py b/tests/test_audiomanager.py index 83c2c646..43c664fb 100644 --- a/tests/test_audiomanager.py +++ b/tests/test_audiomanager.py @@ -5,8 +5,6 @@ import sys # Import centralized mocks from mpos.testing import ( MockMachine, - MockPWM, - MockPin, MockThread, inject_mocks, ) @@ -26,17 +24,16 @@ class TestAudioManager(unittest.TestCase): def setUp(self): """Initialize AudioManager before each test.""" - self.buzzer = MockPWM(MockPin(46)) + self.buzzer_pin = 46 self.i2s_pins = {'sck': 2, 'ws': 47, 'sd': 16} # Reset singleton instance for each test AudioManager._instance = None - AudioManager( - i2s_pins=self.i2s_pins, - buzzer_instance=self.buzzer - ) - + AudioManager() + AudioManager.add(AudioManager.Output("speaker", "i2s", i2s_pins=self.i2s_pins)) + AudioManager.add(AudioManager.Output("buzzer", "buzzer", buzzer_pin=self.buzzer_pin)) + # Reset volume to default after creating instance AudioManager.set_volume(70) @@ -47,32 +44,22 @@ class TestAudioManager(unittest.TestCase): def test_initialization(self): """Test that AudioManager initializes correctly.""" am = AudioManager.get() - self.assertEqual(am._i2s_pins, self.i2s_pins) - self.assertEqual(am._buzzer_instance, self.buzzer) + self.assertEqual(len(am._outputs), 2) + self.assertEqual(am._outputs[0].i2s_pins, self.i2s_pins) + self.assertEqual(am._outputs[1].buzzer_pin, self.buzzer_pin) - def test_has_i2s(self): - """Test has_i2s() returns correct value.""" - # With I2S configured - AudioManager._instance = None - AudioManager(i2s_pins=self.i2s_pins, buzzer_instance=None) - self.assertTrue(AudioManager.has_i2s()) - - # Without I2S configured - AudioManager._instance = None - AudioManager(i2s_pins=None, buzzer_instance=self.buzzer) - self.assertFalse(AudioManager.has_i2s()) + def test_get_outputs(self): + """Test that get_outputs() returns configured outputs.""" + outputs = AudioManager.get_outputs() + self.assertEqual(len(outputs), 2) + self.assertEqual(outputs[0].kind, "i2s") + self.assertEqual(outputs[1].kind, "buzzer") - def test_has_buzzer(self): - """Test has_buzzer() returns correct value.""" - # With buzzer configured - AudioManager._instance = None - AudioManager(i2s_pins=None, buzzer_instance=self.buzzer) - self.assertTrue(AudioManager.has_buzzer()) - - # Without buzzer configured - AudioManager._instance = None - AudioManager(i2s_pins=self.i2s_pins, buzzer_instance=None) - self.assertFalse(AudioManager.has_buzzer()) + def test_default_output(self): + """Test default output selection.""" + default_output = AudioManager.get_default_output() + self.assertIsNotNone(default_output) + self.assertEqual(default_output.kind, "i2s") def test_stream_types(self): """Test stream type constants and priority order.""" @@ -101,60 +88,53 @@ class TestAudioManager(unittest.TestCase): """Test that no hardware rejects all playback requests.""" # Re-initialize with no hardware AudioManager._instance = None - AudioManager(i2s_pins=None, buzzer_instance=None) + AudioManager() - # WAV should be rejected (no I2S) - result = AudioManager.play_wav("test.wav") - self.assertFalse(result) + with self.assertRaises(ValueError): + AudioManager.player(file_path="test.wav").start() - # RTTTL should be rejected (no buzzer) - result = AudioManager.play_rtttl("Test:d=4,o=5,b=120:c") - self.assertFalse(result) + with self.assertRaises(ValueError): + AudioManager.player(rtttl="Test:d=4,o=5,b=120:c").start() def test_i2s_only_rejects_rtttl(self): """Test that I2S-only config rejects buzzer playback.""" # Re-initialize with I2S only AudioManager._instance = None - AudioManager(i2s_pins=self.i2s_pins, buzzer_instance=None) + AudioManager() + AudioManager.add(AudioManager.Output("speaker", "i2s", i2s_pins=self.i2s_pins)) - # RTTTL should be rejected (no buzzer) - result = AudioManager.play_rtttl("Test:d=4,o=5,b=120:c") - self.assertFalse(result) + with self.assertRaises(ValueError): + AudioManager.player(rtttl="Test:d=4,o=5,b=120:c").start() def test_buzzer_only_rejects_wav(self): """Test that buzzer-only config rejects I2S playback.""" # Re-initialize with buzzer only AudioManager._instance = None - AudioManager(i2s_pins=None, buzzer_instance=self.buzzer) + AudioManager() + AudioManager.add(AudioManager.Output("buzzer", "buzzer", buzzer_pin=self.buzzer_pin)) - # WAV should be rejected (no I2S) - result = AudioManager.play_wav("test.wav") - self.assertFalse(result) + with self.assertRaises(ValueError): + AudioManager.player(file_path="test.wav").start() def test_is_playing_initially_false(self): """Test that is_playing() returns False initially.""" # Reset to ensure clean state AudioManager._instance = None - AudioManager(i2s_pins=self.i2s_pins, buzzer_instance=self.buzzer) - self.assertFalse(AudioManager.is_playing()) + AudioManager() + AudioManager.add(AudioManager.Output("speaker", "i2s", i2s_pins=self.i2s_pins)) + self.assertFalse(AudioManager.player(file_path="test.wav").is_playing()) def test_stop_with_no_playback(self): """Test that stop() can be called when nothing is playing.""" # Should not raise exception AudioManager.stop() - self.assertFalse(AudioManager.is_playing()) - - def test_audio_focus_check_no_current_stream(self): - """Test audio focus allows playback when no stream is active.""" - am = AudioManager.get() - result = am._check_audio_focus(AudioManager.STREAM_MUSIC) - self.assertTrue(result) def test_volume_default_value(self): """Test that default volume is reasonable.""" - # After init, volume should be at default (70) - AudioManager(i2s_pins=None, buzzer_instance=None) - self.assertEqual(AudioManager.get_volume(), 70) + # After init, volume should be at default (50) + AudioManager._instance = None + AudioManager() + self.assertEqual(AudioManager.get_volume(), 50) class TestAudioManagerRecording(unittest.TestCase): @@ -162,20 +142,15 @@ class TestAudioManagerRecording(unittest.TestCase): def setUp(self): """Initialize AudioManager with microphone before each test.""" - self.buzzer = MockPWM(MockPin(46)) # I2S pins with microphone input - self.i2s_pins_with_mic = {'sck': 2, 'ws': 47, 'sd': 16, 'sd_in': 15} - # I2S pins without microphone input - self.i2s_pins_no_mic = {'sck': 2, 'ws': 47, 'sd': 16} + self.i2s_pins_with_mic = {'sck': 2, 'ws': 47, 'sd_in': 15} # Reset singleton instance for each test AudioManager._instance = None - AudioManager( - i2s_pins=self.i2s_pins_with_mic, - buzzer_instance=self.buzzer - ) - + AudioManager() + AudioManager.add(AudioManager.Input("mic", "i2s", i2s_pins=self.i2s_pins_with_mic)) + # Reset volume to default after creating instance AudioManager.set_volume(70) @@ -183,43 +158,38 @@ class TestAudioManagerRecording(unittest.TestCase): """Clean up after each test.""" AudioManager.stop() - def test_has_microphone_with_sd_in(self): - """Test has_microphone() returns True when sd_in pin is configured.""" - AudioManager._instance = None - AudioManager(i2s_pins=self.i2s_pins_with_mic, buzzer_instance=None) - self.assertTrue(AudioManager.has_microphone()) + def test_get_inputs(self): + """Test get_inputs() returns configured inputs.""" + inputs = AudioManager.get_inputs() + self.assertEqual(len(inputs), 1) + self.assertEqual(inputs[0].kind, "i2s") - def test_has_microphone_without_sd_in(self): - """Test has_microphone() returns False when sd_in pin is not configured.""" - AudioManager._instance = None - AudioManager(i2s_pins=self.i2s_pins_no_mic, buzzer_instance=None) - self.assertFalse(AudioManager.has_microphone()) - - def test_has_microphone_no_i2s(self): - """Test has_microphone() returns False when no I2S is configured.""" - AudioManager._instance = None - AudioManager(i2s_pins=None, buzzer_instance=self.buzzer) - self.assertFalse(AudioManager.has_microphone()) + def test_default_input(self): + """Test default input selection.""" + default_input = AudioManager.get_default_input() + self.assertIsNotNone(default_input) + self.assertEqual(default_input.kind, "i2s") def test_is_recording_initially_false(self): """Test that is_recording() returns False initially.""" - self.assertFalse(AudioManager.is_recording()) + recorder = AudioManager.recorder(file_path="test.wav") + self.assertFalse(recorder.is_recording()) def test_record_wav_no_microphone(self): - """Test that record_wav() fails when no microphone is configured.""" + """Test that recorder() fails when no microphone is configured.""" AudioManager._instance = None - AudioManager(i2s_pins=self.i2s_pins_no_mic, buzzer_instance=None) - result = AudioManager.record_wav("test.wav") - self.assertFalse(result, "record_wav() fails when no microphone is configured") + AudioManager() + with self.assertRaises(ValueError): + AudioManager.recorder(file_path="test.wav").start() def test_record_wav_no_i2s(self): AudioManager._instance = None - AudioManager(i2s_pins=None, buzzer_instance=self.buzzer) - result = AudioManager.record_wav("test.wav") - self.assertFalse(result, "record_wav() should fail when no I2S is configured") + AudioManager() + AudioManager.add(AudioManager.Input("mic", "adc", adc_mic_pin=4)) + recorder = AudioManager.recorder(file_path="test.wav") + self.assertFalse(recorder.is_recording()) def test_stop_with_no_recording(self): """Test that stop() can be called when nothing is recording.""" # Should not raise exception AudioManager.stop() - self.assertFalse(AudioManager.is_recording())