Rework AudioManager

This commit is contained in:
Thomas Farstrike
2026-02-24 16:39:26 +01:00
parent 2f1ee282c3
commit daa7125052
14 changed files with 954 additions and 663 deletions
@@ -103,19 +103,31 @@ class FullscreenPlayer(Activity):
AudioManager.stop()
time.sleep(0.1)
success = AudioManager.play_wav(
self._filename,
stream_type=AudioManager.STREAM_MUSIC,
on_complete=self.player_finished
)
if not success:
error_msg = "Error: Audio device unavailable or busy"
output = AudioManager.get_default_output()
if output is None:
error_msg = "Error: No audio output available"
print(error_msg)
self.update_ui_threadsafe_if_foreground(
self._filename_label.set_text,
error_msg
)
return
try:
player = AudioManager.player(
file_path=self._filename,
stream_type=AudioManager.STREAM_MUSIC,
on_complete=self.player_finished,
output=output,
)
player.start()
except Exception as exc:
error_msg = "Error: Audio device unavailable or busy"
print(f"{error_msg}: {exc}")
self.update_ui_threadsafe_if_foreground(
self._filename_label.set_text,
error_msg
)
def focus_obj(self, obj):
obj.set_style_border_color(lv.theme_get_color_primary(None),lv.PART.MAIN)
@@ -56,6 +56,8 @@ class SoundRecorder(Activity):
_last_recording = None
_timer_task = None
_record_start_time = 0
_recorder = None
_player = None
def onCreate(self):
screen = lv.obj()
@@ -136,7 +138,8 @@ class SoundRecorder(Activity):
def _update_status(self):
"""Update status label based on microphone availability."""
if AudioManager.has_microphone():
default_input = AudioManager.get_default_input()
if default_input is not None:
self._status_label.set_text("Microphone ready")
self._status_label.set_style_text_color(lv.color_hex(0x00AA00), lv.PART.MAIN)
self._record_button.remove_flag(lv.obj.FLAG.HIDDEN)
@@ -243,9 +246,10 @@ class SoundRecorder(Activity):
def _start_recording(self):
"""Start recording audio."""
print("SoundRecorder: _start_recording called")
print(f"SoundRecorder: has_microphone() = {AudioManager.has_microphone()}")
default_input = AudioManager.get_default_input()
print(f"SoundRecorder: default input = {default_input}")
if not AudioManager.has_microphone():
if default_input is None:
print("SoundRecorder: No microphone available - aborting")
return
@@ -263,25 +267,32 @@ class SoundRecorder(Activity):
return
# Start recording
print(f"SoundRecorder: Calling AudioManager.record_wav()")
print(f"SoundRecorder: Calling AudioManager.recorder()")
print(f" file_path: {file_path}")
print(f" duration_ms: {self._current_max_duration_ms}")
print(f" sample_rate: {self.SAMPLE_RATE}")
success = AudioManager.record_wav(
file_path=file_path,
duration_ms=self._current_max_duration_ms,
on_complete=self._on_recording_complete,
sample_rate=self.SAMPLE_RATE
)
try:
self._recorder = AudioManager.recorder(
file_path=file_path,
duration_ms=self._current_max_duration_ms,
on_complete=self._on_recording_complete,
sample_rate=self.SAMPLE_RATE,
input=default_input,
)
self._recorder.start()
success = True
except Exception as exc:
print(f"SoundRecorder: recorder start failed: {exc}")
success = False
print(f"SoundRecorder: record_wav returned: {success}")
print(f"SoundRecorder: recorder started: {success}")
if success:
self._is_recording = True
self._record_start_time = time.ticks_ms()
self._last_recording = file_path
print(f"SoundRecorder: Recording started successfully")
print("SoundRecorder: Recording started successfully")
# Update UI
self._record_button_label.set_text(lv.SYMBOL.STOP + " Stop")
@@ -296,13 +307,15 @@ class SoundRecorder(Activity):
# Start timer update
self._start_timer_update()
else:
print("SoundRecorder: record_wav failed!")
print("SoundRecorder: recorder failed!")
self._status_label.set_text("Failed to start recording")
self._status_label.set_style_text_color(lv.color_hex(0xAA0000), lv.PART.MAIN)
def _stop_recording(self):
"""Stop recording audio."""
AudioManager.stop()
if self._recorder:
self._recorder.stop()
self._recorder = None
self._is_recording = False
# Show "Saving..." status immediately (file finalization takes time on SD card)
@@ -364,16 +377,30 @@ class SoundRecorder(Activity):
"""Handle play button click."""
if self._last_recording and not self._is_recording:
# Stop any current playback
AudioManager.stop()
if self._player:
self._player.stop()
time.sleep_ms(100)
output = AudioManager.get_default_output()
if output is None:
self._status_label.set_text("Playback failed")
self._status_label.set_style_text_color(lv.color_hex(0xAA0000), lv.PART.MAIN)
return
# Play the recording
success = AudioManager.play_wav(
self._last_recording,
stream_type=AudioManager.STREAM_MUSIC,
on_complete=self._on_playback_complete,
volume=100
)
try:
self._player = AudioManager.player(
file_path=self._last_recording,
stream_type=AudioManager.STREAM_MUSIC,
on_complete=self._on_playback_complete,
volume=100,
output=output,
)
self._player.start()
success = True
except Exception as exc:
print(f"SoundRecorder: playback failed: {exc}")
success = False
if success:
self._status_label.set_text("Playing...")
@@ -1,5 +1,4 @@
# AudioManager - Centralized Audio Management Service for MicroPythonOS
# Android-inspired audio routing with priority-based audio focus
# Simple routing: play_wav() -> I2S, play_rtttl() -> buzzer, record_wav() -> I2S mic
# Registry-based audio routing with device descriptors and session control
from .audiomanager import AudioManager
from .audiomanager import AudioManager, Player, Recorder, StereoNotSupported
File diff suppressed because it is too large Load Diff
@@ -68,6 +68,7 @@ class RecordStream:
self._is_recording = False
self._i2s = None
self._bytes_recorded = 0
self._start_time_ms = 0
def is_recording(self):
"""Check if stream is currently recording."""
@@ -203,6 +204,7 @@ class RecordStream:
self._is_recording = True
self._bytes_recorded = 0
self._start_time_ms = time.ticks_ms()
try:
# Ensure directory exists
@@ -346,4 +348,9 @@ class RecordStream:
if self._i2s:
self._i2s.deinit()
self._i2s = None
print(f"RecordStream: Recording thread finished")
print(f"RecordStream: Recording thread finished")
def get_duration_ms(self):
if self._start_time_ms <= 0:
return 0
return time.ticks_diff(time.ticks_ms(), self._start_time_ms)
@@ -354,3 +354,8 @@ class ADCRecordStream:
finally:
self._is_recording = False
print(f"ADCRecordStream: Recording thread finished")
def get_duration_ms(self):
if self._start_time_ms <= 0:
return 0
return time.ticks_diff(time.ticks_ms(), self._start_time_ms)
@@ -147,7 +147,15 @@ class WAVStream:
Supports 8/16/24/32-bit PCM, mono and stereo, auto-upsampling to >=22050 Hz.
"""
def __init__(self, file_path, stream_type, volume, i2s_pins, on_complete):
def __init__(
self,
file_path,
stream_type,
volume,
i2s_pins,
on_complete,
requested_sample_rate=None,
):
"""
Initialize WAV stream.
@@ -157,15 +165,25 @@ class WAVStream:
volume: Volume level (0-100)
i2s_pins: Dict with 'sck', 'ws', 'sd' pin numbers
on_complete: Callback function(message) when playback finishes
requested_sample_rate: Optional negotiated sample rate for shared clocks
"""
self.file_path = file_path
self.stream_type = stream_type
self.volume = volume
self.i2s_pins = i2s_pins
self.on_complete = on_complete
self.requested_sample_rate = requested_sample_rate
self._keep_running = True
self._is_playing = False
self._i2s = None
self._progress_samples = 0
self._total_samples = 0
self._duration_ms = None
self._playback_rate = None
self._original_rate = None
self._channels = None
self._bits_per_sample = None
self._data_size = None
def is_playing(self):
"""Check if stream is currently playing."""
@@ -175,6 +193,19 @@ class WAVStream:
"""Stop playback."""
self._keep_running = False
def get_progress_percent(self):
if self._total_samples <= 0:
return None
return int((self._progress_samples / self._total_samples) * 100)
def get_progress_ms(self):
if self._playback_rate:
return int((self._progress_samples / self._playback_rate) * 1000)
return None
def get_duration_ms(self):
return self._duration_ms
# ----------------------------------------------------------------------
# WAV header parser - returns bit-depth and format info
# ----------------------------------------------------------------------
@@ -235,6 +266,37 @@ class WAVStream:
raise ValueError("No 'data' chunk found")
# ----------------------------------------------------------------------
# WAV info helpers
# ----------------------------------------------------------------------
@staticmethod
def get_wav_info(file_path):
with open(file_path, 'rb') as f:
data_start, data_size, sample_rate, channels, bits_per_sample = (
WAVStream._find_data_chunk(f)
)
return {
"data_start": data_start,
"data_size": data_size,
"sample_rate": sample_rate,
"channels": channels,
"bits_per_sample": bits_per_sample,
}
@staticmethod
def compute_playback_rate(original_rate, requested_rate=None):
if requested_rate:
if requested_rate <= original_rate:
return original_rate, 1
upsample_factor = (requested_rate + original_rate - 1) // original_rate
return original_rate * upsample_factor, upsample_factor
target_rate = 22050
if original_rate >= target_rate:
return original_rate, 1
upsample_factor = (target_rate + original_rate - 1) // original_rate
return original_rate * upsample_factor, upsample_factor
# ----------------------------------------------------------------------
# Bit depth conversion functions
# ----------------------------------------------------------------------
@@ -327,14 +389,17 @@ class WAVStream:
data_start, data_size, original_rate, channels, bits_per_sample = \
self._find_data_chunk(f)
# Decide playback rate (force >=22050 Hz) - but why?! the DAC should support down to 8kHz!
target_rate = 22050 # slower is faster (less data)
if original_rate >= target_rate:
playback_rate = original_rate
upsample_factor = 1
else:
upsample_factor = (target_rate + original_rate - 1) // original_rate
playback_rate = original_rate * upsample_factor
self._original_rate = original_rate
self._channels = channels
self._bits_per_sample = bits_per_sample
self._data_size = data_size
playback_rate, upsample_factor = self.compute_playback_rate(
original_rate,
self.requested_sample_rate,
)
self._playback_rate = playback_rate
print(f"WAVStream: {original_rate} Hz, {bits_per_sample}-bit, {channels}-ch")
print(f"WAVStream: Playback at {playback_rate} Hz (factor {upsample_factor})")
@@ -342,6 +407,11 @@ class WAVStream:
if data_size > file_size - data_start:
data_size = file_size - data_start
bytes_per_sample = (bits_per_sample // 8) * channels
if bytes_per_sample > 0:
self._total_samples = data_size // bytes_per_sample
self._duration_ms = int((self._total_samples / original_rate) * 1000)
# Initialize I2S (always 16-bit output)
try:
i2s_format = machine.I2S.MONO if channels == 1 else machine.I2S.STEREO
@@ -445,6 +515,7 @@ class WAVStream:
time.sleep(num_samples / playback_rate)
total_original += to_read
self._progress_samples = total_original // bytes_per_original_sample
print(f"WAVStream: Finished playing {self.file_path}")
if self.on_complete:
@@ -292,28 +292,47 @@ import mpos.sdcard
mpos.sdcard.init(spi_bus=spi_bus, cs_pin=14)
# === AUDIO HARDWARE ===
from machine import PWM, Pin
from mpos import AudioManager
# Initialize buzzer (GPIO 46)
buzzer = PWM(Pin(46), freq=550, duty=0)
# I2S pin configuration for audio output (DAC) and input (microphone)
# Note: I2S is created per-stream, not at boot (only one instance can exist)
# The DAC uses BCK (bit clock) on GPIO 2, while the microphone uses SCLK on GPIO 17
# See schematics: DAC has BCK=2, WS=47, SD=16; Microphone has SCLK=17, WS=47, DIN=15
i2s_pins = {
i2s_output_pins = {
'ws': 47, # Word Select / LRCLK shared between DAC and mic (mandatory)
# Output (DAC/speaker) config
'sck': 2, # SCLK or BCLK - Bit Clock for DAC output (mandatory)
'sd': 16, # Serial Data OUT (speaker/DAC)
# Input (microphone) config
}
i2s_input_pins = {
'ws': 47, # Word Select / LRCLK shared between DAC and mic (mandatory)
'sck_in': 17, # SCLK - Serial Clock for microphone input
'sd_in': 15, # DIN - Serial Data IN (microphone)
}
# Initialize AudioManager with I2S and buzzer
AudioManager(i2s_pins=i2s_pins, buzzer_instance=buzzer)
speaker_output = AudioManager.add(
AudioManager.Output(
name="speaker",
kind="i2s",
i2s_pins=i2s_output_pins,
)
)
buzzer_output = AudioManager.add(
AudioManager.Output(
name="buzzer",
kind="buzzer",
buzzer_pin=46,
)
)
mic_input = AudioManager.add(
AudioManager.Input(
name="mic",
kind="i2s",
i2s_pins=i2s_input_pins,
)
)
# === SENSOR HARDWARE ===
from mpos import SensorManager
@@ -344,7 +363,13 @@ def startup_wow_effect():
#startup_jingle = "ShortBeeps:d=32,o=5,b=320:c6,c7"
# Start the jingle
AudioManager.play_rtttl(startup_jingle,stream_type=AudioManager.STREAM_NOTIFICATION,volume=60)
player = AudioManager.player(
rtttl=startup_jingle,
stream_type=AudioManager.STREAM_NOTIFICATION,
volume=60,
output=buzzer_output,
)
player.start()
# Rainbow colors for the 5 LEDs
rainbow = [
@@ -191,7 +191,6 @@ import mpos.sdcard
mpos.sdcard.init(spi_bus=spi_bus, cs_pin=14)
# === AUDIO HARDWARE ===
from machine import PWM, Pin
# Initialize buzzer: now sits on PC14/CC1 of the CH32X035GxUx so needs custom code
#buzzer = PWM(Pin(46), freq=550, duty=0)
@@ -213,18 +212,31 @@ from machine import PWM, Pin
# - try similar combinations: hss + cs, cm + hsm
# - try cross combinations: hss + cm, cs + hsm
i2s_pins = {
i2s_output_pins = {
'ws': 47, # Word Select / LRCLK shared between DAC and mic (mandatory)
# Output (DAC/speaker) pins
'sd': 16, # Serial Data OUT (speaker/DAC)
'sck': 17, # SCLK aka BCLK (appears mandatory) BUT this pin is sck_in on the communicator
'mck': 2, # MCLK (mandatory) BUT this pin is sck on the communicator
}
# Initialize AudioManager with I2S (buzzer TODO)
# ADC microphone is on GPIO 1
from mpos import AudioManager
AudioManager(i2s_pins=i2s_pins, adc_mic_pin=1)
speaker_output = AudioManager.add(
AudioManager.Output(
name="speaker",
kind="i2s",
i2s_pins=i2s_output_pins,
)
)
# ADC microphone is on GPIO 1
mic_input = AudioManager.add(
AudioManager.Input(
name="mic",
kind="adc",
adc_mic_pin=1,
)
)
# === SENSOR HARDWARE ===
from mpos import SensorManager
+10 -3
View File
@@ -107,15 +107,22 @@ from mpos import AudioManager
# Desktop builds have no real audio hardware, but we simulate microphone
# recording with a 440Hz sine wave for testing WAV file generation
# The i2s_pins dict with 'sd_in' enables has_microphone() to return True
i2s_pins = {
# The i2s_pins dict with 'sd_in' enables microphone simulation
AudioManager()
output_i2s_pins = {
'sck': 0, # Simulated - not used on desktop
'ws': 0, # Simulated - not used on desktop
'sd': 0, # Simulated - not used on desktop
}
input_i2s_pins = {
'sck_in': 0, # Simulated - not used on desktop
'ws': 0, # Simulated - not used on desktop
'sd_in': 0, # Simulated - enables microphone simulation
}
AudioManager(i2s_pins=i2s_pins)
AudioManager.add(AudioManager.Output("speaker", "i2s", i2s_pins=output_i2s_pins))
AudioManager.add(AudioManager.Input("mic", "i2s", i2s_pins=input_i2s_pins))
# === LED HARDWARE ===
# Note: Desktop builds have no LED hardware
@@ -48,10 +48,16 @@ MPU6886_I2C_FREQ = const(400000)
print("m5stack_fire.py init buzzer")
buzzer = PWM(Pin(BUZZER_PIN, Pin.OUT, value=1), duty=5)
AudioManager(i2s_pins=None, buzzer_instance=buzzer)
AudioManager()
AudioManager.add(AudioManager.Output("buzzer", "buzzer", buzzer_pin=BUZZER_PIN))
AudioManager.set_volume(40)
AudioManager.play_rtttl("Star Trek:o=4,d=20,b=200:8f.,a#,4d#6.,8d6,a#.,g.,c6.,4f6")
while AudioManager.is_playing():
player = AudioManager.player(
rtttl="Star Trek:o=4,d=20,b=200:8f.,a#,4d#6.,8d6,a#.,g.,c6.,4f6",
stream_type=AudioManager.STREAM_NOTIFICATION,
)
player.start()
while player.is_playing():
time.sleep(0.1)
+22 -13
View File
@@ -61,7 +61,6 @@ blue_led = machine.Pin(LED_BLUE, machine.Pin.OUT)
blue_led.on()
print("odroid_go.py init buzzer")
buzzer = PWM(Pin(BUZZER_PIN, Pin.OUT, value=1), duty=5)
class BuzzerCallbacks:
@@ -80,17 +79,23 @@ class BuzzerCallbacks:
buzzer_callbacks = BuzzerCallbacks()
AudioManager(
i2s_pins=None,
buzzer_instance=buzzer,
# The buzzer makes noise when it's unmuted, to avoid this we
# mute it after playback and vice versa unmute it before playback:
pre_playback=buzzer_callbacks.unmute,
post_playback=buzzer_callbacks.mute,
buzzer_output = AudioManager.add(
AudioManager.Output(
name="buzzer",
kind="buzzer",
buzzer_pin=BUZZER_PIN,
)
)
AudioManager.set_volume(40)
AudioManager.play_rtttl("Star Trek:o=4,d=20,b=200:8f.,a#,4d#6.,8d6,a#.,g.,c6.,4f6")
while AudioManager.is_playing():
player = AudioManager.player(
rtttl="Star Trek:o=4,d=20,b=200:8f.,a#,4d#6.,8d6,a#.,g.,c6.,4f6",
output=buzzer_output,
on_complete=buzzer_callbacks.mute,
)
buzzer_callbacks.unmute()
player.start()
while player.is_playing():
time.sleep(0.1)
print("odroid_go.py machine.SPI.Bus() initialization")
@@ -232,12 +237,16 @@ def input_callback(indev, data):
elif button_volume.value() == 0:
print("Volume button pressed -> reset")
blue_led.on()
AudioManager.play_rtttl(
"Outro:o=5,d=32,b=160,b=160:c6,b,a,g,f,e,d,c",
player = AudioManager.player(
rtttl="Outro:o=5,d=32,b=160,b=160:c6,b,a,g,f,e,d,c",
stream_type=AudioManager.STREAM_ALARM,
volume=40,
output=buzzer_output,
on_complete=buzzer_callbacks.mute,
)
while AudioManager.is_playing():
buzzer_callbacks.unmute()
player.start()
while player.is_playing():
time.sleep(0.1)
machine.reset()
elif button_select.value() == 0:
+3
View File
@@ -68,6 +68,9 @@ $mpremote fs mkdir :/data/com.micropythonos.system.wifiservice
$mpremote fs cp ../internal_filesystem_excluded/data/com.micropythonos.system.wifiservice/config.json :/data/com.micropythonos.system.wifiservice/
$mpremote fs mkdir :/apps
$mpremote fs cp -r apps/com.micropythonos.musicplayer :/apps/
$mpremote fs cp -r apps/com.micropythonos.soundrecorder :/apps/
exit 1
$mpremote fs cp -r apps/com.micropythonos.* :/apps/
find apps/ -maxdepth 1 -type l | while read symlink; do
if echo $symlink | grep quasiboats; then
+63 -93
View File
@@ -5,8 +5,6 @@ import sys
# Import centralized mocks
from mpos.testing import (
MockMachine,
MockPWM,
MockPin,
MockThread,
inject_mocks,
)
@@ -26,17 +24,16 @@ class TestAudioManager(unittest.TestCase):
def setUp(self):
"""Initialize AudioManager before each test."""
self.buzzer = MockPWM(MockPin(46))
self.buzzer_pin = 46
self.i2s_pins = {'sck': 2, 'ws': 47, 'sd': 16}
# Reset singleton instance for each test
AudioManager._instance = None
AudioManager(
i2s_pins=self.i2s_pins,
buzzer_instance=self.buzzer
)
AudioManager()
AudioManager.add(AudioManager.Output("speaker", "i2s", i2s_pins=self.i2s_pins))
AudioManager.add(AudioManager.Output("buzzer", "buzzer", buzzer_pin=self.buzzer_pin))
# Reset volume to default after creating instance
AudioManager.set_volume(70)
@@ -47,32 +44,22 @@ class TestAudioManager(unittest.TestCase):
def test_initialization(self):
"""Test that AudioManager initializes correctly."""
am = AudioManager.get()
self.assertEqual(am._i2s_pins, self.i2s_pins)
self.assertEqual(am._buzzer_instance, self.buzzer)
self.assertEqual(len(am._outputs), 2)
self.assertEqual(am._outputs[0].i2s_pins, self.i2s_pins)
self.assertEqual(am._outputs[1].buzzer_pin, self.buzzer_pin)
def test_has_i2s(self):
"""Test has_i2s() returns correct value."""
# With I2S configured
AudioManager._instance = None
AudioManager(i2s_pins=self.i2s_pins, buzzer_instance=None)
self.assertTrue(AudioManager.has_i2s())
# Without I2S configured
AudioManager._instance = None
AudioManager(i2s_pins=None, buzzer_instance=self.buzzer)
self.assertFalse(AudioManager.has_i2s())
def test_get_outputs(self):
"""Test that get_outputs() returns configured outputs."""
outputs = AudioManager.get_outputs()
self.assertEqual(len(outputs), 2)
self.assertEqual(outputs[0].kind, "i2s")
self.assertEqual(outputs[1].kind, "buzzer")
def test_has_buzzer(self):
"""Test has_buzzer() returns correct value."""
# With buzzer configured
AudioManager._instance = None
AudioManager(i2s_pins=None, buzzer_instance=self.buzzer)
self.assertTrue(AudioManager.has_buzzer())
# Without buzzer configured
AudioManager._instance = None
AudioManager(i2s_pins=self.i2s_pins, buzzer_instance=None)
self.assertFalse(AudioManager.has_buzzer())
def test_default_output(self):
"""Test default output selection."""
default_output = AudioManager.get_default_output()
self.assertIsNotNone(default_output)
self.assertEqual(default_output.kind, "i2s")
def test_stream_types(self):
"""Test stream type constants and priority order."""
@@ -101,60 +88,53 @@ class TestAudioManager(unittest.TestCase):
"""Test that no hardware rejects all playback requests."""
# Re-initialize with no hardware
AudioManager._instance = None
AudioManager(i2s_pins=None, buzzer_instance=None)
AudioManager()
# WAV should be rejected (no I2S)
result = AudioManager.play_wav("test.wav")
self.assertFalse(result)
with self.assertRaises(ValueError):
AudioManager.player(file_path="test.wav").start()
# RTTTL should be rejected (no buzzer)
result = AudioManager.play_rtttl("Test:d=4,o=5,b=120:c")
self.assertFalse(result)
with self.assertRaises(ValueError):
AudioManager.player(rtttl="Test:d=4,o=5,b=120:c").start()
def test_i2s_only_rejects_rtttl(self):
"""Test that I2S-only config rejects buzzer playback."""
# Re-initialize with I2S only
AudioManager._instance = None
AudioManager(i2s_pins=self.i2s_pins, buzzer_instance=None)
AudioManager()
AudioManager.add(AudioManager.Output("speaker", "i2s", i2s_pins=self.i2s_pins))
# RTTTL should be rejected (no buzzer)
result = AudioManager.play_rtttl("Test:d=4,o=5,b=120:c")
self.assertFalse(result)
with self.assertRaises(ValueError):
AudioManager.player(rtttl="Test:d=4,o=5,b=120:c").start()
def test_buzzer_only_rejects_wav(self):
"""Test that buzzer-only config rejects I2S playback."""
# Re-initialize with buzzer only
AudioManager._instance = None
AudioManager(i2s_pins=None, buzzer_instance=self.buzzer)
AudioManager()
AudioManager.add(AudioManager.Output("buzzer", "buzzer", buzzer_pin=self.buzzer_pin))
# WAV should be rejected (no I2S)
result = AudioManager.play_wav("test.wav")
self.assertFalse(result)
with self.assertRaises(ValueError):
AudioManager.player(file_path="test.wav").start()
def test_is_playing_initially_false(self):
"""Test that is_playing() returns False initially."""
# Reset to ensure clean state
AudioManager._instance = None
AudioManager(i2s_pins=self.i2s_pins, buzzer_instance=self.buzzer)
self.assertFalse(AudioManager.is_playing())
AudioManager()
AudioManager.add(AudioManager.Output("speaker", "i2s", i2s_pins=self.i2s_pins))
self.assertFalse(AudioManager.player(file_path="test.wav").is_playing())
def test_stop_with_no_playback(self):
"""Test that stop() can be called when nothing is playing."""
# Should not raise exception
AudioManager.stop()
self.assertFalse(AudioManager.is_playing())
def test_audio_focus_check_no_current_stream(self):
"""Test audio focus allows playback when no stream is active."""
am = AudioManager.get()
result = am._check_audio_focus(AudioManager.STREAM_MUSIC)
self.assertTrue(result)
def test_volume_default_value(self):
"""Test that default volume is reasonable."""
# After init, volume should be at default (70)
AudioManager(i2s_pins=None, buzzer_instance=None)
self.assertEqual(AudioManager.get_volume(), 70)
# After init, volume should be at default (50)
AudioManager._instance = None
AudioManager()
self.assertEqual(AudioManager.get_volume(), 50)
class TestAudioManagerRecording(unittest.TestCase):
@@ -162,20 +142,15 @@ class TestAudioManagerRecording(unittest.TestCase):
def setUp(self):
"""Initialize AudioManager with microphone before each test."""
self.buzzer = MockPWM(MockPin(46))
# I2S pins with microphone input
self.i2s_pins_with_mic = {'sck': 2, 'ws': 47, 'sd': 16, 'sd_in': 15}
# I2S pins without microphone input
self.i2s_pins_no_mic = {'sck': 2, 'ws': 47, 'sd': 16}
self.i2s_pins_with_mic = {'sck': 2, 'ws': 47, 'sd_in': 15}
# Reset singleton instance for each test
AudioManager._instance = None
AudioManager(
i2s_pins=self.i2s_pins_with_mic,
buzzer_instance=self.buzzer
)
AudioManager()
AudioManager.add(AudioManager.Input("mic", "i2s", i2s_pins=self.i2s_pins_with_mic))
# Reset volume to default after creating instance
AudioManager.set_volume(70)
@@ -183,43 +158,38 @@ class TestAudioManagerRecording(unittest.TestCase):
"""Clean up after each test."""
AudioManager.stop()
def test_has_microphone_with_sd_in(self):
"""Test has_microphone() returns True when sd_in pin is configured."""
AudioManager._instance = None
AudioManager(i2s_pins=self.i2s_pins_with_mic, buzzer_instance=None)
self.assertTrue(AudioManager.has_microphone())
def test_get_inputs(self):
"""Test get_inputs() returns configured inputs."""
inputs = AudioManager.get_inputs()
self.assertEqual(len(inputs), 1)
self.assertEqual(inputs[0].kind, "i2s")
def test_has_microphone_without_sd_in(self):
"""Test has_microphone() returns False when sd_in pin is not configured."""
AudioManager._instance = None
AudioManager(i2s_pins=self.i2s_pins_no_mic, buzzer_instance=None)
self.assertFalse(AudioManager.has_microphone())
def test_has_microphone_no_i2s(self):
"""Test has_microphone() returns False when no I2S is configured."""
AudioManager._instance = None
AudioManager(i2s_pins=None, buzzer_instance=self.buzzer)
self.assertFalse(AudioManager.has_microphone())
def test_default_input(self):
"""Test default input selection."""
default_input = AudioManager.get_default_input()
self.assertIsNotNone(default_input)
self.assertEqual(default_input.kind, "i2s")
def test_is_recording_initially_false(self):
"""Test that is_recording() returns False initially."""
self.assertFalse(AudioManager.is_recording())
recorder = AudioManager.recorder(file_path="test.wav")
self.assertFalse(recorder.is_recording())
def test_record_wav_no_microphone(self):
"""Test that record_wav() fails when no microphone is configured."""
"""Test that recorder() fails when no microphone is configured."""
AudioManager._instance = None
AudioManager(i2s_pins=self.i2s_pins_no_mic, buzzer_instance=None)
result = AudioManager.record_wav("test.wav")
self.assertFalse(result, "record_wav() fails when no microphone is configured")
AudioManager()
with self.assertRaises(ValueError):
AudioManager.recorder(file_path="test.wav").start()
def test_record_wav_no_i2s(self):
AudioManager._instance = None
AudioManager(i2s_pins=None, buzzer_instance=self.buzzer)
result = AudioManager.record_wav("test.wav")
self.assertFalse(result, "record_wav() should fail when no I2S is configured")
AudioManager()
AudioManager.add(AudioManager.Input("mic", "adc", adc_mic_pin=4))
recorder = AudioManager.recorder(file_path="test.wav")
self.assertFalse(recorder.is_recording())
def test_stop_with_no_recording(self):
"""Test that stop() can be called when nothing is recording."""
# Should not raise exception
AudioManager.stop()
self.assertFalse(AudioManager.is_recording())