Files
2026-02-24 21:44:36 +01:00

695 lines
21 KiB
Python

# AudioManager - Core Audio Management Service
# Registry-based audio routing with device descriptors and session control
import _thread
from ..task_manager import TaskManager
class StereoNotSupported(Exception):
    """Raised when a device is configured with a channel count that is not supported."""
class AudioManager:
    """
    Centralized audio management service with device registry and session control.

    The class is a singleton: ``__new__`` always hands back the same instance
    and every classmethod operates on it through ``get()``. Devices are
    described by ``AudioManager.Output`` / ``AudioManager.Input`` descriptors.
    Playback and recording are represented by session objects that are kept in
    ``_active_sessions``; before a new session starts, any registered session
    whose pins clash with it is stopped.

    Usage:
        from mpos import AudioManager
        AudioManager.add(AudioManager.Output(...))
        AudioManager.add(AudioManager.Input(...))
        player = AudioManager.player(file_path="music.wav")
        player.start()
    """

    STREAM_MUSIC = 0
    STREAM_NOTIFICATION = 1
    STREAM_ALARM = 2

    # The one shared instance; populated by __new__ / __init__.
    _instance = None

    class Output:
        """Descriptor of an audio output device of kind ``"i2s"`` or ``"buzzer"``."""

        def __init__(
            self,
            name,
            kind,
            channels=1,
            i2s_pins=None,
            buzzer_pin=None,
            preferred_sample_rate=None,
        ):
            """
            Validate and store the output description.

            Args:
                name: Label used in ``repr``.
                kind: ``"i2s"`` or ``"buzzer"``.
                channels: 1 or 2.
                i2s_pins: Mapping of signal name to pin; required for ``"i2s"``.
                buzzer_pin: Pin of the buzzer; required for ``"buzzer"``.
                preferred_sample_rate: Optional rate used when a player does
                    not request one itself.

            Raises:
                ValueError: On an unknown kind, an invalid channel count, or
                    missing/invalid pin information for the chosen kind.
            """
            if kind not in ("i2s", "buzzer"):
                raise ValueError("Output.kind must be 'i2s' or 'buzzer'")
            if channels not in (1, 2):
                raise ValueError("Output.channels must be 1 or 2")
            self.name = name
            self.kind = kind
            self.channels = channels
            self.preferred_sample_rate = preferred_sample_rate
            if kind == "i2s":
                if not i2s_pins:
                    raise ValueError("Output.i2s_pins required for i2s output")
                self._validate_i2s_pins(i2s_pins)
                # Copy so later changes to the caller's dict do not leak in.
                self.i2s_pins = dict(i2s_pins)
                self.buzzer_pin = None
            else:
                if buzzer_pin is None:
                    raise ValueError("Output.buzzer_pin required for buzzer output")
                self.buzzer_pin = buzzer_pin
                self.i2s_pins = None

        @staticmethod
        def _validate_i2s_pins(i2s_pins):
            """Reject unknown keys and require the ``ws`` and ``sd`` entries."""
            allowed = {"sck", "ws", "sd", "mck"}
            for key in i2s_pins:
                if key not in allowed:
                    raise ValueError("Invalid i2s_pins key for output: %s" % key)
            for key in ("ws", "sd"):
                if key not in i2s_pins:
                    raise ValueError("i2s_pins must include '%s'" % key)

        def __repr__(self):
            return "<AudioOutput %s kind=%s>" % (self.name, self.kind)

    class Input:
        """Descriptor of an audio input device of kind ``"i2s"`` or ``"adc"``."""

        def __init__(
            self,
            name,
            kind,
            channels=1,
            i2s_pins=None,
            adc_mic_pin=None,
            preferred_sample_rate=None,
        ):
            """
            Validate and store the input description.

            Args:
                name: Label used in ``repr``.
                kind: ``"i2s"`` or ``"adc"``.
                channels: Must be 1.
                i2s_pins: Mapping of signal name to pin; required for ``"i2s"``.
                adc_mic_pin: Pin of the ADC microphone; required for ``"adc"``.
                preferred_sample_rate: Optional rate used when a recorder does
                    not request one itself.

            Raises:
                ValueError: On an unknown kind or missing/invalid pin
                    information for the chosen kind.
                StereoNotSupported: When ``channels`` is anything other than 1.
            """
            if kind not in ("i2s", "adc"):
                raise ValueError("Input.kind must be 'i2s' or 'adc'")
            if channels != 1:
                raise StereoNotSupported("Input channels=2 not supported yet")
            self.name = name
            self.kind = kind
            self.channels = channels
            self.preferred_sample_rate = preferred_sample_rate
            if kind == "i2s":
                if not i2s_pins:
                    raise ValueError("Input.i2s_pins required for i2s input")
                self._validate_i2s_pins(i2s_pins)
                # Copy so later changes to the caller's dict do not leak in.
                self.i2s_pins = dict(i2s_pins)
                self.adc_mic_pin = None
            else:
                if adc_mic_pin is None:
                    raise ValueError("Input.adc_mic_pin required for adc input")
                self.adc_mic_pin = adc_mic_pin
                self.i2s_pins = None

        @staticmethod
        def _validate_i2s_pins(i2s_pins):
            """Reject unknown keys and require the ``ws`` and ``sd_in`` entries."""
            allowed = {"sck_in", "sck", "ws", "sd_in"}
            for key in i2s_pins:
                if key not in allowed:
                    raise ValueError("Invalid i2s_pins key for input: %s" % key)
            for key in ("ws", "sd_in"):
                if key not in i2s_pins:
                    raise ValueError("i2s_pins must include '%s'" % key)

        def __repr__(self):
            return "<AudioInput %s kind=%s>" % (self.name, self.kind)

    def __init__(self):
        # __new__ returns the same object on every construction, so a repeated
        # AudioManager() call must not wipe the registry that already exists.
        if getattr(self, "_initialized", False):
            return
        AudioManager._instance = self
        self._outputs = []
        self._inputs = []
        self._default_output = None
        self._default_input = None
        self._active_sessions = []
        self._volume = 50
        self._initialized = True

    def __new__(cls, *args, **kwargs):
        """Create the shared instance on first use and return it afterwards."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @classmethod
    def get(cls):
        """Return the shared manager instance, creating it if necessary."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    @classmethod
    def add(cls, device):
        """Register an ``Output`` or ``Input`` descriptor and return it."""
        return cls.get()._add_device(device)

    def _add_device(self, device):
        """
        Append ``device`` to the matching registry.

        The first output and the first input registered become the defaults.

        Raises:
            ValueError: If ``device`` is neither an ``Output`` nor an ``Input``.
        """
        if isinstance(device, AudioManager.Output):
            self._outputs.append(device)
            if self._default_output is None:
                self._default_output = device
            return device
        if isinstance(device, AudioManager.Input):
            self._inputs.append(device)
            if self._default_input is None:
                self._default_input = device
            return device
        raise ValueError("Unsupported device type")

    @classmethod
    def get_outputs(cls):
        """Return a copy of the list of registered outputs."""
        return list(cls.get()._outputs)

    @classmethod
    def get_inputs(cls):
        """Return a copy of the list of registered inputs."""
        return list(cls.get()._inputs)

    @classmethod
    def get_default_output(cls):
        """Return the default output descriptor, or None if there is none."""
        return cls.get()._default_output

    @classmethod
    def get_default_input(cls):
        """Return the default input descriptor, or None if there is none."""
        return cls.get()._default_input

    @classmethod
    def set_default_output(cls, output):
        """Make ``output`` the default output (stored as given, not validated)."""
        cls.get()._default_output = output

    @classmethod
    def set_default_input(cls, input_device):
        """Make ``input_device`` the default input (stored as given, not validated)."""
        cls.get()._default_input = input_device

    @classmethod
    def set_volume(cls, volume):
        """
        Set the global volume and forward it to the streams of active sessions.

        Args:
            volume: Desired volume; rounded to an integer and clamped to 0..100.

        Returns:
            The volume that was applied, or the unchanged current volume when
            ``volume`` cannot be turned into an integer (non-numeric, NaN or
            infinite).
        """
        manager = cls.get()
        try:
            volume_int = int(round(volume))
        except (TypeError, ValueError, OverflowError):
            # round() raises OverflowError for +/-inf and ValueError for NaN;
            # treat both like any other unusable input and keep the old volume.
            return manager._volume
        volume_int = max(0, min(100, volume_int))
        manager._volume = volume_int
        for session in list(manager._active_sessions):
            stream = getattr(session, "_stream", None)
            if stream and hasattr(stream, "set_volume"):
                try:
                    stream.set_volume(volume_int)
                except Exception:
                    # Best effort: one failing stream must not prevent the
                    # remaining sessions from receiving the new volume.
                    pass
        return volume_int

    @classmethod
    def get_volume(cls):
        """Return the current global volume."""
        return cls.get()._volume

    @classmethod
    def get_active_player(cls, stream_type=None, file_path=None):
        """
        Return the first registered ``Player`` that is currently playing.

        Args:
            stream_type: If given, only players with this stream type match.
            file_path: If given, only players with this file path match.

        Returns:
            The matching player, or None.
        """
        manager = cls.get()
        manager._cleanup_inactive()
        for session in list(manager._active_sessions):
            if isinstance(session, Player):
                if stream_type is not None and session.stream_type != stream_type:
                    continue
                if file_path is not None and session.file_path != file_path:
                    continue
                if session.is_playing():
                    return session
        return None

    @classmethod
    def get_active_track(cls, stream_type=None):
        """Return the file path of the active player, or None if there is none."""
        player = cls.get_active_player(stream_type=stream_type)
        if player and player.file_path:
            return player.file_path
        return None

    @classmethod
    def player(
        cls,
        file_path=None,
        rtttl=None,
        stream_type=None,
        on_complete=None,
        output=None,
        sample_rate=None,
        volume=None,
    ):
        """Build a ``Player`` bound to the shared manager; it is not started."""
        return Player(
            manager=cls.get(),
            file_path=file_path,
            rtttl=rtttl,
            stream_type=stream_type,
            on_complete=on_complete,
            output=output,
            sample_rate=sample_rate,
            volume=volume,
        )

    @classmethod
    def rtttl_player(cls, rtttl, **kwargs):
        """Shortcut for ``player(rtttl=rtttl, **kwargs)``."""
        return cls.player(rtttl=rtttl, **kwargs)

    @classmethod
    def recorder(
        cls,
        file_path,
        input=None,
        sample_rate=None,
        on_complete=None,
        duration_ms=None,
        **adc_config
    ):
        """
        Build a ``Recorder`` bound to the shared manager; it is not started.

        Extra keyword arguments are collected in ``adc_config`` and handed to
        the recorder unchanged.
        """
        return Recorder(
            manager=cls.get(),
            file_path=file_path,
            input_device=input,
            sample_rate=sample_rate,
            on_complete=on_complete,
            duration_ms=duration_ms,
            adc_config=adc_config,
        )

    @classmethod
    def record_wav_adc(
        cls,
        file_path,
        duration_ms=None,
        sample_rate=None,
        adc_pin=None,
        on_complete=None,
        **adc_config
    ):
        """
        Start an ADC recording on a new thread without a registered input.

        An ``ADCRecordStream`` is created from the arguments, wrapped in an
        ``_ADCRecorderSession``, registered after conflicting sessions have
        been stopped, and run on its own thread.

        Returns:
            True once the recording thread has been started.
        """
        manager = cls.get()
        # Imported lazily, like the other stream classes in this module.
        from mpos.audio.stream_record_adc import ADCRecordStream
        stream = ADCRecordStream(
            file_path=file_path,
            duration_ms=duration_ms,
            sample_rate=sample_rate,
            adc_pin=adc_pin,
            on_complete=on_complete,
            **adc_config,
        )
        session = _ADCRecorderSession(manager, stream)
        manager._resolve_conflicts(session)
        manager._register_session(session)
        _thread.stack_size(TaskManager.good_stack_size())
        _thread.start_new_thread(session._record_thread, ())
        return True

    @classmethod
    def stop(cls):
        """Stop every registered session."""
        return cls.get()._stop_all()

    def _stop_all(self):
        """Call ``stop()`` on each registered session and clear the registry."""
        # Iterate over a copy: session.stop() removes the session from the list.
        for session in list(self._active_sessions):
            session.stop()
        self._active_sessions = []

    def _register_session(self, session):
        """Add ``session`` to the list of active sessions."""
        self._active_sessions.append(session)

    def _session_finished(self, session):
        """Remove ``session`` from the active list if it is still registered."""
        if session in self._active_sessions:
            self._active_sessions.remove(session)

    def _cleanup_inactive(self):
        """Drop every session whose ``is_active()`` returns a false value."""
        active = []
        for session in self._active_sessions:
            if session.is_active():
                active.append(session)
        self._active_sessions = active

    def _resolve_conflicts(self, new_session):
        """Stop and unregister every active session that conflicts with ``new_session``."""
        self._cleanup_inactive()
        # Collect first, stop afterwards: stop() mutates _active_sessions.
        to_stop = []
        for session in self._active_sessions:
            if self._sessions_conflict(session, new_session):
                to_stop.append(session)
        for session in to_stop:
            session.stop()
            if session in self._active_sessions:
                self._active_sessions.remove(session)

    @staticmethod
    def _pins_compatible(existing_signal, new_signal):
        """Return True only when both sides use the pin for the same ``ws`` or ``sck`` signal."""
        if existing_signal == new_signal and existing_signal in ("ws", "sck"):
            return True
        return False

    def _sessions_conflict(self, existing, new_session):
        """
        Decide whether two sessions cannot run at the same time.

        A pin used by both sessions is a conflict unless both use it for the
        same ``ws``/``sck`` signal. When such a pin is shared, the sessions
        still conflict if either sample rate is None or the rates differ.

        Returns:
            True if the sessions conflict, False otherwise.
        """
        existing_pins = existing.pin_usage()
        new_pins = new_session.pin_usage()
        shared_clock = False
        for pin, new_signal in new_pins.items():
            if pin in existing_pins:
                existing_signal = existing_pins[pin]
                if self._pins_compatible(existing_signal, new_signal):
                    shared_clock = True
                    continue
                return True
        if shared_clock:
            if existing.sample_rate is None or new_session.sample_rate is None:
                return True
            if existing.sample_rate != new_session.sample_rate:
                return True
        return False

    def _start_player(self, player):
        """
        Validate ``player``, fill in its defaults and run it on a new thread.

        Missing ``output`` falls back to the default output; missing
        ``stream_type`` becomes STREAM_NOTIFICATION for RTTTL players and
        STREAM_MUSIC otherwise.

        Raises:
            ValueError: If no output is available, a buzzer output has no
                RTTTL string, or an i2s output has no file path.
        """
        if player.output is None:
            player.output = self._default_output
        if player.output is None:
            raise ValueError("No output device registered")
        if player.stream_type is None:
            player.stream_type = (
                self.STREAM_NOTIFICATION if player.rtttl else self.STREAM_MUSIC
            )
        if player.output.kind == "buzzer" and not player.rtttl:
            raise ValueError("RTTTL string required for buzzer output")
        if player.output.kind == "i2s" and not player.file_path:
            raise ValueError("file_path required for i2s output")
        # The rate must be known before conflict resolution, which compares it.
        player.sample_rate = self._determine_player_rate(player)
        self._resolve_conflicts(player)
        self._register_session(player)
        _thread.stack_size(TaskManager.good_stack_size())
        _thread.start_new_thread(player._play_thread, ())

    def _start_recorder(self, recorder):
        """
        Fill in the recorder's defaults and run it on a new thread.

        Raises:
            ValueError: If the recorder has no input and no default input is
                registered.
        """
        if recorder.input_device is None:
            recorder.input_device = self._default_input
        if recorder.input_device is None:
            raise ValueError("No input device registered")
        # The rate must be known before conflict resolution, which compares it.
        recorder.sample_rate = self._determine_recorder_rate(recorder)
        self._resolve_conflicts(recorder)
        self._register_session(recorder)
        _thread.stack_size(TaskManager.good_stack_size())
        _thread.start_new_thread(recorder._record_thread, ())

    def _determine_player_rate(self, player):
        """
        Return the playback sample rate for ``player``.

        Non-i2s outputs yield None. For i2s the WAV file's own rate and the
        preferred rate (the player's, else the output's) are passed to
        ``WAVStream.compute_playback_rate`` and its first result is returned.
        """
        if player.output.kind != "i2s":
            return None
        preferred = player.sample_rate or player.output.preferred_sample_rate
        from mpos.audio.stream_wav import WAVStream
        info = WAVStream.get_wav_info(player.file_path)
        original_rate = info["sample_rate"]
        playback_rate, _ = WAVStream.compute_playback_rate(original_rate, preferred)
        return playback_rate

    def _determine_recorder_rate(self, recorder):
        """Return the recorder's rate, else its input's preferred rate, else 16000."""
        if recorder.sample_rate:
            return recorder.sample_rate
        if recorder.input_device and recorder.input_device.preferred_sample_rate:
            return recorder.input_device.preferred_sample_rate
        return 16000
