AudioFlinger: eliminate thread by using TaskManager (asyncio)

Also simplify, and move all testing mocks to a dedicated file.
This commit is contained in:
Thomas Farstrike
2025-12-17 17:03:42 +01:00
parent 23a8f92ea9
commit afe8434bc7
11 changed files with 1013 additions and 1221 deletions
+8 -15
View File
@@ -1,17 +1,12 @@
# AudioFlinger - Centralized Audio Management Service for MicroPythonOS
# Android-inspired audio routing with priority-based audio focus
# Simple routing: play_wav() -> I2S, play_rtttl() -> buzzer
from . import audioflinger
# Re-export main API
from .audioflinger import (
# Device types
DEVICE_NULL,
DEVICE_I2S,
DEVICE_BUZZER,
DEVICE_BOTH,
# Stream types
# Stream types (for priority-based audio focus)
STREAM_MUSIC,
STREAM_NOTIFICATION,
STREAM_ALARM,
@@ -25,17 +20,14 @@ from .audioflinger import (
resume,
set_volume,
get_volume,
get_device_type,
is_playing,
# Hardware availability checks
has_i2s,
has_buzzer,
)
__all__ = [
# Device types
'DEVICE_NULL',
'DEVICE_I2S',
'DEVICE_BUZZER',
'DEVICE_BOTH',
# Stream types
'STREAM_MUSIC',
'STREAM_NOTIFICATION',
@@ -50,6 +42,7 @@ __all__ = [
'resume',
'set_volume',
'get_volume',
'get_device_type',
'is_playing',
'has_i2s',
'has_buzzer',
]
@@ -1,12 +1,11 @@
# AudioFlinger - Core Audio Management Service
# Centralized audio routing with priority-based audio focus (Android-inspired)
# Supports I2S (digital audio) and PWM buzzer (tones/ringtones)
#
# Simple routing: play_wav() -> I2S, play_rtttl() -> buzzer
# Uses TaskManager (asyncio) for non-blocking background playback
# Device type constants
DEVICE_NULL = 0 # No audio hardware (desktop fallback)
DEVICE_I2S = 1 # Digital audio output (WAV playback)
DEVICE_BUZZER = 2 # PWM buzzer (tones/RTTTL)
DEVICE_BOTH = 3 # Both I2S and buzzer available
from mpos.task_manager import TaskManager
# Stream type constants (priority order: higher number = higher priority)
STREAM_MUSIC = 0 # Background music (lowest priority)
@@ -14,45 +13,47 @@ STREAM_NOTIFICATION = 1 # Notification sounds (medium priority)
STREAM_ALARM = 2 # Alarms/alerts (highest priority)
# Module-level state (singleton pattern, follows battery_voltage.py)
_device_type = DEVICE_NULL
_i2s_pins = None # I2S pin configuration dict (created per-stream)
_buzzer_instance = None # PWM buzzer instance
_current_stream = None # Currently playing stream
_current_task = None # Currently running playback task
_volume = 50 # System volume (0-100)
_stream_lock = None # Thread lock for stream management
def init(device_type, i2s_pins=None, buzzer_instance=None):
def init(i2s_pins=None, buzzer_instance=None):
"""
Initialize AudioFlinger with hardware configuration.
Args:
device_type: One of DEVICE_NULL, DEVICE_I2S, DEVICE_BUZZER, DEVICE_BOTH
i2s_pins: Dict with 'sck', 'ws', 'sd' pin numbers (for I2S devices)
buzzer_instance: PWM instance for buzzer (for buzzer devices)
i2s_pins: Dict with 'sck', 'ws', 'sd' pin numbers (for I2S/WAV playback)
buzzer_instance: PWM instance for buzzer (for RTTTL playback)
"""
global _device_type, _i2s_pins, _buzzer_instance, _stream_lock
global _i2s_pins, _buzzer_instance
_device_type = device_type
_i2s_pins = i2s_pins
_buzzer_instance = buzzer_instance
# Initialize thread lock for stream management
try:
import _thread
_stream_lock = _thread.allocate_lock()
except ImportError:
# Desktop mode - no threading support
_stream_lock = None
# Build status message
capabilities = []
if i2s_pins:
capabilities.append("I2S (WAV)")
if buzzer_instance:
capabilities.append("Buzzer (RTTTL)")
if capabilities:
print(f"AudioFlinger initialized: {', '.join(capabilities)}")
else:
print("AudioFlinger initialized: No audio hardware")
device_names = {
DEVICE_NULL: "NULL (no audio)",
DEVICE_I2S: "I2S (digital audio)",
DEVICE_BUZZER: "Buzzer (PWM tones)",
DEVICE_BOTH: "Both (I2S + Buzzer)"
}
print(f"AudioFlinger initialized: {device_names.get(device_type, 'Unknown')}")
def has_i2s():
"""Check if I2S audio is available for WAV playback."""
return _i2s_pins is not None
def has_buzzer():
"""Check if buzzer is available for RTTTL playback."""
return _buzzer_instance is not None
def _check_audio_focus(stream_type):
@@ -85,35 +86,27 @@ def _check_audio_focus(stream_type):
return True
def _playback_thread(stream):
async def _playback_coroutine(stream):
"""
Background thread function for audio playback.
Async coroutine for audio playback.
Args:
stream: Stream instance (WAVStream or RTTTLStream)
"""
global _current_stream
global _current_stream, _current_task
# Acquire lock and set as current stream
if _stream_lock:
_stream_lock.acquire()
_current_stream = stream
if _stream_lock:
_stream_lock.release()
try:
# Run playback (blocks until complete or stopped)
stream.play()
# Run async playback
await stream.play_async()
except Exception as e:
print(f"AudioFlinger: Playback error: {e}")
finally:
# Clear current stream
if _stream_lock:
_stream_lock.acquire()
if _current_stream == stream:
_current_stream = None
if _stream_lock:
_stream_lock.release()
_current_task = None
def play_wav(file_path, stream_type=STREAM_MUSIC, volume=None, on_complete=None):
@@ -129,29 +122,19 @@ def play_wav(file_path, stream_type=STREAM_MUSIC, volume=None, on_complete=None)
Returns:
bool: True if playback started, False if rejected or unavailable
"""
if _device_type not in (DEVICE_I2S, DEVICE_BOTH):
print("AudioFlinger: play_wav() failed - no I2S device available")
return False
global _current_task
if not _i2s_pins:
print("AudioFlinger: play_wav() failed - I2S pins not configured")
print("AudioFlinger: play_wav() failed - I2S not configured")
return False
# Check audio focus
if _stream_lock:
_stream_lock.acquire()
can_start = _check_audio_focus(stream_type)
if _stream_lock:
_stream_lock.release()
if not can_start:
if not _check_audio_focus(stream_type):
return False
# Create stream and start playback in background thread
# Create stream and start playback as async task
try:
from mpos.audio.stream_wav import WAVStream
import _thread
import mpos.apps
stream = WAVStream(
file_path=file_path,
@@ -161,8 +144,7 @@ def play_wav(file_path, stream_type=STREAM_MUSIC, volume=None, on_complete=None)
on_complete=on_complete
)
_thread.stack_size(mpos.apps.good_stack_size())
_thread.start_new_thread(_playback_thread, (stream,))
_current_task = TaskManager.create_task(_playback_coroutine(stream))
return True
except Exception as e:
@@ -183,29 +165,19 @@ def play_rtttl(rtttl_string, stream_type=STREAM_NOTIFICATION, volume=None, on_co
Returns:
bool: True if playback started, False if rejected or unavailable
"""
if _device_type not in (DEVICE_BUZZER, DEVICE_BOTH):
print("AudioFlinger: play_rtttl() failed - no buzzer device available")
return False
global _current_task
if not _buzzer_instance:
print("AudioFlinger: play_rtttl() failed - buzzer not initialized")
print("AudioFlinger: play_rtttl() failed - buzzer not configured")
return False
# Check audio focus
if _stream_lock:
_stream_lock.acquire()
can_start = _check_audio_focus(stream_type)
if _stream_lock:
_stream_lock.release()
if not can_start:
if not _check_audio_focus(stream_type):
return False
# Create stream and start playback in background thread
# Create stream and start playback as async task
try:
from mpos.audio.stream_rtttl import RTTTLStream
import _thread
import mpos.apps
stream = RTTTLStream(
rtttl_string=rtttl_string,
@@ -215,8 +187,7 @@ def play_rtttl(rtttl_string, stream_type=STREAM_NOTIFICATION, volume=None, on_co
on_complete=on_complete
)
_thread.stack_size(mpos.apps.good_stack_size())
_thread.start_new_thread(_playback_thread, (stream,))
_current_task = TaskManager.create_task(_playback_coroutine(stream))
return True
except Exception as e:
@@ -226,10 +197,7 @@ def play_rtttl(rtttl_string, stream_type=STREAM_NOTIFICATION, volume=None, on_co
def stop():
"""Stop current audio playback."""
global _current_stream
if _stream_lock:
_stream_lock.acquire()
global _current_stream, _current_task
if _current_stream:
_current_stream.stop()
@@ -237,49 +205,30 @@ def stop():
else:
print("AudioFlinger: No playback to stop")
if _stream_lock:
_stream_lock.release()
def pause():
"""
Pause current audio playback (if supported by stream).
Note: Most streams don't support pause, only stop.
"""
global _current_stream
if _stream_lock:
_stream_lock.acquire()
if _current_stream and hasattr(_current_stream, 'pause'):
_current_stream.pause()
print("AudioFlinger: Playback paused")
else:
print("AudioFlinger: Pause not supported or no playback active")
if _stream_lock:
_stream_lock.release()
def resume():
"""
Resume paused audio playback (if supported by stream).
Note: Most streams don't support resume, only play.
"""
global _current_stream
if _stream_lock:
_stream_lock.acquire()
if _current_stream and hasattr(_current_stream, 'resume'):
_current_stream.resume()
print("AudioFlinger: Playback resumed")
else:
print("AudioFlinger: Resume not supported or no playback active")
if _stream_lock:
_stream_lock.release()
def set_volume(volume):
"""
@@ -304,16 +253,6 @@ def get_volume():
return _volume
def get_device_type():
"""
Get configured audio device type.
Returns:
int: Device type (DEVICE_NULL, DEVICE_I2S, DEVICE_BUZZER, DEVICE_BOTH)
"""
return _device_type
def is_playing():
"""
Check if audio is currently playing.
@@ -321,12 +260,4 @@ def is_playing():
Returns:
bool: True if playback active, False otherwise
"""
if _stream_lock:
_stream_lock.acquire()
result = _current_stream is not None and _current_stream.is_playing()
if _stream_lock:
_stream_lock.release()
return result
return _current_stream is not None and _current_stream.is_playing()
@@ -1,9 +1,10 @@
# RTTTLStream - RTTTL Ringtone Playback Stream for AudioFlinger
# Ring Tone Text Transfer Language parser and player
# Ported from Fri3d Camp 2024 Badge firmware
# Uses async playback with TaskManager for non-blocking operation
import math
import time
from mpos.task_manager import TaskManager
class RTTTLStream:
@@ -179,8 +180,8 @@ class RTTTLStream:
yield freq, msec
def play(self):
"""Play RTTTL tune via buzzer (runs in background thread)."""
async def play_async(self):
"""Play RTTTL tune via buzzer (runs as TaskManager task)."""
self._is_playing = True
# Calculate exponential duty cycle for perceptually linear volume
@@ -212,9 +213,10 @@ class RTTTLStream:
self.buzzer.duty_u16(duty)
# Play for 90% of duration, silent for 10% (note separation)
time.sleep_ms(int(msec * 0.9))
# Use async sleep to allow other tasks to run
await TaskManager.sleep_ms(int(msec * 0.9))
self.buzzer.duty_u16(0)
time.sleep_ms(int(msec * 0.1))
await TaskManager.sleep_ms(int(msec * 0.1))
print(f"RTTTLStream: Finished playing '{self.name}'")
if self.on_complete:
@@ -1,13 +1,14 @@
# WAVStream - WAV File Playback Stream for AudioFlinger
# Supports 8/16/24/32-bit PCM, mono+stereo, auto-upsampling, volume control
# Ported from MusicPlayer's AudioPlayer class
# Uses async playback with TaskManager for non-blocking operation
import machine
import micropython
import os
import time
import sys
from mpos.task_manager import TaskManager
# Volume scaling function - Viper-optimized for ESP32 performance
# NOTE: The line below is automatically commented out by build_mpos.sh during
# Unix/macOS builds (cross-compiler doesn't support Viper), then uncommented after build.
@@ -313,8 +314,8 @@ class WAVStream:
# ----------------------------------------------------------------------
# Main playback routine
# ----------------------------------------------------------------------
def play(self):
"""Main playback routine (runs in background thread)."""
async def play_async(self):
"""Main async playback routine (runs as TaskManager task)."""
self._is_playing = True
try:
@@ -363,23 +364,12 @@ class WAVStream:
print(f"WAVStream: Playing {data_size} bytes (volume {self.volume}%)")
f.seek(data_start)
# smaller chunk size means less jerks but buffer can run empty
# at 22050 Hz, 16-bit, 2-ch, 4096/4 = 1024 samples / 22050 = 46ms
# with rough volume scaling:
# 4096 => audio stutters during quasibird at ~20fps
# 8192 => no audio stutters and quasibird runs at ~16 fps => good compromise!
# 16384 => no audio stutters during quasibird but low framerate (~8fps)
# with optimized volume scaling:
# 6144 => audio stutters and quasibird at ~17fps
# 7168 => audio slightly stutters and quasibird at ~16fps
# 8192 => no audio stutters and quasibird runs at ~15-17fps => this is probably best
# with shift volume scaling:
# 6144 => audio slightly stutters and quasibird at ~16fps?!
# 8192 => no audio stutters, quasibird runs at ~13fps?!
# with power of 2 thing:
# 6144 => audio sutters and quasibird at ~18fps
# 8192 => no audio stutters, quasibird runs at ~14fps
chunk_size = 8192
# Chunk size tuning notes:
# - Smaller chunks = more responsive to stop(), better async yielding
# - Larger chunks = less overhead, smoother audio
# - 4096 bytes with async yield works well for responsiveness
# - The 32KB I2S buffer handles timing smoothness
chunk_size = 4096
bytes_per_original_sample = (bits_per_sample // 8) * channels
total_original = 0
@@ -412,8 +402,6 @@ class WAVStream:
raw = self._upsample_buffer(raw, upsample_factor)
# 3. Volume scaling
#shift = 16 - int(self.volume / 6.25)
#_scale_audio_powers_of_2(raw, len(raw), shift)
scale = self.volume / 100.0
if scale < 1.0:
scale_fixed = int(scale * 32768)
@@ -425,9 +413,12 @@ class WAVStream:
else:
# Simulate playback timing if no I2S
num_samples = len(raw) // (2 * channels)
time.sleep(num_samples / playback_rate)
await TaskManager.sleep(num_samples / playback_rate)
total_original += to_read
# Yield to other async tasks after each chunk
await TaskManager.sleep_ms(0)
print(f"WAVStream: Finished playing {self.file_path}")
if self.on_complete:
@@ -304,9 +304,8 @@ i2s_pins = {
'sd': 16,
}
# Initialize AudioFlinger (both I2S and buzzer available)
# Initialize AudioFlinger with I2S and buzzer
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_BOTH,
i2s_pins=i2s_pins,
buzzer_instance=buzzer
)
+1 -5
View File
@@ -100,11 +100,7 @@ import mpos.audio.audioflinger as AudioFlinger
# Note: Desktop builds have no audio hardware
# AudioFlinger functions will return False (no-op)
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_NULL,
i2s_pins=None,
buzzer_instance=None
)
AudioFlinger.init()
# === LED HARDWARE ===
# Note: Desktop builds have no LED hardware
@@ -113,8 +113,8 @@ except Exception as e:
# === AUDIO HARDWARE ===
import mpos.audio.audioflinger as AudioFlinger
# Note: Waveshare board has no buzzer or I2S audio:
AudioFlinger.init(device_type=AudioFlinger.DEVICE_NULL)
# Note: Waveshare board has no buzzer or I2S audio
AudioFlinger.init()
# === LED HARDWARE ===
# Note: Waveshare board has no NeoPixel LEDs
@@ -0,0 +1,77 @@
"""
MicroPythonOS Testing Module
Provides mock implementations for testing without actual hardware.
These mocks work on both desktop (unit tests) and device (integration tests).
Usage:
from mpos.testing import MockMachine, MockTaskManager, MockNetwork
# Inject mocks before importing modules that use hardware
import sys
sys.modules['machine'] = MockMachine()
# Or use the helper function
from mpos.testing import inject_mocks
inject_mocks(['machine', 'mpos.task_manager'])
"""
from .mocks import (
# Hardware mocks
MockMachine,
MockPin,
MockPWM,
MockI2S,
MockTimer,
MockSocket,
# MPOS mocks
MockTaskManager,
MockTask,
MockDownloadManager,
# Network mocks
MockNetwork,
MockRequests,
MockResponse,
MockRaw,
# Utility mocks
MockTime,
MockJSON,
MockModule,
# Helper functions
inject_mocks,
create_mock_module,
)
__all__ = [
# Hardware mocks
'MockMachine',
'MockPin',
'MockPWM',
'MockI2S',
'MockTimer',
'MockSocket',
# MPOS mocks
'MockTaskManager',
'MockTask',
'MockDownloadManager',
# Network mocks
'MockNetwork',
'MockRequests',
'MockResponse',
'MockRaw',
# Utility mocks
'MockTime',
'MockJSON',
'MockModule',
# Helper functions
'inject_mocks',
'create_mock_module',
]
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+56 -156
View File
@@ -2,66 +2,21 @@
import unittest
import sys
# Import centralized mocks
from mpos.testing import (
MockMachine,
MockPWM,
MockPin,
MockTaskManager,
create_mock_module,
inject_mocks,
)
# Mock hardware before importing
class MockPWM:
def __init__(self, pin, freq=0, duty=0):
self.pin = pin
self.last_freq = freq
self.last_duty = duty
def freq(self, value=None):
if value is not None:
self.last_freq = value
return self.last_freq
def duty_u16(self, value=None):
if value is not None:
self.last_duty = value
return self.last_duty
class MockPin:
IN = 0
OUT = 1
def __init__(self, pin_number, mode=None):
self.pin_number = pin_number
self.mode = mode
# Inject mocks
class MockMachine:
PWM = MockPWM
Pin = MockPin
sys.modules['machine'] = MockMachine()
class MockLock:
def acquire(self):
pass
def release(self):
pass
class MockThread:
@staticmethod
def allocate_lock():
return MockLock()
@staticmethod
def start_new_thread(func, args, **kwargs):
pass # No-op for testing
@staticmethod
def stack_size(size=None):
return 16384 if size is None else None
sys.modules['_thread'] = MockThread()
class MockMposApps:
@staticmethod
def good_stack_size():
return 16384
sys.modules['mpos.apps'] = MockMposApps()
# Inject mocks before importing AudioFlinger
inject_mocks({
'machine': MockMachine(),
'mpos.task_manager': create_mock_module('mpos.task_manager', TaskManager=MockTaskManager),
})
# Now import the module to test
import mpos.audio.audioflinger as AudioFlinger
@@ -79,7 +34,6 @@ class TestAudioFlinger(unittest.TestCase):
AudioFlinger.set_volume(70)
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_BOTH,
i2s_pins=self.i2s_pins,
buzzer_instance=self.buzzer
)
@@ -90,16 +44,28 @@ class TestAudioFlinger(unittest.TestCase):
def test_initialization(self):
"""Test that AudioFlinger initializes correctly."""
self.assertEqual(AudioFlinger.get_device_type(), AudioFlinger.DEVICE_BOTH)
self.assertEqual(AudioFlinger._i2s_pins, self.i2s_pins)
self.assertEqual(AudioFlinger._buzzer_instance, self.buzzer)
def test_device_types(self):
"""Test device type constants."""
self.assertEqual(AudioFlinger.DEVICE_NULL, 0)
self.assertEqual(AudioFlinger.DEVICE_I2S, 1)
self.assertEqual(AudioFlinger.DEVICE_BUZZER, 2)
self.assertEqual(AudioFlinger.DEVICE_BOTH, 3)
def test_has_i2s(self):
"""Test has_i2s() returns correct value."""
# With I2S configured
AudioFlinger.init(i2s_pins=self.i2s_pins, buzzer_instance=None)
self.assertTrue(AudioFlinger.has_i2s())
# Without I2S configured
AudioFlinger.init(i2s_pins=None, buzzer_instance=self.buzzer)
self.assertFalse(AudioFlinger.has_i2s())
def test_has_buzzer(self):
"""Test has_buzzer() returns correct value."""
# With buzzer configured
AudioFlinger.init(i2s_pins=None, buzzer_instance=self.buzzer)
self.assertTrue(AudioFlinger.has_buzzer())
# Without buzzer configured
AudioFlinger.init(i2s_pins=self.i2s_pins, buzzer_instance=None)
self.assertFalse(AudioFlinger.has_buzzer())
def test_stream_types(self):
"""Test stream type constants and priority order."""
@@ -124,58 +90,34 @@ class TestAudioFlinger(unittest.TestCase):
AudioFlinger.set_volume(-10)
self.assertEqual(AudioFlinger.get_volume(), 0)
def test_device_null_rejects_playback(self):
"""Test that DEVICE_NULL rejects all playback requests."""
# Re-initialize with no device
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_NULL,
i2s_pins=None,
buzzer_instance=None
)
# WAV should be rejected
result = AudioFlinger.play_wav("test.wav")
self.assertFalse(result)
# RTTTL should be rejected
result = AudioFlinger.play_rtttl("Test:d=4,o=5,b=120:c")
self.assertFalse(result)
def test_device_i2s_only_rejects_rtttl(self):
"""Test that DEVICE_I2S rejects buzzer playback."""
# Re-initialize with I2S only
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_I2S,
i2s_pins=self.i2s_pins,
buzzer_instance=None
)
# RTTTL should be rejected (no buzzer)
result = AudioFlinger.play_rtttl("Test:d=4,o=5,b=120:c")
self.assertFalse(result)
def test_device_buzzer_only_rejects_wav(self):
"""Test that DEVICE_BUZZER rejects I2S playback."""
# Re-initialize with buzzer only
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_BUZZER,
i2s_pins=None,
buzzer_instance=self.buzzer
)
def test_no_hardware_rejects_playback(self):
"""Test that no hardware rejects all playback requests."""
# Re-initialize with no hardware
AudioFlinger.init(i2s_pins=None, buzzer_instance=None)
# WAV should be rejected (no I2S)
result = AudioFlinger.play_wav("test.wav")
self.assertFalse(result)
def test_missing_i2s_pins_rejects_wav(self):
"""Test that missing I2S pins rejects WAV playback."""
# Re-initialize with I2S device but no pins
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_I2S,
i2s_pins=None,
buzzer_instance=None
)
# RTTTL should be rejected (no buzzer)
result = AudioFlinger.play_rtttl("Test:d=4,o=5,b=120:c")
self.assertFalse(result)
def test_i2s_only_rejects_rtttl(self):
"""Test that I2S-only config rejects buzzer playback."""
# Re-initialize with I2S only
AudioFlinger.init(i2s_pins=self.i2s_pins, buzzer_instance=None)
# RTTTL should be rejected (no buzzer)
result = AudioFlinger.play_rtttl("Test:d=4,o=5,b=120:c")
self.assertFalse(result)
def test_buzzer_only_rejects_wav(self):
"""Test that buzzer-only config rejects I2S playback."""
# Re-initialize with buzzer only
AudioFlinger.init(i2s_pins=None, buzzer_instance=self.buzzer)
# WAV should be rejected (no I2S)
result = AudioFlinger.play_wav("test.wav")
self.assertFalse(result)
@@ -189,55 +131,13 @@ class TestAudioFlinger(unittest.TestCase):
AudioFlinger.stop()
self.assertFalse(AudioFlinger.is_playing())
def test_get_device_type(self):
"""Test that get_device_type() returns correct value."""
# Test DEVICE_BOTH
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_BOTH,
i2s_pins=self.i2s_pins,
buzzer_instance=self.buzzer
)
self.assertEqual(AudioFlinger.get_device_type(), AudioFlinger.DEVICE_BOTH)
# Test DEVICE_I2S
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_I2S,
i2s_pins=self.i2s_pins,
buzzer_instance=None
)
self.assertEqual(AudioFlinger.get_device_type(), AudioFlinger.DEVICE_I2S)
# Test DEVICE_BUZZER
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_BUZZER,
i2s_pins=None,
buzzer_instance=self.buzzer
)
self.assertEqual(AudioFlinger.get_device_type(), AudioFlinger.DEVICE_BUZZER)
# Test DEVICE_NULL
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_NULL,
i2s_pins=None,
buzzer_instance=None
)
self.assertEqual(AudioFlinger.get_device_type(), AudioFlinger.DEVICE_NULL)
def test_audio_focus_check_no_current_stream(self):
"""Test audio focus allows playback when no stream is active."""
result = AudioFlinger._check_audio_focus(AudioFlinger.STREAM_MUSIC)
self.assertTrue(result)
def test_init_creates_lock(self):
"""Test that initialization creates a stream lock."""
self.assertIsNotNone(AudioFlinger._stream_lock)
def test_volume_default_value(self):
"""Test that default volume is reasonable."""
# After init, volume should be at default (70)
AudioFlinger.init(
device_type=AudioFlinger.DEVICE_NULL,
i2s_pins=None,
buzzer_instance=None
)
AudioFlinger.init(i2s_pins=None, buzzer_instance=None)
self.assertEqual(AudioFlinger.get_volume(), 70)