# MicroPythonOS fork; mirror of https://github.com/m5stack/MicroPythonOS.git
# (synced 2026-05-20 11:51:27 -07:00).
# Original file: 124 lines, 3.8 KiB, Python.
"""Minimal duplex I2S test for Fri3d 2024 with communicator.

Creates TX + RX I2S instances simultaneously using the merged pin config
from the fri3d_2024 board setup. Intended for quick validation only.

To get this working, the I2S implementation needs to be changed; see the plan
at https://github.com/orgs/micropython/discussions/12473
"""

import math
import time

try:
    import machine

    _HAS_MACHINE = True
except ImportError:
    _HAS_MACHINE = False


# Merged pin map from internal_filesystem/lib/mpos/board/fri3d_2024.py
# Values are pin numbers passed to machine.Pin when the I2S instances are built.
I2S_PINS = {
    "ws": 47,  # shared LRCLK
    "sck": 2,  # DAC bit clock
    "sd": 16,  # DAC data out
    "sck_in": 17,  # mic bit clock
    "sd_in": 15,  # mic data in
}


class DuplexI2STest:
    """Minimal duplex setup: one TX I2S + one RX I2S running together.

    ``run()`` opens both instances, records from the RX side for
    ``duration_ms`` milliseconds, pauses for one second and then plays the
    captured bytes back through the TX side.
    """

    def __init__(self, sample_rate=16000, duration_ms=3000):
        """Store the session settings; no hardware is touched here.

        Args:
            sample_rate: Value passed as ``rate`` to both I2S instances (Hz).
            duration_ms: Length of the recording phase in milliseconds.
        """
        self.sample_rate = sample_rate
        self.duration_ms = duration_ms
        self._tx = None
        self._rx = None

    def _init_write(self):
        """Create the TX instance (I2S id 0, 16-bit mono) on the DAC pins."""
        self._tx = machine.I2S(
            0,
            sck=machine.Pin(I2S_PINS["sck"], machine.Pin.OUT),
            ws=machine.Pin(I2S_PINS["ws"], machine.Pin.OUT),
            sd=machine.Pin(I2S_PINS["sd"], machine.Pin.OUT),
            mode=machine.I2S.TX,
            bits=16,
            format=machine.I2S.MONO,
            rate=self.sample_rate,
            ibuf=16000,
        )

    def _init_read(self):
        """Create the RX instance (I2S id 1, 16-bit mono) on the mic pins.

        The word-select pin is the same one the TX instance uses.
        """
        self._rx = machine.I2S(
            1,
            sck=machine.Pin(I2S_PINS["sck_in"], machine.Pin.OUT),
            ws=machine.Pin(I2S_PINS["ws"], machine.Pin.OUT),
            sd=machine.Pin(I2S_PINS["sd_in"], machine.Pin.IN),
            mode=machine.I2S.RX,
            bits=16,
            format=machine.I2S.MONO,
            rate=self.sample_rate,
            ibuf=16000,
        )

    def _init_i2s(self):
        """Open RX first, then TX.

        Raises:
            RuntimeError: If the ``machine`` module could not be imported.
        """
        if not _HAS_MACHINE:
            raise RuntimeError("machine.I2S not available")

        self._init_read()
        self._init_write()

    def _deinit_i2s(self):
        """Release both I2S instances; safe to call when none are open."""
        tx, rx = self._tx, self._rx
        # Clear the references first so a failing deinit() cannot leave a
        # stale handle behind.
        self._tx = None
        self._rx = None
        try:
            if tx is not None:
                tx.deinit()
        finally:
            # Always attempt to release RX, even when the TX deinit raised.
            if rx is not None:
                rx.deinit()

    def run(self):
        """Run a short session: record mic data, then play it back.

        Writing a tone to TX while recording is intentionally not done: the
        original author noted that it works but saturates the microphone.
        ``_make_tone_buffer`` remains available for manual experiments.

        Raises:
            RuntimeError: If the ``machine`` module could not be imported.
        """
        try:
            # Initialise inside the try block so a failure while opening TX
            # still releases the already-opened RX instance.
            self._init_i2s()
            recorded = self._record()

            print("waiting a bit")
            time.sleep(1)

            if recorded:
                print("playing the recording")
                self._play(recorded)
        finally:
            self._deinit_i2s()

    def _record(self):
        """Read from RX until ``duration_ms`` has elapsed; return the bytes."""
        read_buf = bytearray(1024)
        # A memoryview slice avoids copying the chunk before it is appended.
        read_view = memoryview(read_buf)
        recorded = bytearray()
        t_end = time.ticks_add(time.ticks_ms(), self.duration_ms)

        while time.ticks_diff(t_end, time.ticks_ms()) > 0:
            read_len = self._rx.readinto(read_buf)
            if read_len:
                recorded.extend(read_view[:read_len])
        return recorded

    def _play(self, recorded):
        """Write every byte of ``recorded`` to TX, opening TX if needed."""
        if not self._tx:
            self._init_write()

        playback = memoryview(recorded)
        offset = 0
        while offset < len(playback):
            # write() reports how many bytes it accepted; continue from there.
            offset += self._tx.write(playback[offset:])

    def _make_tone_buffer(self, freq_hz=440, ms=50):
        """Build a sine tone as little-endian signed 16-bit mono samples.

        Args:
            freq_hz: Tone frequency in Hz.
            ms: Tone length in milliseconds.

        Returns:
            A ``bytearray`` holding two bytes per sample, peak amplitude 12000.
        """
        # Multiply before dividing so integer inputs give an exact count
        # instead of depending on float rounding of ``ms / 1000``.
        samples = int(self.sample_rate * ms // 1000)
        buf = bytearray(samples * 2)
        step = 2 * math.pi * freq_hz / self.sample_rate  # phase per sample
        for i in range(samples):
            sample = int(12000 * math.sin(step * i))
            # Masking a negative int yields its two's-complement bytes.
            buf[i * 2] = sample & 0xFF
            buf[i * 2 + 1] = (sample >> 8) & 0xFF
        return buf


def run_duplex_test(sample_rate=16000, duration_ms=3000):
    """Convenience entry point for quick manual tests.

    Builds a ``DuplexI2STest`` with the given settings and runs it once.

    Args:
        sample_rate: Sample rate in Hz forwarded to ``DuplexI2STest``.
        duration_ms: Recording length in milliseconds forwarded to
            ``DuplexI2STest``.
    """
    DuplexI2STest(sample_rate=sample_rate, duration_ms=duration_ms).run()


if __name__ == "__main__":
    # Executed as a script: run one session with the default settings.
    run_duplex_test()