class _ADCRecorderSession:
    """Session wrapper that lets the manager track an already-built ADC record stream."""

    def __init__(self, manager, stream):
        """Remember the owning manager and the stream, and mirror the stream's sample rate."""
        self._stream = stream
        self._manager = manager
        self.sample_rate = stream.sample_rate

    def start(self):
        """Stop conflicting sessions, register this one and record on a new thread."""
        owner = self._manager
        owner._resolve_conflicts(self)
        owner._register_session(self)
        _thread.stack_size(TaskManager.good_stack_size())
        _thread.start_new_thread(self._record_thread, ())

    def stop(self):
        """Stop the stream (when there is one) and unregister from the manager."""
        stream = self._stream
        if stream:
            stream.stop()
        self._manager._session_finished(self)

    def is_active(self):
        """Session-protocol alias for ``is_recording()``."""
        return self.is_recording()

    def is_recording(self):
        """Return False without a stream, otherwise whatever the stream reports."""
        stream = self._stream
        if stream is None:
            return False
        return stream.is_recording()

    def pin_usage(self):
        """Return ``{adc_pin: "adc"}``, or an empty dict if the stream has no ``adc_pin``."""
        pin = getattr(self._stream, "adc_pin", None)
        return {} if pin is None else {pin: "adc"}

    def _record_thread(self):
        """Thread body: run the recording, then always unregister the session."""
        try:
            self._stream.record()
        finally:
            self._manager._session_finished(self)
