AudioFlinger: add support for I2S microphone recording to WAV

This commit is contained in:
Thomas Farstrike
2025-12-17 21:49:51 +01:00
parent e64b475b10
commit da9f912ab7
8 changed files with 896 additions and 17 deletions
@@ -0,0 +1,23 @@
{
"name": "Sound Recorder",
"publisher": "MicroPythonOS",
"short_description": "Record audio from microphone",
"long_description": "Record audio from the I2S microphone and save as WAV files. Recordings can be played back with the Music Player app.",
"icon_url": "https://apps.micropythonos.com/apps/com.micropythonos.soundrecorder/icons/com.micropythonos.soundrecorder_0.0.1_64x64.png",
"download_url": "https://apps.micropythonos.com/apps/com.micropythonos.soundrecorder/mpks/com.micropythonos.soundrecorder_0.0.1.mpk",
"fullname": "com.micropythonos.soundrecorder",
"version": "0.0.1",
"category": "utilities",
"activities": [
{
"entrypoint": "assets/sound_recorder.py",
"classname": "SoundRecorder",
"intent_filters": [
{
"action": "main",
"category": "launcher"
}
]
}
]
}
@@ -0,0 +1,340 @@
# Sound Recorder App - Record audio from I2S microphone to WAV files
import os
import time
from mpos.apps import Activity
import mpos.ui
import mpos.audio.audioflinger as AudioFlinger
def _makedirs(path):
"""
Create directory and all parent directories (like os.makedirs).
MicroPython doesn't have os.makedirs, so we implement it manually.
"""
if not path:
return
parts = path.split('/')
current = ''
for part in parts:
if not part:
continue
current = current + '/' + part if current else part
try:
os.mkdir(current)
except OSError:
pass # Directory may already exist
class SoundRecorder(Activity):
"""
Sound Recorder app for recording audio from I2S microphone.
Saves recordings as WAV files that can be played with Music Player.
"""
# Constants
MAX_DURATION_MS = 60000 # 60 seconds max recording
RECORDINGS_DIR = "data/com.micropythonos.soundrecorder/recordings"
# UI Widgets
_status_label = None
_timer_label = None
_record_button = None
_record_button_label = None
_play_button = None
_play_button_label = None
_delete_button = None
_last_file_label = None
# State
_is_recording = False
_last_recording = None
_timer_task = None
_record_start_time = 0
def onCreate(self):
screen = lv.obj()
# Title
title = lv.label(screen)
title.set_text("Sound Recorder")
title.align(lv.ALIGN.TOP_MID, 0, 10)
title.set_style_text_font(lv.font_montserrat_20, 0)
# Status label (shows microphone availability)
self._status_label = lv.label(screen)
self._status_label.align(lv.ALIGN.TOP_MID, 0, 40)
# Timer display
self._timer_label = lv.label(screen)
self._timer_label.set_text("00:00 / 01:00")
self._timer_label.align(lv.ALIGN.CENTER, 0, -30)
self._timer_label.set_style_text_font(lv.font_montserrat_24, 0)
# Record button
self._record_button = lv.button(screen)
self._record_button.set_size(120, 50)
self._record_button.align(lv.ALIGN.CENTER, 0, 30)
self._record_button.add_event_cb(self._on_record_clicked, lv.EVENT.CLICKED, None)
self._record_button_label = lv.label(self._record_button)
self._record_button_label.set_text(lv.SYMBOL.AUDIO + " Record")
self._record_button_label.center()
# Last recording info
self._last_file_label = lv.label(screen)
self._last_file_label.align(lv.ALIGN.BOTTOM_MID, 0, -70)
self._last_file_label.set_text("No recordings yet")
self._last_file_label.set_long_mode(lv.label.LONG_MODE.SCROLL_CIRCULAR)
self._last_file_label.set_width(lv.pct(90))
# Play button
self._play_button = lv.button(screen)
self._play_button.set_size(80, 40)
self._play_button.align(lv.ALIGN.BOTTOM_LEFT, 20, -20)
self._play_button.add_event_cb(self._on_play_clicked, lv.EVENT.CLICKED, None)
self._play_button.add_flag(lv.obj.FLAG.HIDDEN)
self._play_button_label = lv.label(self._play_button)
self._play_button_label.set_text(lv.SYMBOL.PLAY + " Play")
self._play_button_label.center()
# Delete button
self._delete_button = lv.button(screen)
self._delete_button.set_size(80, 40)
self._delete_button.align(lv.ALIGN.BOTTOM_RIGHT, -20, -20)
self._delete_button.add_event_cb(self._on_delete_clicked, lv.EVENT.CLICKED, None)
self._delete_button.add_flag(lv.obj.FLAG.HIDDEN)
delete_label = lv.label(self._delete_button)
delete_label.set_text(lv.SYMBOL.TRASH + " Delete")
delete_label.center()
# Add to focus group
focusgroup = lv.group_get_default()
if focusgroup:
focusgroup.add_obj(self._record_button)
focusgroup.add_obj(self._play_button)
focusgroup.add_obj(self._delete_button)
self.setContentView(screen)
def onResume(self, screen):
super().onResume(screen)
self._update_status()
self._find_last_recording()
def onPause(self, screen):
super().onPause(screen)
# Stop recording if app goes to background
if self._is_recording:
self._stop_recording()
def _update_status(self):
"""Update status label based on microphone availability."""
if AudioFlinger.has_microphone():
self._status_label.set_text("Microphone ready")
self._status_label.set_style_text_color(lv.color_hex(0x00AA00), 0)
self._record_button.remove_flag(lv.obj.FLAG.HIDDEN)
else:
self._status_label.set_text("No microphone available")
self._status_label.set_style_text_color(lv.color_hex(0xAA0000), 0)
self._record_button.add_flag(lv.obj.FLAG.HIDDEN)
def _find_last_recording(self):
"""Find the most recent recording file."""
try:
# Ensure recordings directory exists
_makedirs(self.RECORDINGS_DIR)
# List recordings
files = os.listdir(self.RECORDINGS_DIR)
wav_files = [f for f in files if f.endswith('.wav')]
if wav_files:
# Sort by name (which includes timestamp)
wav_files.sort(reverse=True)
self._last_recording = f"{self.RECORDINGS_DIR}/{wav_files[0]}"
self._last_file_label.set_text(f"Last: {wav_files[0]}")
self._play_button.remove_flag(lv.obj.FLAG.HIDDEN)
self._delete_button.remove_flag(lv.obj.FLAG.HIDDEN)
else:
self._last_recording = None
self._last_file_label.set_text("No recordings yet")
self._play_button.add_flag(lv.obj.FLAG.HIDDEN)
self._delete_button.add_flag(lv.obj.FLAG.HIDDEN)
except Exception as e:
print(f"SoundRecorder: Error finding recordings: {e}")
self._last_recording = None
def _generate_filename(self):
"""Generate a timestamped filename for the recording."""
# Get current time
t = time.localtime()
timestamp = f"{t[0]:04d}-{t[1]:02d}-{t[2]:02d}_{t[3]:02d}-{t[4]:02d}-{t[5]:02d}"
return f"{self.RECORDINGS_DIR}/{timestamp}.wav"
def _on_record_clicked(self, event):
"""Handle record button click."""
print(f"SoundRecorder: _on_record_clicked called, _is_recording={self._is_recording}")
if self._is_recording:
print("SoundRecorder: Stopping recording...")
self._stop_recording()
else:
print("SoundRecorder: Starting recording...")
self._start_recording()
def _start_recording(self):
"""Start recording audio."""
print("SoundRecorder: _start_recording called")
print(f"SoundRecorder: has_microphone() = {AudioFlinger.has_microphone()}")
if not AudioFlinger.has_microphone():
print("SoundRecorder: No microphone available - aborting")
return
# Generate filename
file_path = self._generate_filename()
print(f"SoundRecorder: Generated filename: {file_path}")
# Start recording
print(f"SoundRecorder: Calling AudioFlinger.record_wav()")
print(f" file_path: {file_path}")
print(f" duration_ms: {self.MAX_DURATION_MS}")
print(f" sample_rate: 16000")
success = AudioFlinger.record_wav(
file_path=file_path,
duration_ms=self.MAX_DURATION_MS,
on_complete=self._on_recording_complete,
sample_rate=16000
)
print(f"SoundRecorder: record_wav returned: {success}")
if success:
self._is_recording = True
self._record_start_time = time.ticks_ms()
self._last_recording = file_path
print(f"SoundRecorder: Recording started successfully")
# Update UI
self._record_button_label.set_text(lv.SYMBOL.STOP + " Stop")
self._record_button.set_style_bg_color(lv.color_hex(0xAA0000), 0)
self._status_label.set_text("Recording...")
self._status_label.set_style_text_color(lv.color_hex(0xAA0000), 0)
# Hide play/delete buttons during recording
self._play_button.add_flag(lv.obj.FLAG.HIDDEN)
self._delete_button.add_flag(lv.obj.FLAG.HIDDEN)
# Start timer update
self._start_timer_update()
else:
print("SoundRecorder: record_wav failed!")
self._status_label.set_text("Failed to start recording")
self._status_label.set_style_text_color(lv.color_hex(0xAA0000), 0)
def _stop_recording(self):
"""Stop recording audio."""
AudioFlinger.stop()
self._is_recording = False
# Update UI
self._record_button_label.set_text(lv.SYMBOL.AUDIO + " Record")
self._record_button.set_style_bg_color(lv.theme_get_color_primary(None), 0)
self._update_status()
# Stop timer update
self._stop_timer_update()
def _on_recording_complete(self, message):
"""Callback when recording finishes."""
print(f"SoundRecorder: {message}")
# Update UI on main thread
self.update_ui_threadsafe_if_foreground(self._recording_finished, message)
def _recording_finished(self, message):
"""Update UI after recording finishes (called on main thread)."""
self._is_recording = False
# Update UI
self._record_button_label.set_text(lv.SYMBOL.AUDIO + " Record")
self._record_button.set_style_bg_color(lv.theme_get_color_primary(None), 0)
self._update_status()
self._find_last_recording()
# Stop timer update
self._stop_timer_update()
def _start_timer_update(self):
"""Start updating the timer display."""
# Use LVGL timer for periodic updates
self._timer_task = lv.timer_create(self._update_timer, 100, None)
def _stop_timer_update(self):
"""Stop updating the timer display."""
if self._timer_task:
self._timer_task.delete()
self._timer_task = None
self._timer_label.set_text("00:00 / 01:00")
def _update_timer(self, timer):
"""Update timer display (called periodically)."""
if not self._is_recording:
return
elapsed_ms = time.ticks_diff(time.ticks_ms(), self._record_start_time)
elapsed_sec = elapsed_ms // 1000
max_sec = self.MAX_DURATION_MS // 1000
elapsed_min = elapsed_sec // 60
elapsed_sec = elapsed_sec % 60
max_min = max_sec // 60
max_sec_display = max_sec % 60
self._timer_label.set_text(
f"{elapsed_min:02d}:{elapsed_sec:02d} / {max_min:02d}:{max_sec_display:02d}"
)
def _on_play_clicked(self, event):
"""Handle play button click."""
if self._last_recording and not self._is_recording:
# Stop any current playback
AudioFlinger.stop()
time.sleep_ms(100)
# Play the recording
success = AudioFlinger.play_wav(
self._last_recording,
stream_type=AudioFlinger.STREAM_MUSIC,
on_complete=self._on_playback_complete
)
if success:
self._status_label.set_text("Playing...")
self._status_label.set_style_text_color(lv.color_hex(0x0000AA), 0)
else:
self._status_label.set_text("Playback failed")
self._status_label.set_style_text_color(lv.color_hex(0xAA0000), 0)
def _on_playback_complete(self, message):
"""Callback when playback finishes."""
self.update_ui_threadsafe_if_foreground(self._update_status)
def _on_delete_clicked(self, event):
"""Handle delete button click."""
if self._last_recording and not self._is_recording:
try:
os.remove(self._last_recording)
print(f"SoundRecorder: Deleted {self._last_recording}")
self._find_last_recording()
self._status_label.set_text("Recording deleted")
except Exception as e:
print(f"SoundRecorder: Delete failed: {e}")
self._status_label.set_text("Delete failed")
self._status_label.set_style_text_color(lv.color_hex(0xAA0000), 0)
+16 -4
View File
@@ -1,6 +1,6 @@
# AudioFlinger - Centralized Audio Management Service for MicroPythonOS
# Android-inspired audio routing with priority-based audio focus
# Simple routing: play_wav() -> I2S, play_rtttl() -> buzzer
# Simple routing: play_wav() -> I2S, play_rtttl() -> buzzer, record_wav() -> I2S mic
from . import audioflinger
@@ -11,7 +11,7 @@ from .audioflinger import (
STREAM_NOTIFICATION,
STREAM_ALARM,
# Core functions
# Core playback functions
init,
play_wav,
play_rtttl,
@@ -21,10 +21,15 @@ from .audioflinger import (
set_volume,
get_volume,
is_playing,
# Recording functions
record_wav,
is_recording,
# Hardware availability checks
has_i2s,
has_buzzer,
has_microphone,
)
__all__ = [
@@ -33,7 +38,7 @@ __all__ = [
'STREAM_NOTIFICATION',
'STREAM_ALARM',
# Functions
# Playback functions
'init',
'play_wav',
'play_rtttl',
@@ -43,6 +48,13 @@ __all__ = [
'set_volume',
'get_volume',
'is_playing',
# Recording functions
'record_wav',
'is_recording',
# Hardware checks
'has_i2s',
'has_buzzer',
'has_microphone',
]
@@ -2,8 +2,8 @@
# Centralized audio routing with priority-based audio focus (Android-inspired)
# Supports I2S (digital audio) and PWM buzzer (tones/ringtones)
#
# Simple routing: play_wav() -> I2S, play_rtttl() -> buzzer
# Uses _thread for non-blocking background playback (separate thread from UI)
# Simple routing: play_wav() -> I2S, play_rtttl() -> buzzer, record_wav() -> I2S mic
# Uses _thread for non-blocking background playback/recording (separate thread from UI)
import _thread
import mpos.apps
@@ -17,6 +17,7 @@ STREAM_ALARM = 2 # Alarms/alerts (highest priority)
_i2s_pins = None # I2S pin configuration dict (created per-stream)
_buzzer_instance = None # PWM buzzer instance
_current_stream = None # Currently playing stream
_current_recording = None # Currently recording stream
_volume = 50 # System volume (0-100)
@@ -56,6 +57,11 @@ def has_buzzer():
return _buzzer_instance is not None
def has_microphone():
"""Check if I2S microphone is available for recording."""
return _i2s_pins is not None and 'sd_in' in _i2s_pins
def _check_audio_focus(stream_type):
"""
Check if a stream with the given type can start playback.
@@ -193,15 +199,108 @@ def play_rtttl(rtttl_string, stream_type=STREAM_NOTIFICATION, volume=None, on_co
return False
def _recording_thread(stream):
"""
Thread function for audio recording.
Runs in a separate thread to avoid blocking the UI.
Args:
stream: RecordStream instance
"""
global _current_recording
_current_recording = stream
try:
# Run synchronous recording in this thread
stream.record()
except Exception as e:
print(f"AudioFlinger: Recording error: {e}")
finally:
# Clear current recording
if _current_recording == stream:
_current_recording = None
def record_wav(file_path, duration_ms=None, on_complete=None, sample_rate=16000):
"""
Record audio from I2S microphone to WAV file.
Args:
file_path: Path to save WAV file (e.g., "data/recording.wav")
duration_ms: Recording duration in milliseconds (None = 60 seconds default)
on_complete: Callback function(message) when recording finishes
sample_rate: Sample rate in Hz (default 16000 for voice)
Returns:
bool: True if recording started, False if rejected or unavailable
"""
print(f"AudioFlinger.record_wav() called")
print(f" file_path: {file_path}")
print(f" duration_ms: {duration_ms}")
print(f" sample_rate: {sample_rate}")
print(f" _i2s_pins: {_i2s_pins}")
print(f" has_microphone(): {has_microphone()}")
if not has_microphone():
print("AudioFlinger: record_wav() failed - microphone not configured")
return False
# Cannot record while playing (I2S can only be TX or RX, not both)
if is_playing():
print("AudioFlinger: Cannot record while playing")
return False
# Cannot start new recording while already recording
if is_recording():
print("AudioFlinger: Already recording")
return False
# Create stream and start recording in separate thread
try:
print("AudioFlinger: Importing RecordStream...")
from mpos.audio.stream_record import RecordStream
print("AudioFlinger: Creating RecordStream instance...")
stream = RecordStream(
file_path=file_path,
duration_ms=duration_ms,
sample_rate=sample_rate,
i2s_pins=_i2s_pins,
on_complete=on_complete
)
print("AudioFlinger: Starting recording thread...")
_thread.stack_size(mpos.apps.good_stack_size())
_thread.start_new_thread(_recording_thread, (stream,))
print("AudioFlinger: Recording thread started successfully")
return True
except Exception as e:
import sys
print(f"AudioFlinger: record_wav() failed: {e}")
sys.print_exception(e)
return False
def stop():
"""Stop current audio playback."""
global _current_stream
"""Stop current audio playback or recording."""
global _current_stream, _current_recording
stopped = False
if _current_stream:
_current_stream.stop()
print("AudioFlinger: Playback stopped")
else:
print("AudioFlinger: No playback to stop")
stopped = True
if _current_recording:
_current_recording.stop()
print("AudioFlinger: Recording stopped")
stopped = True
if not stopped:
print("AudioFlinger: No playback or recording to stop")
def pause():
@@ -259,3 +358,13 @@ def is_playing():
bool: True if playback active, False otherwise
"""
return _current_stream is not None and _current_stream.is_playing()
def is_recording():
"""
Check if audio is currently being recorded.
Returns:
bool: True if recording active, False otherwise
"""
return _current_recording is not None and _current_recording.is_recording()
@@ -0,0 +1,319 @@
# RecordStream - WAV File Recording Stream for AudioFlinger
# Records 16-bit mono PCM audio from I2S microphone to WAV file
# Uses synchronous recording in a separate thread for non-blocking operation
# On desktop (no I2S hardware), generates a 440Hz sine wave for testing
import math
import os
import sys
import time
# Try to import machine module (not available on desktop)
try:
import machine
_HAS_MACHINE = True
except ImportError:
_HAS_MACHINE = False
def _makedirs(path):
"""
Create directory and all parent directories (like os.makedirs).
MicroPython doesn't have os.makedirs, so we implement it manually.
"""
if not path:
return
parts = path.split('/')
current = ''
for part in parts:
if not part:
continue
current = current + '/' + part if current else part
try:
os.mkdir(current)
except OSError:
pass # Directory may already exist
class RecordStream:
"""
WAV file recording stream with I2S input.
Records 16-bit mono PCM audio from I2S microphone.
"""
# Default recording parameters
DEFAULT_SAMPLE_RATE = 16000 # 16kHz - good for voice
DEFAULT_MAX_DURATION_MS = 60000 # 60 seconds max
def __init__(self, file_path, duration_ms, sample_rate, i2s_pins, on_complete):
"""
Initialize recording stream.
Args:
file_path: Path to save WAV file
duration_ms: Recording duration in milliseconds (None = until stop())
sample_rate: Sample rate in Hz
i2s_pins: Dict with 'sck', 'ws', 'sd_in' pin numbers
on_complete: Callback function(message) when recording finishes
"""
self.file_path = file_path
self.duration_ms = duration_ms if duration_ms else self.DEFAULT_MAX_DURATION_MS
self.sample_rate = sample_rate if sample_rate else self.DEFAULT_SAMPLE_RATE
self.i2s_pins = i2s_pins
self.on_complete = on_complete
self._keep_running = True
self._is_recording = False
self._i2s = None
self._bytes_recorded = 0
def is_recording(self):
"""Check if stream is currently recording."""
return self._is_recording
def stop(self):
"""Stop recording."""
self._keep_running = False
def get_elapsed_ms(self):
"""Get elapsed recording time in milliseconds."""
# Calculate from bytes recorded: bytes / (sample_rate * 2 bytes per sample) * 1000
if self.sample_rate > 0:
return int((self._bytes_recorded / (self.sample_rate * 2)) * 1000)
return 0
# ----------------------------------------------------------------------
# WAV header generation
# ----------------------------------------------------------------------
@staticmethod
def _create_wav_header(sample_rate, num_channels, bits_per_sample, data_size):
"""
Create WAV file header.
Args:
sample_rate: Sample rate in Hz
num_channels: Number of channels (1 for mono)
bits_per_sample: Bits per sample (16)
data_size: Size of audio data in bytes
Returns:
bytes: 44-byte WAV header
"""
byte_rate = sample_rate * num_channels * (bits_per_sample // 8)
block_align = num_channels * (bits_per_sample // 8)
file_size = data_size + 36 # Total file size minus 8 bytes for RIFF header
header = bytearray(44)
# RIFF header
header[0:4] = b'RIFF'
header[4:8] = file_size.to_bytes(4, 'little')
header[8:12] = b'WAVE'
# fmt chunk
header[12:16] = b'fmt '
header[16:20] = (16).to_bytes(4, 'little') # fmt chunk size
header[20:22] = (1).to_bytes(2, 'little') # PCM format
header[22:24] = num_channels.to_bytes(2, 'little')
header[24:28] = sample_rate.to_bytes(4, 'little')
header[28:32] = byte_rate.to_bytes(4, 'little')
header[32:34] = block_align.to_bytes(2, 'little')
header[34:36] = bits_per_sample.to_bytes(2, 'little')
# data chunk
header[36:40] = b'data'
header[40:44] = data_size.to_bytes(4, 'little')
return bytes(header)
@staticmethod
def _update_wav_header(f, data_size):
"""
Update WAV header with final data size.
Args:
f: File object (must be opened in r+b mode)
data_size: Final size of audio data in bytes
"""
file_size = data_size + 36
# Update file size at offset 4
f.seek(4)
f.write(file_size.to_bytes(4, 'little'))
# Update data size at offset 40
f.seek(40)
f.write(data_size.to_bytes(4, 'little'))
# ----------------------------------------------------------------------
# Desktop simulation - generate 440Hz sine wave
# ----------------------------------------------------------------------
def _generate_sine_wave_chunk(self, chunk_size, sample_offset):
"""
Generate a chunk of 440Hz sine wave samples for desktop testing.
Args:
chunk_size: Number of bytes to generate (must be even for 16-bit samples)
sample_offset: Current sample offset for phase continuity
Returns:
tuple: (bytearray of samples, number of samples generated)
"""
frequency = 440 # A4 note
amplitude = 16000 # ~50% of max 16-bit amplitude
num_samples = chunk_size // 2
buf = bytearray(chunk_size)
for i in range(num_samples):
# Calculate sine wave sample
t = (sample_offset + i) / self.sample_rate
sample = int(amplitude * math.sin(2 * math.pi * frequency * t))
# Clamp to 16-bit range
if sample > 32767:
sample = 32767
elif sample < -32768:
sample = -32768
# Write as little-endian 16-bit
buf[i * 2] = sample & 0xFF
buf[i * 2 + 1] = (sample >> 8) & 0xFF
return buf, num_samples
# ----------------------------------------------------------------------
# Main recording routine
# ----------------------------------------------------------------------
def record(self):
"""Main synchronous recording routine (runs in separate thread)."""
print(f"RecordStream.record() called")
print(f" file_path: {self.file_path}")
print(f" duration_ms: {self.duration_ms}")
print(f" sample_rate: {self.sample_rate}")
print(f" i2s_pins: {self.i2s_pins}")
print(f" _HAS_MACHINE: {_HAS_MACHINE}")
self._is_recording = True
self._bytes_recorded = 0
try:
# Ensure directory exists
dir_path = '/'.join(self.file_path.split('/')[:-1])
print(f"RecordStream: Creating directory: {dir_path}")
if dir_path:
_makedirs(dir_path)
print(f"RecordStream: Directory created/verified")
# Create file with placeholder header
print(f"RecordStream: Creating WAV file with header")
with open(self.file_path, 'wb') as f:
# Write placeholder header (will be updated at end)
header = self._create_wav_header(
self.sample_rate,
num_channels=1,
bits_per_sample=16,
data_size=0
)
f.write(header)
print(f"RecordStream: Header written ({len(header)} bytes)")
print(f"RecordStream: Recording to {self.file_path}")
print(f"RecordStream: {self.sample_rate} Hz, 16-bit, mono")
print(f"RecordStream: Max duration {self.duration_ms}ms")
# Check if we have real I2S hardware or need to simulate
use_simulation = not _HAS_MACHINE
if not use_simulation:
# Initialize I2S in RX mode with correct pins for microphone
try:
# Use sck_in if available (separate clock for mic), otherwise fall back to sck
sck_pin = self.i2s_pins.get('sck_in', self.i2s_pins.get('sck'))
print(f"RecordStream: Initializing I2S RX with sck={sck_pin}, ws={self.i2s_pins['ws']}, sd={self.i2s_pins['sd_in']}")
self._i2s = machine.I2S(
0,
sck=machine.Pin(sck_pin, machine.Pin.OUT),
ws=machine.Pin(self.i2s_pins['ws'], machine.Pin.OUT),
sd=machine.Pin(self.i2s_pins['sd_in'], machine.Pin.IN),
mode=machine.I2S.RX,
bits=16,
format=machine.I2S.MONO,
rate=self.sample_rate,
ibuf=8000 # 8KB input buffer
)
print(f"RecordStream: I2S initialized successfully")
except Exception as e:
print(f"RecordStream: I2S init failed: {e}")
print(f"RecordStream: Falling back to simulation mode")
use_simulation = True
if use_simulation:
print(f"RecordStream: Using desktop simulation (440Hz sine wave)")
# Calculate recording parameters
chunk_size = 1024 # Read 1KB at a time
max_bytes = int((self.duration_ms / 1000) * self.sample_rate * 2)
start_time = time.ticks_ms()
sample_offset = 0 # For sine wave phase continuity
print(f"RecordStream: max_bytes={max_bytes}, chunk_size={chunk_size}")
# Open file for appending audio data
with open(self.file_path, 'r+b') as f:
f.seek(44) # Skip header
buf = bytearray(chunk_size)
while self._keep_running and self._bytes_recorded < max_bytes:
# Check elapsed time
elapsed = time.ticks_diff(time.ticks_ms(), start_time)
if elapsed >= self.duration_ms:
print(f"RecordStream: Duration limit reached ({elapsed}ms)")
break
if use_simulation:
# Generate sine wave samples for desktop testing
buf, num_samples = self._generate_sine_wave_chunk(chunk_size, sample_offset)
sample_offset += num_samples
num_read = chunk_size
# Simulate real-time recording speed
time.sleep_ms(int((chunk_size / 2) / self.sample_rate * 1000))
else:
# Read from I2S
try:
num_read = self._i2s.readinto(buf)
except Exception as e:
print(f"RecordStream: Read error: {e}")
break
if num_read > 0:
f.write(buf[:num_read])
self._bytes_recorded += num_read
# Update header with actual data size
print(f"RecordStream: Updating WAV header with data_size={self._bytes_recorded}")
self._update_wav_header(f, self._bytes_recorded)
elapsed_ms = time.ticks_diff(time.ticks_ms(), start_time)
print(f"RecordStream: Finished recording {self._bytes_recorded} bytes ({elapsed_ms}ms)")
if self.on_complete:
self.on_complete(f"Recorded: {self.file_path}")
except Exception as e:
import sys
print(f"RecordStream: Error: {e}")
sys.print_exception(e)
if self.on_complete:
self.on_complete(f"Error: {e}")
finally:
self._is_recording = False
if self._i2s:
self._i2s.deinit()
self._i2s = None
print(f"RecordStream: Recording thread finished")
@@ -296,12 +296,18 @@ import mpos.audio.audioflinger as AudioFlinger
# Initialize buzzer (GPIO 46)
buzzer = PWM(Pin(46), freq=550, duty=0)
# I2S pin configuration (GPIO 2, 47, 16)
# I2S pin configuration for audio output (DAC) and input (microphone)
# Note: I2S is created per-stream, not at boot (only one instance can exist)
# The DAC uses BCK (bit clock) on GPIO 2, while the microphone uses SCLK on GPIO 17
# See schematics: DAC has BCK=2, WS=47, SD=16; Microphone has SCLK=17, WS=47, DIN=15
i2s_pins = {
'sck': 2,
'ws': 47,
'sd': 16,
# Output (DAC/speaker) pins
'sck': 2, # BCK - Bit Clock for DAC output
'ws': 47, # Word Select / LRCLK (shared between DAC and mic)
'sd': 16, # Serial Data OUT (speaker/DAC)
# Input (microphone) pins
'sck_in': 17, # SCLK - Serial Clock for microphone input
'sd_in': 15, # DIN - Serial Data IN (microphone)
}
# Initialize AudioFlinger with I2S and buzzer
+11 -3
View File
@@ -98,9 +98,17 @@ mpos.battery_voltage.init_adc(999, adc_to_voltage)
# === AUDIO HARDWARE ===
import mpos.audio.audioflinger as AudioFlinger
# Note: Desktop builds have no audio hardware
# AudioFlinger functions will return False (no-op)
AudioFlinger.init()
# Desktop builds have no real audio hardware, but we simulate microphone
# recording with a 440Hz sine wave for testing WAV file generation
# The i2s_pins dict with 'sd_in' enables has_microphone() to return True
i2s_pins = {
'sck': 0, # Simulated - not used on desktop
'ws': 0, # Simulated - not used on desktop
'sd': 0, # Simulated - not used on desktop
'sck_in': 0, # Simulated - not used on desktop
'sd_in': 0, # Simulated - enables microphone simulation
}
AudioFlinger.init(i2s_pins=i2s_pins)
# === LED HARDWARE ===
# Note: Desktop builds have no LED hardware
+62
View File
@@ -142,3 +142,65 @@ class TestAudioFlinger(unittest.TestCase):
# After init, volume should be at default (70)
AudioFlinger.init(i2s_pins=None, buzzer_instance=None)
self.assertEqual(AudioFlinger.get_volume(), 70)
class TestAudioFlingerRecording(unittest.TestCase):
"""Test cases for AudioFlinger recording functionality."""
def setUp(self):
"""Initialize AudioFlinger with microphone before each test."""
self.buzzer = MockPWM(MockPin(46))
# I2S pins with microphone input
self.i2s_pins_with_mic = {'sck': 2, 'ws': 47, 'sd': 16, 'sd_in': 15}
# I2S pins without microphone input
self.i2s_pins_no_mic = {'sck': 2, 'ws': 47, 'sd': 16}
# Reset state
AudioFlinger._current_recording = None
AudioFlinger.set_volume(70)
AudioFlinger.init(
i2s_pins=self.i2s_pins_with_mic,
buzzer_instance=self.buzzer
)
def tearDown(self):
"""Clean up after each test."""
AudioFlinger.stop()
def test_has_microphone_with_sd_in(self):
"""Test has_microphone() returns True when sd_in pin is configured."""
AudioFlinger.init(i2s_pins=self.i2s_pins_with_mic, buzzer_instance=None)
self.assertTrue(AudioFlinger.has_microphone())
def test_has_microphone_without_sd_in(self):
"""Test has_microphone() returns False when sd_in pin is not configured."""
AudioFlinger.init(i2s_pins=self.i2s_pins_no_mic, buzzer_instance=None)
self.assertFalse(AudioFlinger.has_microphone())
def test_has_microphone_no_i2s(self):
"""Test has_microphone() returns False when no I2S is configured."""
AudioFlinger.init(i2s_pins=None, buzzer_instance=self.buzzer)
self.assertFalse(AudioFlinger.has_microphone())
def test_is_recording_initially_false(self):
"""Test that is_recording() returns False initially."""
self.assertFalse(AudioFlinger.is_recording())
def test_record_wav_no_microphone(self):
"""Test that record_wav() fails when no microphone is configured."""
AudioFlinger.init(i2s_pins=self.i2s_pins_no_mic, buzzer_instance=None)
result = AudioFlinger.record_wav("test.wav")
self.assertFalse(result)
def test_record_wav_no_i2s(self):
"""Test that record_wav() fails when no I2S is configured."""
AudioFlinger.init(i2s_pins=None, buzzer_instance=self.buzzer)
result = AudioFlinger.record_wav("test.wav")
self.assertFalse(result)
def test_stop_with_no_recording(self):
"""Test that stop() can be called when nothing is recording."""
# Should not raise exception
AudioFlinger.stop()
self.assertFalse(AudioFlinger.is_recording())