MusicPlayer app: Move AudioPlayer to its own class

This commit is contained in:
Thomas Farstrike
2025-10-28 11:47:56 +01:00
parent f8ad2a0c23
commit b056c77929
2 changed files with 139 additions and 99 deletions
@@ -0,0 +1,136 @@
import os
import time
# ----------------------------------------------------------------------
# AudioPlayer robust, volume-controllable WAV player
# ----------------------------------------------------------------------
class AudioPlayer:
# class-level defaults (shared by every instance)
_i2s = None # the I2S object (created once per playback)
_volume = 100 # 0-100 (100 = full scale)
@staticmethod
def find_data_chunk(f):
"""Skip chunks until 'data' is found → (data_start, data_size, sample_rate)"""
f.seek(0)
if f.read(4) != b'RIFF':
raise ValueError("Not a RIFF file")
file_size = int.from_bytes(f.read(4), 'little') + 8
if f.read(4) != b'WAVE':
raise ValueError("Not a WAVE file")
pos = 12
sample_rate = None
while pos < file_size:
f.seek(pos)
chunk_id = f.read(4)
if len(chunk_id) < 4:
break
chunk_size = int.from_bytes(f.read(4), 'little')
if chunk_id == b'fmt ':
fmt = f.read(chunk_size)
if len(fmt) < 16:
raise ValueError("Invalid fmt chunk")
if int.from_bytes(fmt[0:2], 'little') != 1:
raise ValueError("Only PCM supported")
channels = int.from_bytes(fmt[2:4], 'little')
if channels != 1:
raise ValueError("Only mono supported")
sample_rate = int.from_bytes(fmt[4:8], 'little')
if int.from_bytes(fmt[14:16], 'little') != 16:
raise ValueError("Only 16-bit supported")
elif chunk_id == b'data':
return f.tell(), chunk_size, sample_rate
# next chunk (pad byte if odd length)
pos += 8 + chunk_size
if chunk_size % 2:
pos += 1
raise ValueError("No 'data' chunk found")
# ------------------------------------------------------------------
# Volume control
# ------------------------------------------------------------------
@classmethod
def set_volume(cls, volume: int):
"""Set playback volume 0-100 (100 = full scale)."""
volume = max(0, min(100, volume)) # clamp
cls._volume = volume
# If playback is already running we could instantly re-scale the
# current buffer, but the simple way (scale on each write) is
# enough and works even if playback starts later.
@classmethod
def get_volume(cls) -> int:
"""Return current volume 0-100."""
return cls._volume
# ------------------------------------------------------------------
# Playback entry point (called from a thread)
# ------------------------------------------------------------------
@classmethod
def play_wav(cls, filename):
"""Play a large mono 16-bit PCM WAV file with on-the-fly volume."""
try:
with open(filename, 'rb') as f:
st = os.stat(filename)
file_size = st[6]
print(f"File size: {file_size} bytes")
data_start, data_size, sample_rate = cls.find_data_chunk(f)
print(f"data chunk: {data_size} bytes @ {sample_rate} Hz")
if data_size > file_size - data_start:
data_size = file_size - data_start
# ---- I2S init ------------------------------------------------
try:
cls._i2s = machine.I2S(
0,
sck=machine.Pin(2, machine.Pin.OUT),
ws =machine.Pin(47, machine.Pin.OUT),
sd =machine.Pin(16, machine.Pin.OUT),
mode=machine.I2S.TX,
bits=16,
format=machine.I2S.MONO,
rate=sample_rate,
ibuf=32000
)
except Exception as e:
print("Warning: error initializing I2S audio device, simulating playback...")
print(f"Playing {data_size} bytes (vol {cls._volume}%) …")
f.seek(data_start)
chunk_size = 4096 # 4 KB → safe on ESP32
scale = cls._volume / 100.0 # float 0.0-1.0
total = 0
while total < data_size:
to_read = min(chunk_size, data_size - total)
raw = f.read(to_read)
if not raw:
break
# ---- on-the-fly volume scaling (16-bit little-endian) ----
if scale < 1.0:
# convert bytes → array of signed ints → scale → back to bytes
import array
samples = array.array('h', raw) # 'h' = signed short
for i in range(len(samples)):
samples[i] = int(samples[i] * scale)
raw = samples.tobytes()
# ---------------------------------------------------------
if cls._i2s:
cls._i2s.write(raw)
else:
time.sleep((to_read/2)/44100) # 16 bits (2 bytes) per sample at 44100 samples/s
total += len(raw)
print("Playback finished.")
except Exception as e:
print(f"AudioPlayer error: {e}")
finally:
if cls._i2s:
cls._i2s.deinit()
cls._i2s = None
@@ -1,11 +1,13 @@
import machine
import uos
import os
import _thread
from mpos.apps import Activity, Intent
import mpos.sdcard
import mpos.ui
from audio_player import AudioPlayer
class MusicPlayer(Activity):
# Widgets:
@@ -42,104 +44,6 @@ class MusicPlayer(Activity):
else:
print("INFO: ignoring unsupported file format")
class AudioPlayer:
def find_data_chunk(f):
"""Skip chunks until 'data' is found. Returns (data_start_pos, data_size)."""
# Go back to start
f.seek(0)
riff = f.read(4)
if riff != b'RIFF':
raise ValueError("Not a RIFF file")
file_size = int.from_bytes(f.read(4), 'little') + 8 # Total file size
wave = f.read(4)
if wave != b'WAVE':
raise ValueError("Not a WAVE file")
pos = 12 # Start after RIFF header
while pos < file_size:
f.seek(pos)
chunk_id = f.read(4)
if len(chunk_id) < 4:
break
chunk_size = int.from_bytes(f.read(4), 'little')
if chunk_id == b'fmt ':
fmt_data = f.read(chunk_size)
if len(fmt_data) < 16:
raise ValueError("Invalid fmt chunk")
audio_format = int.from_bytes(fmt_data[0:2], 'little')
channels = int.from_bytes(fmt_data[2:4], 'little')
sample_rate = int.from_bytes(fmt_data[4:8], 'little')
bits_per_sample = int.from_bytes(fmt_data[14:16], 'little')
if audio_format != 1:
raise ValueError("Only PCM supported")
if bits_per_sample != 16:
raise ValueError("Only 16-bit supported")
if channels != 1:
raise ValueError("Only mono supported")
elif chunk_id == b'data':
data_start = f.tell()
data_size = chunk_size
return data_start, data_size, sample_rate
# Skip chunk (pad byte if odd size)
pos += 8 + chunk_size
if chunk_size % 2 == 1:
pos += 1
raise ValueError("No 'data' chunk found")
def play_wav(filename):
"""Play large WAV files robustly with chunk skipping and streaming."""
try:
with open(filename, 'rb') as f:
stat = uos.stat(filename)
file_size = stat[6]
print(f"File size: {file_size} bytes")
data_start, data_size, sample_rate = AudioPlayer.find_data_chunk(f)
print(f"Found 'data' chunk: {data_size} bytes at {sample_rate} Hz")
if data_size > file_size - data_start:
print("Warning: data_size exceeds file bounds. Truncating.")
data_size = file_size - data_start
# Configure I2S
i2s = machine.I2S(
0,
sck=machine.Pin(2, machine.Pin.OUT),
ws=machine.Pin(47, machine.Pin.OUT),
sd=machine.Pin(16, machine.Pin.OUT),
mode=machine.I2S.TX,
bits=16,
format=machine.I2S.MONO,
rate=sample_rate,
ibuf=32000 # Larger buffer for stability
)
print(f"Playing {data_size} bytes at {sample_rate} Hz...")
f.seek(data_start)
chunk_size = 4096 # 4KB chunks = safe for ESP32
total_read = 0
while total_read < data_size:
remaining = data_size - total_read
read_size = min(chunk_size, remaining)
chunk = f.read(read_size)
if not chunk:
break
i2s.write(chunk)
total_read += len(chunk)
print("Playback finished.")
except Exception as e:
print(f"Error: {e}")
finally:
try:
i2s.deinit()
except:
pass
class FullscreenPlayer(Activity):
# No __init__() so super.__init__() will be called automatically