class Player:
    """
    Playback session for a WAV file or an RTTTL string.

    The stream object is created on the playback thread, so ``is_playing()``
    and ``is_active()`` report False until that thread has built it.
    """

    def __init__(
        self,
        manager,
        file_path=None,
        rtttl=None,
        stream_type=None,
        on_complete=None,
        output=None,
        sample_rate=None,
        volume=None,
    ):
        """Store the playback parameters; nothing is started here."""
        self._manager = manager
        # Source material.
        self.file_path = file_path
        self.rtttl = rtttl
        # Routing and playback options; None values are filled in on start.
        self.stream_type = stream_type
        self.output = output
        self.sample_rate = sample_rate
        self.volume = volume
        self.on_complete = on_complete
        # Populated by the playback thread.
        self._stream = None
        self._buzzer = None

    def start(self):
        """Ask the manager to validate this player and start its thread."""
        self._manager._start_player(self)

    def stop(self):
        """Stop the stream, release the buzzer and unregister from the manager."""
        stream = self._stream
        if stream:
            stream.stop()
        self._release_buzzer()
        self._manager._session_finished(self)

    def pause(self):
        """Pause playback if the stream offers ``pause``."""
        self._call_stream("pause")

    def resume(self):
        """Resume playback if the stream offers ``resume``."""
        self._call_stream("resume")

    def is_active(self):
        """Session-protocol alias for ``is_playing()``."""
        return self.is_playing()

    def is_playing(self):
        """Return False without a stream, otherwise whatever the stream reports."""
        stream = self._stream
        if stream is None:
            return False
        return stream.is_playing()

    def get_progress_percent(self):
        """Return the stream's progress in percent, or None when unavailable."""
        return self._call_stream("get_progress_percent")

    def get_progress_ms(self):
        """Return the stream's progress in milliseconds, or None when unavailable."""
        return self._call_stream("get_progress_ms")

    def get_duration_ms(self):
        """Return the stream's duration in milliseconds, or None when unavailable."""
        return self._call_stream("get_duration_ms")

    def pin_usage(self):
        """Return a ``{pin: signal}`` dict for the output, or an empty dict."""
        device = self.output
        if device:
            if device.kind == "buzzer":
                return {device.buzzer_pin: "buzzer"}
            if device.kind == "i2s":
                return _pin_map_i2s_output(device.i2s_pins)
        return {}

    def _call_stream(self, method_name):
        """Call ``method_name`` on the stream if both exist; otherwise return None."""
        stream = self._stream
        if stream and hasattr(stream, method_name):
            return getattr(stream, method_name)()
        return None

    def _release_buzzer(self):
        """Deinitialise the buzzer if one was created, ignoring any failure."""
        buzzer = self._buzzer
        if not buzzer:
            return
        try:
            buzzer.deinit()
        except Exception:
            pass

    def _effective_volume(self):
        """Return this player's own volume, or the manager's when none was given."""
        if self.volume is None:
            return self._manager._volume
        return self.volume

    def _play_thread(self):
        """Thread body: play on the configured output, then always clean up."""
        try:
            if self.output.kind != "buzzer":
                self._play_wav()
            else:
                self._play_rtttl()
        finally:
            self._release_buzzer()
            self._manager._session_finished(self)

    def _play_rtttl(self):
        """Create a PWM buzzer on the output pin and play the RTTTL string on it."""
        from mpos.audio.stream_rtttl import RTTTLStream
        from machine import Pin, PWM

        pwm = PWM(Pin(self.output.buzzer_pin, Pin.OUT))
        self._buzzer = pwm
        pwm.duty_u16(0)
        self._stream = RTTTLStream(
            rtttl_string=self.rtttl,
            stream_type=self.stream_type,
            volume=self._effective_volume(),
            buzzer_instance=pwm,
            on_complete=self.on_complete,
        )
        self._stream.play()

    def _play_wav(self):
        """Create a WAV stream for the output's I2S pins and play it."""
        from mpos.audio.stream_wav import WAVStream

        self._stream = WAVStream(
            file_path=self.file_path,
            stream_type=self.stream_type,
            volume=self._effective_volume(),
            i2s_pins=self.output.i2s_pins,
            on_complete=self.on_complete,
            requested_sample_rate=self.sample_rate,
        )
        self._stream.play()
class Recorder:
    """
    Recording session for an I2S or ADC input device.

    The stream object is created on the recording thread, so ``is_recording()``
    and ``is_active()`` report False until that thread has built it.
    """

    def __init__(
        self,
        manager,
        file_path,
        input_device=None,
        sample_rate=None,
        on_complete=None,
        duration_ms=None,
        adc_config=None,
    ):
        """Store the recording parameters; nothing is started here."""
        self._manager = manager
        self.file_path = file_path
        self.input_device = input_device
        self.sample_rate = sample_rate
        self.duration_ms = duration_ms
        self.on_complete = on_complete
        # A fresh dict stands in for a missing or empty configuration.
        self.adc_config = adc_config if adc_config else {}
        # Populated by the recording thread.
        self._stream = None

    def start(self):
        """Ask the manager to fill in defaults and start the recording thread."""
        self._manager._start_recorder(self)

    def stop(self):
        """Stop the stream (when there is one) and unregister from the manager."""
        stream = self._stream
        if stream:
            stream.stop()
        self._manager._session_finished(self)

    def pause(self):
        """Pause recording if the stream offers ``pause``."""
        stream = self._stream
        if stream and hasattr(stream, "pause"):
            stream.pause()

    def resume(self):
        """Resume recording if the stream offers ``resume``."""
        stream = self._stream
        if stream and hasattr(stream, "resume"):
            stream.resume()

    def is_active(self):
        """Session-protocol alias for ``is_recording()``."""
        return self.is_recording()

    def is_recording(self):
        """Return False without a stream, otherwise whatever the stream reports."""
        stream = self._stream
        if stream is None:
            return False
        return stream.is_recording()

    def get_duration_ms(self):
        """Return the stream's ``get_duration_ms()``, else ``get_elapsed_ms()``, else None."""
        stream = self._stream
        if not stream:
            return None
        # First getter the stream provides wins, in this order.
        for getter in ("get_duration_ms", "get_elapsed_ms"):
            if hasattr(stream, getter):
                return getattr(stream, getter)()
        return None

    def pin_usage(self):
        """Return a ``{pin: signal}`` dict for the input, or an empty dict."""
        device = self.input_device
        if device:
            if device.kind == "adc":
                return {device.adc_mic_pin: "adc"}
            if device.kind == "i2s":
                return _pin_map_i2s_input(device.i2s_pins)
        return {}

    def _record_thread(self):
        """Thread body: record from the configured input, then always unregister."""
        try:
            if self.input_device.kind != "adc":
                self._record_i2s()
            else:
                self._record_adc()
        finally:
            self._manager._session_finished(self)

    def _record_i2s(self):
        """Create a record stream for the input's I2S pins and run it."""
        from mpos.audio.stream_record import RecordStream

        stream = RecordStream(
            file_path=self.file_path,
            duration_ms=self.duration_ms,
            sample_rate=self.sample_rate,
            i2s_pins=self.input_device.i2s_pins,
            on_complete=self.on_complete,
        )
        self._stream = stream
        stream.record()

    def _record_adc(self):
        """Create an ADC record stream for the input's microphone pin and run it."""
        from mpos.audio.stream_record_adc import ADCRecordStream

        stream = ADCRecordStream(
            file_path=self.file_path,
            duration_ms=self.duration_ms,
            sample_rate=self.sample_rate,
            adc_pin=self.input_device.adc_mic_pin,
            on_complete=self.on_complete,
            **self.adc_config,
        )
        self._stream = stream
        stream.record()
def _pin_map_i2s_output(i2s_pins):
    """
    Build a ``{pin: signal}`` dict for an I2S output pin configuration.

    ``ws`` and ``sd`` are always included; ``sck`` and ``mck`` only when they
    are present and not None.
    """
    usage = {}
    bit_clock = i2s_pins.get("sck")
    if bit_clock is not None:
        usage[bit_clock] = "sck"
    for signal in ("ws", "sd"):
        usage[i2s_pins[signal]] = signal
    master_clock = i2s_pins.get("mck")
    if master_clock is not None:
        usage[master_clock] = "mck"
    return usage
def _pin_map_i2s_input(i2s_pins):
    """
    Build a ``{pin: signal}`` dict for an I2S input pin configuration.

    The clock pin is taken from ``sck_in`` when that key exists, otherwise
    from ``sck``, and is reported as ``"sck"``. ``ws`` and ``sd_in`` are
    always included.
    """
    fallback_clock = i2s_pins.get("sck")
    clock = i2s_pins.get("sck_in", fallback_clock)
    usage = {}
    if clock is not None:
        usage[clock] = "sck"
    usage[i2s_pins["ws"]] = "ws"
    usage[i2s_pins["sd_in"]] = "sd_in"
    return usage